Skip to content

Commit

Permalink
fix: retry init due to intermittent errors on extraction pipeline (#240)
Browse files Browse the repository at this point in the history
- Generally, extraction pipeline errors should not stop the connector from being initialised or running.
- However, there are cases where the extraction pipeline create/byids calls fail on startup for unknown reasons (the API itself is soon to be replaced with v2, so I don't think it's wise to dig for the root cause).
- This is a problem because some people rely on the extraction pipeline notifications (e.g. "connector is down"), and previously the pipeline would stay off if the initialisation failed from the get-go.
- This patch allows the extraction pipeline to be initialised later at runtime, whenever it's needed.
  • Loading branch information
polomani authored Mar 6, 2025
1 parent ffbd236 commit 800ef10
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 79 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## Release v1.0.0-beta-014 (2025-03-05)

### Bug Fixes

* Resolved an issue where the connector failed to initialize the extraction pipeline on startup, causing false alerts indicating the pipeline was offline. Added support for late initialization of the extraction pipeline in case of such failures.

## Release v1.0.0-beta-013 (2025-02-17)

### Features
Expand Down
21 changes: 10 additions & 11 deletions Cognite.Simulator.Tests/UtilsTests/ConnectorRuntimeUnitTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ namespace Cognite.Simulator.Tests.UtilsTests
[Collection(nameof(SequentialTestCollection))]
public class ConnectorRuntimeUnitTest
{
private static readonly IDictionary<Func<string, bool>, (Func<HttpResponseMessage> responseFunc, int callCount, int? maxCalls)> endpointMappings =
new ConcurrentDictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>(
new Dictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>
{
{ uri => uri.Contains("/extpipes"), (MockExtPipesEndpoint, 0, null) },
{ uri => uri.EndsWith("/simulators/list") || uri.EndsWith("/simulators"), (MockSimulatorsEndpoint, 0, null) },
{ uri => uri.Contains("/simulators/integrations"), (MockSimulatorsIntegrationsEndpoint, 0, null) },
{ uri => uri.Contains("/simulators/routines/revisions/list"), (MockSimulatorRoutineRevEndpoint, 0, 1) }
}
);
private static readonly IList<SimpleRequestMocker> endpointMockTemplates = new List<SimpleRequestMocker>
{
new SimpleRequestMocker(uri => uri.EndsWith("/token"), MockAzureAADTokenEndpoint),
new SimpleRequestMocker(uri => uri.Contains("/extpipes"), MockExtPipesEndpoint),
new SimpleRequestMocker(uri => uri.EndsWith("/simulators/list") || uri.EndsWith("/simulators") || uri.EndsWith("/simulators/update"), MockSimulatorsEndpoint),
new SimpleRequestMocker(uri => uri.Contains("/simulators/integrations"), MockSimulatorsIntegrationsEndpoint),
new SimpleRequestMocker(uri => uri.Contains("/simulators/routines/revisions/list"), MockSimulatorRoutineRevEndpoint, 1),
new SimpleRequestMocker(uri => true, () => GoneResponse)
};

// We need to mock the HttpClientFactory to return the mocked responses
// First few requests return the mocked responses, then we return a 410 Gone
Expand All @@ -48,7 +47,7 @@ public async Task TestConnectorRuntimeWithRestart()
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(3));

var mocks = GetMockedHttpClientFactory(mockRequestsAsync(endpointMappings));
var mocks = GetMockedHttpClientFactory(MockRequestsAsync(endpointMockTemplates));
var mockFactory = mocks.factory;

DefaultConnectorRuntime<DefaultAutomationConfig, DefaultModelFilestate, DefaultModelFileStatePoco>.ConfigureServices = (services) => {
Expand Down
87 changes: 87 additions & 0 deletions Cognite.Simulator.Tests/UtilsTests/ExtPipelineUnitTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

using Moq;
using Xunit;

using Cognite.Simulator.Utils;

using static Cognite.Simulator.Tests.UtilsTests.TestUtilities;

namespace Cognite.Simulator.Tests.UtilsTests
{
[Collection(nameof(SequentialTestCollection))]
public class ExtPipelineUnitTest
{
    // The mockers are stateful (each one counts the calls it has served and may have a
    // call budget), so they are held per test-class instance rather than in a static field.
    // This keeps counters from leaking between test instances or repeated runs.
    //
    // Order matters: for every request the first mocker that matches the URI and still has
    // calls left supplies the response.
    private readonly List<SimpleRequestMocker> _endpointMappings = new List<SimpleRequestMocker>
    {
        new SimpleRequestMocker(uri => uri.EndsWith("/token"), MockAzureAADTokenEndpoint).ShouldBeCalled(Times.Once()),
        // First lookup fails, the following two lookups return an empty item list.
        new SimpleRequestMocker(uri => uri.EndsWith("/extpipes/byids"), MockBadRequest, 1),
        new SimpleRequestMocker(uri => uri.EndsWith("/extpipes/byids"), () => OkItemsResponse(""), 2),
        // First create attempt fails, the second one succeeds.
        new SimpleRequestMocker(uri => uri.EndsWith("/extpipes"), MockBadRequest, 1),
        new SimpleRequestMocker(uri => uri.EndsWith("/extpipes"), MockExtPipesEndpoint, 1).ShouldBeCalled(Times.Once()),
        // Run notifications are served without a call limit.
        new SimpleRequestMocker(uri => uri.EndsWith("/extpipes/runs"), MockExtPipesEndpoint),
    };

    /// <summary>
    /// Test the late initialization of the extraction pipeline.
    /// 1. On the first attempt it tries to get the pipeline, the /extpipes/byids fails with a 400.
    /// 2. Next try, the /extpipes/byids returns an empty response. So the connector will try to create the pipeline. Create fails with a 400.
    /// 3. Next try, the /extpipes/byids returns an empty response. The connector will try to create the pipeline again. Create succeeds.
    /// 4. The connector will then try to notify the pipeline, which should succeed.
    /// </summary>
    [Fact]
    public async Task TestExtPipelineRetryInitRemote()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddSingleton(SeedData.SimulatorCreate);

        var httpMock = GetMockedHttpClientFactory(MockRequestsAsync(_endpointMappings));
        var mockFactory = httpMock.factory;
        services.AddSingleton(mockFactory.Object);
        services.AddCogniteTestClient();

        var mockedLogger = new Mock<ILogger<ExtractionPipeline>>();
        services.AddSingleton(mockedLogger.Object);

        var connectorConfig = new ConnectorConfig
        {
            NamePrefix = SeedData.TestIntegrationExternalId,
            DataSetId = SeedData.TestDataSetId,
            PipelineNotification = new PipelineNotificationConfig(),
        };
        services.AddExtractionPipeline(connectorConfig);

        using var provider = services.BuildServiceProvider();

        var extPipeline = provider.GetRequiredService<ExtractionPipeline>();

        // Act
        await extPipeline.Init(connectorConfig, CancellationToken.None);

        // The update call is expected to end only through cancellation, so the token bounds
        // how long the retry/notify sequence is allowed to run.
        using var tokenSource = new CancellationTokenSource();
        tokenSource.CancelAfter(TimeSpan.FromSeconds(5));

        await Assert.ThrowsAsync<TaskCanceledException>(() => extPipeline.PipelineUpdate(tokenSource.Token));

        // Assert
        VerifyLog(mockedLogger, LogLevel.Warning, "Could not find an extraction pipeline with id utils-tests-pipeline, attempting to create one", Times.AtLeast(1), true);
        VerifyLog(mockedLogger, LogLevel.Warning, "Could not retrieve or create extraction pipeline from CDF: CogniteSdk.ResponseException: Bad Request", Times.AtLeast(1), true);
        VerifyLog(mockedLogger, LogLevel.Debug, "Pipeline utils-tests-pipeline created successfully", Times.Once());
        VerifyLog(mockedLogger, LogLevel.Debug, "Notifying extraction pipeline, status: seen", Times.AtLeastOnce());

        // Every mocked endpoint must have been hit the expected number of times.
        foreach (var mocker in _endpointMappings)
        {
            mocker.AssertCallCount();
        }
    }
}
}
29 changes: 10 additions & 19 deletions Cognite.Simulator.Tests/UtilsTests/FileStorageClientUnitTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand Down Expand Up @@ -40,17 +39,13 @@ HttpResponseMessage MockFilesDownloadEndpointLarge()
public async Task TestFileStorageClientFailMaxSizeFiles()
{

IDictionary<Func<string, bool>, (Func<HttpResponseMessage> responseFunc, int callCount, int? maxCalls)> endpointMappings =
new ConcurrentDictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>(
new Dictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>
{
{ uri => uri.Contains("/files/download"), (MockFilesDownloadEndpointMaxSize, 0, 1) },
}
);

var endpointMockTemplates = new List<SimpleRequestMocker>
{
new SimpleRequestMocker(uri => uri.Contains("/files/download"), MockFilesDownloadEndpointMaxSize, 1)
};

var services = new ServiceCollection();
var httpMocks = GetMockedHttpClientFactory(mockRequestsAsync(endpointMappings));
var httpMocks = GetMockedHttpClientFactory(MockRequestsAsync(endpointMockTemplates));
var mockHttpClientFactory = httpMocks.factory;
var mockedLogger = new Mock<ILogger<FileStorageClient>>();

Expand All @@ -70,17 +65,13 @@ public async Task TestFileStorageClientFailMaxSizeFiles()
[Fact]
public async Task TestFileStorageClientFailLargeFiles()
{

IDictionary<Func<string, bool>, (Func<HttpResponseMessage> responseFunc, int callCount, int? maxCalls)> endpointMappings =
new ConcurrentDictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>(
new Dictionary<Func<string, bool>, (Func<HttpResponseMessage>, int, int?)>
{
{ uri => uri.Contains("/files/download"), (MockFilesDownloadEndpointLarge, 0, 1) },
}
);
var endpointMockTemplates = new List<SimpleRequestMocker>
{
new SimpleRequestMocker(uri => uri.Contains("/files/download"), MockFilesDownloadEndpointLarge, 1)
};

var services = new ServiceCollection();
var httpMocks = GetMockedHttpClientFactory(mockRequestsAsync(endpointMappings));
var httpMocks = GetMockedHttpClientFactory(MockRequestsAsync(endpointMockTemplates));
var mockHttpClientFactory = httpMocks.factory;
var mockedLogger = new Mock<ILogger<FileStorageClient>>();

Expand Down
109 changes: 88 additions & 21 deletions Cognite.Simulator.Tests/UtilsTests/TestUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Extensions.Logging;
using Moq;
using Moq.Protected;
using Xunit;

namespace Cognite.Simulator.Tests.UtilsTests
{
Expand Down Expand Up @@ -79,18 +80,19 @@ public static void WriteConfig()
File.WriteAllText(filePath, yamlContent);
}

private static HttpResponseMessage GoneResponse =
public static HttpResponseMessage GoneResponse =
CreateResponse(HttpStatusCode.Gone, "{\"error\": {\"code\": 410,\"message\": \"Gone\"}}");

/// <summary>
/// Mocks the HttpClientFactory to return the mocked responses
/// Example format for endpointMappings:
/// { uri => uri.Contains("/extpipes"), (MockExtPipesEndpoint, 0, 2) },
/// 2 is the number of times the response function will be called before returning a 410 Gone
/// if maxCalls is null, the response function will return the same response indefinitely
/// Mocks the requests to the endpoints with the given templates.
/// Goes through the list of templates in order and returns the response from the first template that matches the request.
/// If no template matches, returns a 501 Not Implemented response.
/// If a template has a max call count, it will only be used that many times. After that, it will be skipped.
/// </summary>
/// <param name="endpointMappings">Dictionary of URL matchers and response functions</param>
public static Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> mockRequestsAsync(IDictionary<Func<string, bool>, (Func<HttpResponseMessage> responseFunc, int callCount, int? maxCalls)> endpointMappings)
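/// <param name="endpointMockTemplates">Ordered list of request mockers; the first one that matches a request supplies the response.</param>
/// <returns>A delegate that maps an incoming request to a mocked response.</returns>
/// <exception cref="ArgumentNullException">Thrown by the returned delegate when the request URI is missing.</exception>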
public static Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> MockRequestsAsync(IList<SimpleRequestMocker> endpointMockTemplates)
{

return async (HttpRequestMessage message, CancellationToken token) =>
Expand All @@ -101,24 +103,89 @@ public static Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessa
throw new ArgumentNullException(nameof(uri));
}

foreach (var mapping in endpointMappings)
foreach (var mockTemplate in endpointMockTemplates)
{
if (mapping.Key(uri))
if (mockTemplate.Matches(uri))
{
var (responseFunc, callCount, maxCalls) = mapping.Value;
if (maxCalls.HasValue && callCount >= maxCalls)
{
return GoneResponse;
}
endpointMappings[mapping.Key] = (responseFunc, callCount + 1, maxCalls);
return responseFunc();
return mockTemplate.GetResponse();
}
}

return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("{\"items\":[]}") };
return CreateResponse(HttpStatusCode.NotImplemented, "Not implemented");
};
}

/// <summary>
/// A container for easier mocking of endpoints.
/// Pairs a URI matcher with a response factory, optionally limits how many times the
/// response may be served, and counts the served calls so a test can verify them afterwards.
/// </summary>
public class SimpleRequestMocker
{
    private readonly Func<string, bool> _uriMatcher;
    private readonly Func<HttpResponseMessage> _responseFunc;
    private readonly int? _maxCalls;

    // Guards _callCount so the limit check and the increment happen as one step.
    // NOTE(review): assumes the mocked HTTP handler may be invoked by overlapping requests — confirm.
    private readonly object _gate = new object();
    private int _callCount;
    private Times _expectedCalls = Times.AtLeastOnce();

    /// <summary>
    /// Creates a new SimpleRequestMocker.
    /// </summary>
    /// <param name="uriMatcher">Function to match the URI of the request.</param>
    /// <param name="responseFunc">Function to generate the response.</param>
    /// <param name="maxCalls">Maximum number of times this mocker can be called. If null, there is no limit.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="uriMatcher"/> or <paramref name="responseFunc"/> is null.</exception>
    public SimpleRequestMocker(Func<string, bool> uriMatcher, Func<HttpResponseMessage> responseFunc, int? maxCalls = null)
    {
        // Fail at construction time instead of with a NullReferenceException on the first request.
        _uriMatcher = uriMatcher ?? throw new ArgumentNullException(nameof(uriMatcher));
        _responseFunc = responseFunc ?? throw new ArgumentNullException(nameof(responseFunc));
        _maxCalls = maxCalls;
    }

    /// <summary>
    /// Sets the expected number of calls to this mocker.
    /// After the test, you can call AssertCallCount to check if the number of calls was as expected.
    /// Default is AtLeastOnce.
    /// </summary>
    /// <param name="expectedCalls">The call-count expectation checked by <see cref="AssertCallCount"/>.</param>
    /// <returns>This instance, so the call can be chained onto the constructor.</returns>
    public SimpleRequestMocker ShouldBeCalled(Times expectedCalls)
    {
        _expectedCalls = expectedCalls;
        return this;
    }

    /// <summary>
    /// Tells whether the call budget (if any) still has room for another response.
    /// </summary>
    private bool HasMoreCalls()
    {
        lock (_gate)
        {
            return !_maxCalls.HasValue || _callCount < _maxCalls.Value;
        }
    }

    /// <summary>
    /// Checks whether this mocker should handle a request to the given URI.
    /// </summary>
    /// <param name="uri">The request URI.</param>
    /// <returns>True when the URI matcher accepts the URI and the call budget is not exhausted.</returns>
    public bool Matches(string uri)
    {
        return _uriMatcher(uri) && HasMoreCalls();
    }

    /// <summary>
    /// Asserts that the number of served calls satisfies the expectation set through
    /// <see cref="ShouldBeCalled"/> (at least once by default).
    /// </summary>
    public void AssertCallCount()
    {
        int served;
        lock (_gate)
        {
            served = _callCount;
        }
        Assert.True(_expectedCalls.Validate(served), $"Unexpected number of calls to endpoint. Expected {_expectedCalls} but was {served}.");
    }

    /// <summary>
    /// Records one call and produces the mocked response.
    /// </summary>
    /// <returns>The response created by the response function.</returns>
    /// <exception cref="InvalidOperationException">If the maximum number of calls has already been reached.</exception>
    public HttpResponseMessage GetResponse()
    {
        lock (_gate)
        {
            if (_maxCalls.HasValue && _callCount >= _maxCalls.Value)
            {
                throw new InvalidOperationException("Maximum number of calls reached.");
            }
            _callCount++;
        }

        // The response factory runs outside the lock; the call is already counted at this point.
        return _responseFunc();
    }
}

/// <summary>
/// Builds the mocked response for the token endpoint: a 200 OK whose body is a
/// bearer-token JSON payload.
/// </summary>
/// <returns>The mocked token response.</returns>
public static HttpResponseMessage MockAzureAADTokenEndpoint()
{
    const string tokenPayload = "{\"access_token\": \"test_token\", \"expires_in\": 3600, \"token_type\": \"Bearer\"}";
    return CreateResponse(HttpStatusCode.OK, tokenPayload);
}

/// <summary>
/// Builds a mocked 400 Bad Request response with the plain-text body "Bad Request".
/// </summary>
/// <returns>The mocked error response.</returns>
public static HttpResponseMessage MockBadRequest() =>
    CreateResponse(HttpStatusCode.BadRequest, "Bad Request");

public static HttpResponseMessage MockExtPipesEndpoint()
{
var item = $@"{{
Expand Down Expand Up @@ -165,12 +232,12 @@ public static HttpResponseMessage MockSimulatorRoutineRevEndpoint()
return OkItemsResponse(item);
}

private static HttpResponseMessage OkItemsResponse(string item)
public static HttpResponseMessage OkItemsResponse(string item)
{
return CreateResponse(HttpStatusCode.OK, $"{{\"items\":[{item}]}}");
}

private static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string content)
public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string content)
{
return new HttpResponseMessage(statusCode) { Content = new StringContent(content) };
}
Expand Down
Loading

0 comments on commit 800ef10

Please sign in to comment.