Skip to content

Commit

Permalink
feat: prevent old simulation runs from piling up [POFSP-915] (#230)
Browse files Browse the repository at this point in the history
* fix: prevent old simulation runs from piling up
* changelog & release
  • Loading branch information
polomani authored Feb 5, 2025
1 parent 06a3e09 commit 39e384e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## Release v1.0.0-beta-11 (2025-02-03)

### Features

* Introduce `connector.simulation-run-tolerance` configuration option to prevent simulation runs pile-up. The connector will time out simulation runs that are older than this value (in seconds).

## Release v1.0.0-beta-010 (2024-12-06)

### Features
Expand Down
47 changes: 47 additions & 0 deletions Cognite.Simulator.Tests/UtilsTests/SimulationRunUnitTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using CogniteSdk.Alpha;
using Cognite.Simulator.Utils;
using Xunit;
using System;
using Cognite.Extractor.Common;

namespace Cognite.Simulator.Tests.UtilsTests;

public class SimulationRunUnitTest {

[Fact]
public void TestRunReadinessTimeout (){
{
var input = new SimulationRunItem(new SimulationRun() {
CreatedTime = DateTime.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds(),
Status = SimulationRunStatus.ready,
Id = 1,
});

var ex = Assert.Throws<ConnectorException>(() => input.ValidateReadinessForExecution(1));
Assert.Equal("Simulation has timed out because it is older than 1 second(s)", ex.Message);
}

{
var input = new SimulationRunItem(new SimulationRun() {
CreatedTime = 0,
Status = SimulationRunStatus.ready,
Id = 1,
});

var ex = Assert.Throws<ConnectorException>(() => input.ValidateReadinessForExecution());
Assert.Equal("Simulation has timed out because it is older than 3600 second(s)", ex.Message);
}
}

[Fact]
public void TestRunReadinessAlreadyRunning (){
var input = new SimulationRunItem(new SimulationRun() {
CreatedTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
Status = SimulationRunStatus.running,
Id = 1,
});

var ex = Assert.Throws<ConnectorException>(() => input.ValidateReadinessForExecution());
Assert.Equal("Simulation entered unrecoverable state and failed", ex.Message);
}
}
3 changes: 1 addition & 2 deletions Cognite.Simulator.Utils/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ public class ConnectorConfig
/// The connector will run simulation run resource found on CDF that are not older than
/// this value (in seconds). In case it finds items older than this, the runs will
/// fail due to timeout
/// TODO: We should use this so that our runs dont pile up
/// </summary>
public int SimulationRunTolerance { get; set; } = 1800; // 30 min
public int SimulationRunTolerance { get; set; } = 3600; // 1h

/// <summary>
/// The connector will check if scheduled simulations should be triggered with
Expand Down
43 changes: 30 additions & 13 deletions Cognite.Simulator.Utils/SimulationRunnerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ private async Task<SimulationRun> UpdateSimulationRunStatus(
simulationTime = simTime;
}

var message = statusMessage == null || statusMessage.Length < 255 ? statusMessage : statusMessage.Substring(0, 254);

var res = await _cdfSimulators.SimulationRunCallbackAsync(
new SimulationRunCallbackItem()
{
Id = runId,
Status = status,
StatusMessage = statusMessage,
StatusMessage = message,
SimulationTime = simulationTime
}, token).ConfigureAwait(false);

Expand Down Expand Up @@ -207,6 +209,7 @@ public async Task Run(CancellationToken token)
{
try
{
runItem.ValidateReadinessForExecution(_connectorConfig.SimulationRunTolerance);
(modelState, routineRev) = await GetModelAndRoutine(runItem, connectorIdList).ConfigureAwait(false);
if (routineRev == null || !connectorIdList.Contains(routineRev.SimulatorIntegrationExternalId))
{
Expand Down Expand Up @@ -236,14 +239,14 @@ await InitSimulationRun(
_logger.LogError(error.Message);
}
}
_logger.LogError("Simulation run failed with error: {Message}", ex);
_logger.LogWarning("Simulation run {id} failed with error: {Message}", runId, ex);
runItem.Run = await UpdateSimulationRunStatus(
runId,
SimulationRunStatus.failure,
ex.Message == null || ex.Message.Length < 255 ? ex.Message : ex.Message.Substring(0, 254),
ex.Message,
token,
runItem.RunConfiguration
).ConfigureAwait(false);
).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -273,24 +276,20 @@ await InitSimulationRun(
_logger.LogError("Could not find a local model file to run Simulation run {Id}", runId);
throw new SimulationException($"Could not find a model file for {modelRevExternalId}");
}
V calcConfig = await RoutineLibrary.GetRoutineRevision(simEv.Run.RoutineRevisionExternalId).ConfigureAwait(false);
V routineRev = await RoutineLibrary.GetRoutineRevision(simEv.Run.RoutineRevisionExternalId).ConfigureAwait(false);

if (calcConfig == null)
if (routineRev == null)
{
_logger.LogError("Could not find a local configuration to run Simulation run {Id}", runId);
throw new SimulationException($"Could not find a routine revision for model: {modelRevExternalId} routineRevision: {simEv.Run.RoutineRevisionExternalId}");
}

if (!integrations.Contains(calcConfig.SimulatorIntegrationExternalId))
if (!integrations.Contains(routineRev.SimulatorIntegrationExternalId))
{
return (model, null);
}
if (simEv.Run.Status == SimulationRunStatus.running)
{
_logger.LogError("Simulation run {Id} could not finish properly. This could be due to a connector being unexpectedly stopped during the run", runId);
throw new ConnectorException("Simulation entered unrecoverable state failed");
}
return (model, calcConfig);

return (model, routineRev);
}

async void PublishConnectorStatus(ConnectorStatus status, CancellationToken token)
Expand Down Expand Up @@ -483,6 +482,24 @@ public SimulationRunItem(SimulationRun r)
{
Run = r;
}

/// <summary>
/// Validates the simulation run for readiness for execution.
/// Throws an exception if the simulation run is too old or in an invalid state
/// </summary>
/// <param name="maxAgeSeconds">Maximum age of the simulation run in seconds</param>
public void ValidateReadinessForExecution(int maxAgeSeconds = 3600) {

if (Run.CreatedTime < DateTime.UtcNow.AddSeconds(-1 * maxAgeSeconds).ToUnixTimeMilliseconds())
{
throw new ConnectorException($"Simulation has timed out because it is older than {maxAgeSeconds} second(s)");
}

if (Run.Status == SimulationRunStatus.running)
{
throw new ConnectorException("Simulation entered unrecoverable state and failed");
}
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0-beta-010
1.0.0-beta-011

0 comments on commit 39e384e

Please sign in to comment.