diff --git a/src/Program.cs b/src/Program.cs index 5a89702..b7d339b 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -1,22 +1,22 @@ -using custom_metrics_emitter; - -IHost host = Host.CreateDefaultBuilder(args) - .ConfigureServices((hostContext, services) => - { - IConfiguration configuration = hostContext.Configuration; - - if (!string.IsNullOrEmpty(configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"])) - { - services.AddApplicationInsightsTelemetryWorkerService(); - } - services.AddHostedService(); - services.AddLogging(opt => - { - opt.AddSimpleConsole(config => config.TimestampFormat = "[HH:mm:ss]"); - }); - - }) - .Build(); - -await host.RunAsync(); - +using custom_metrics_emitter; + +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureServices((hostContext, services) => + { + IConfiguration configuration = hostContext.Configuration; + + if (!string.IsNullOrEmpty(configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"])) + { + services.AddApplicationInsightsTelemetryWorkerService(); + } + services.AddHostedService(); + services.AddLogging(opt => + { + opt.AddSimpleConsole(config => config.TimestampFormat = "[HH:mm:ss]"); + }); + + }) + .Build(); + +await host.RunAsync(); + diff --git a/src/Worker.cs b/src/Worker.cs index fc183ab..a665e70 100644 --- a/src/Worker.cs +++ b/src/Worker.cs @@ -1,15 +1,15 @@ -namespace custom_metrics_emitter; - -using Azure.Identity; - -public class Worker : BackgroundService -{ - private readonly ILogger _logger = default!; - private readonly EmitterConfig _cfg = default!; - private readonly EventHubEmitter _ehEmitter = default!; - - public Worker(ILogger logger, IConfiguration configuration) - { +namespace custom_metrics_emitter; + +using Azure.Identity; + +public class Worker : BackgroundService +{ + private readonly ILogger _logger = default!; + private readonly EmitterConfig _cfg = default!; + private readonly EventHubEmitter _ehEmitter = default!; + + public Worker(ILogger logger, IConfiguration configuration) + { try { _logger = logger; @@ -32,59 +32,59 @@ public Worker(ILogger logger, IConfiguration configuration) : new DefaultAzureCredential(options: new() { ManagedIdentityClientId = _cfg.ManagedIdentityClientId }); _ehEmitter = new(_logger, _cfg, defaultCredential); - } - catch(Exception ex) + } + catch(Exception ex) { logger.LogError("{error}", ex.ToString()); - } - } - - protected override async Task ExecuteAsync(CancellationToken cancellationToken = default) - { - try - { - while (!cancellationToken.IsCancellationRequested) - { - _logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow); - var res = await _ehEmitter.ReadFromBlobStorageAndPublishToAzureMonitorAsync(cancellationToken); - - if (res.IsSuccessStatusCode) - { - _logger.LogInformation("Send Custom Metric end with status: {status}", res.StatusCode); - } - else - { - _logger.LogError("Error sending custom event with status: {status}", res.StatusCode); - } - - await Task.Delay(_cfg.CustomMetricInterval, cancellationToken); - } - } - catch (Exception ex) - { - _logger.LogError("{error}", ex.ToString()); - } - } -} - -/// -/// A helper class to make configuration parsing more fluent. -/// -internal static class IConfigurationExtensions -{ - internal static int GetIntOrDefault(this IConfiguration cfg, string name, int defaulT) => - !string.IsNullOrEmpty(cfg.GetValue(name)) && int.TryParse(cfg.GetValue(name), out int value) ? value : defaulT; - - internal static string Optional(this IConfiguration cfg, string name) => - cfg.GetValue(name) ?? string.Empty; - - internal static string Require(this IConfiguration cfg, string name) - { - var val = cfg.Optional(name); - if (string.IsNullOrEmpty(val)) - { - throw new ArgumentException($"Configuration error, missing key {name}", nameof(cfg)); - } - return val; - } -} \ No newline at end of file + } + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + _logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow); + var res = await _ehEmitter.ReadFromBlobStorageAndPublishToAzureMonitorAsync(cancellationToken); + + if (res.IsSuccessStatusCode) + { + _logger.LogInformation("Send Custom Metric end with status: {status}", res.StatusCode); + } + else + { + _logger.LogError("Error sending custom event with status: {status}", res.StatusCode); + } + + await Task.Delay(_cfg.CustomMetricInterval, cancellationToken); + } + } + catch (Exception ex) + { + _logger.LogError("{error}", ex.ToString()); + } + } +} + +/// +/// A helper class to make configuration parsing more fluent. +/// +internal static class IConfigurationExtensions +{ + internal static int GetIntOrDefault(this IConfiguration cfg, string name, int defaulT) => + !string.IsNullOrEmpty(cfg.GetValue(name)) && int.TryParse(cfg.GetValue(name), out int value) ? value : defaulT; + + internal static string Optional(this IConfiguration cfg, string name) => + cfg.GetValue(name) ?? string.Empty; + + internal static string Require(this IConfiguration cfg, string name) + { + var val = cfg.Optional(name); + if (string.IsNullOrEmpty(val)) + { + throw new ArgumentException($"Configuration error, missing key {name}", nameof(cfg)); + } + return val; + } +} diff --git a/src/emitters/EmitterConfig.cs b/src/emitters/EmitterConfig.cs index 8af0b58..b6c1116 100644 --- a/src/emitters/EmitterConfig.cs +++ b/src/emitters/EmitterConfig.cs @@ -1,14 +1,14 @@ -namespace custom_metrics_emitter; - -public record EmitterConfig( - string Region, - string SubscriptionId, - string ResourceGroup, - string TenantId, - string EventHubNamespace, - string EventHubName, - string ConsumerGroup, - string CheckpointAccountName, - string CheckpointContainerName, - int CustomMetricInterval, +namespace custom_metrics_emitter; + +public record EmitterConfig( + string Region, + string SubscriptionId, + string ResourceGroup, + string TenantId, + string EventHubNamespace, + string EventHubName, + string ConsumerGroup, + string CheckpointAccountName, + string CheckpointContainerName, + int CustomMetricInterval, string ManagedIdentityClientId); \ No newline at end of file diff --git a/src/emitters/EmitterHelper.cs b/src/emitters/EmitterHelper.cs index 35fcf50..e3ef53e 100644 --- a/src/emitters/EmitterHelper.cs +++ b/src/emitters/EmitterHelper.cs @@ -1,4 +1,4 @@ -namespace custom_metrics_emitter.emitters; +namespace custom_metrics_emitter.emitters; using Azure.Core; using Azure.Identity; diff --git a/src/emitters/EmitterSchema.cs b/src/emitters/EmitterSchema.cs index 88fbfac..36961ef 100644 --- a/src/emitters/EmitterSchema.cs +++ b/src/emitters/EmitterSchema.cs @@ -1,10 +1,10 @@ -namespace custom_metrics_emitter.emitters; +namespace custom_metrics_emitter.emitters; using System.Text.Json.Serialization; // https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api - -#pragma warning disable IDE1006 // Naming Styles + +#pragma warning disable IDE1006 // Naming Styles public record EmitterSchema( DateTime time, @@ -12,18 +12,18 @@ public record EmitterSchema( public record CustomMetricData( CustomMetricBaseData? baseData); - -public record CustomMetricBaseData( - string? metric, - string? Namespace, - IEnumerable? dimNames, - IEnumerable? series); - -public record CustomMetricBaseDataSeriesItem( - IEnumerable? dimValues, - [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? min, - [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? max, - [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? sum, - [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? count); - -#pragma warning restore IDE1006 // Naming Styles + +public record CustomMetricBaseData( + string? metric, + string? Namespace, + IEnumerable? dimNames, + IEnumerable? series); + +public record CustomMetricBaseDataSeriesItem( + IEnumerable? dimValues, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? min, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? max, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? sum, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? count); + +#pragma warning restore IDE1006 // Naming Styles diff --git a/src/emitters/EventHubEmitter.cs b/src/emitters/EventHubEmitter.cs index b35d898..5352152 100644 --- a/src/emitters/EventHubEmitter.cs +++ b/src/emitters/EventHubEmitter.cs @@ -1,58 +1,58 @@ -namespace custom_metrics_emitter; - -using Azure.Core; -using Azure.Identity; -using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; -using custom_metrics_emitter.emitters; -using Azure.Messaging.EventHubs; +namespace custom_metrics_emitter; + +using Azure.Core; +using Azure.Identity; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using custom_metrics_emitter.emitters; +using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using System.Collections.Concurrent; -internal record LagInformation(string ConsumerName, string PartitionId, long Lag); - -public class EventHubEmitter -{ - private const string LAG_METRIC_NAME = "Lag"; - private const string EVENT_HUB_CUSTOM_METRIC_NAMESPACE = "Event Hub custom metrics"; - - // Implementation details from the EventHub .NET SDK - private const string SEQUENCE_NUMBER = "sequenceNumber"; - private const string OFFSET_KEY = "offset"; - private readonly string _prefix; - private string CheckpointBlobName(string consumerGroup, string partitionId) => $"{_prefix}/{consumerGroup.ToLowerInvariant()}/checkpoint/{partitionId}"; - - private const string SERVICE_BUS_HOST_SUFFIX = ".servicebus.windows.net"; - private const string STORAGE_HOST_SUFFIX = ".blob.core.windows.net"; - - private readonly ILogger _logger; - private readonly EmitterConfig _cfg; - private readonly string _eventhubresourceId; - +internal record LagInformation(string ConsumerName, string PartitionId, long Lag); + +public class EventHubEmitter +{ + private const string LAG_METRIC_NAME = "Lag"; + private const string EVENT_HUB_CUSTOM_METRIC_NAMESPACE = "Event Hub custom metrics"; + + // Implementation details from the EventHub .NET SDK + private const string SEQUENCE_NUMBER = "sequenceNumber"; + private const string OFFSET_KEY = "offset"; + private readonly string _prefix; + private string CheckpointBlobName(string consumerGroup, string partitionId) => $"{_prefix}/{consumerGroup.ToLowerInvariant()}/checkpoint/{partitionId}"; + + private const string SERVICE_BUS_HOST_SUFFIX = ".servicebus.windows.net"; + private const string STORAGE_HOST_SUFFIX = ".blob.core.windows.net"; + + private readonly ILogger _logger; + private readonly EmitterConfig _cfg; + private readonly string _eventhubresourceId; + private readonly EmitterHelper _emitter; - private readonly BlobContainerClient _checkpointContainerClient = default!; - private readonly Dictionary _eventhubConsumerClientsInfo = new(); - private readonly string[] _consumerGroups = default!; - - public EventHubEmitter(ILogger logger, EmitterConfig config, DefaultAzureCredential defaultCredential) + private readonly BlobContainerClient _checkpointContainerClient = default!; + private readonly Dictionary _eventhubConsumerClientsInfo = new(); + private readonly string[] _consumerGroups = default!; + + public EventHubEmitter(ILogger logger, EmitterConfig config, DefaultAzureCredential defaultCredential) { - (_logger, _cfg) = (logger, config); - - _emitter = new EmitterHelper(_logger, defaultCredential); - + (_logger, _cfg) = (logger, config); + + _emitter = new EmitterHelper(_logger, defaultCredential); + if (string.IsNullOrEmpty(config.ConsumerGroup)) { _consumerGroups = _emitter.GetAllConsumerGroup(_cfg.EventHubNamespace, _cfg.EventHubName); - } + } else { _consumerGroups = config.ConsumerGroup.Split(';'); - } - - _eventhubresourceId = $"/subscriptions/{_cfg.SubscriptionId}/resourceGroups/{_cfg.ResourceGroup}/providers/Microsoft.EventHub/namespaces/{_cfg.EventHubNamespace}"; - _prefix = $"{_cfg.EventHubNamespace.ToLowerInvariant()}{SERVICE_BUS_HOST_SUFFIX}/{_cfg.EventHubName.ToLowerInvariant()}"; - + } + + _eventhubresourceId = $"/subscriptions/{_cfg.SubscriptionId}/resourceGroups/{_cfg.ResourceGroup}/providers/Microsoft.EventHub/namespaces/{_cfg.EventHubNamespace}"; + _prefix = $"{_cfg.EventHubNamespace.ToLowerInvariant()}{SERVICE_BUS_HOST_SUFFIX}/{_cfg.EventHubName.ToLowerInvariant()}"; + _checkpointContainerClient = new BlobContainerClient( blobContainerUri: new($"https://{_cfg.CheckpointAccountName}{STORAGE_HOST_SUFFIX}/{_cfg.CheckpointContainerName}"), @@ -74,112 +74,112 @@ public EventHubEmitter(ILogger logger, EmitterConfig config, DefaultAzur } } - public async Task ReadFromBlobStorageAndPublishToAzureMonitorAsync(CancellationToken cancellationToken = default) - { - var totalLag = await GetLagAsync(cancellationToken); - - var emitterdata = new EmitterSchema( - time: DateTime.UtcNow, - data: new CustomMetricData( - baseData: new CustomMetricBaseData( - metric: LAG_METRIC_NAME, - Namespace: EVENT_HUB_CUSTOM_METRIC_NAMESPACE, - dimNames: new[] { "EventHubName", "ConsumerGroup", "PartitionId" }, - series: totalLag.Select((lagInfo, idx) => - new CustomMetricBaseDataSeriesItem( - dimValues: new[] { _cfg.EventHubName, lagInfo.ConsumerName, lagInfo.PartitionId }, - min: null, max: null, - count: idx + 1, - sum: lagInfo.Lag))))); - - return await _emitter.SendCustomMetric( - region: _cfg.Region, - resourceId: _eventhubresourceId, - metricToSend: emitterdata, - cancellationToken: cancellationToken); - } - - private async Task> GetLagAsync(CancellationToken cancellationToken = default) + public async Task ReadFromBlobStorageAndPublishToAzureMonitorAsync(CancellationToken cancellationToken = default) + { + var totalLag = await GetLagAsync(cancellationToken); + + var emitterdata = new EmitterSchema( + time: DateTime.UtcNow, + data: new CustomMetricData( + baseData: new CustomMetricBaseData( + metric: LAG_METRIC_NAME, + Namespace: EVENT_HUB_CUSTOM_METRIC_NAMESPACE, + dimNames: new[] { "EventHubName", "ConsumerGroup", "PartitionId" }, + series: totalLag.Select((lagInfo, idx) => + new CustomMetricBaseDataSeriesItem( + dimValues: new[] { _cfg.EventHubName, lagInfo.ConsumerName, lagInfo.PartitionId }, + min: null, max: null, + count: idx + 1, + sum: lagInfo.Lag))))); + + return await _emitter.SendCustomMetric( + region: _cfg.Region, + resourceId: _eventhubresourceId, + metricToSend: emitterdata, + cancellationToken: cancellationToken); + } + + private async Task> GetLagAsync(CancellationToken cancellationToken = default) { // Query all partitions in parallel - var tasks = from consumer in _consumerGroups - from id in _eventhubConsumerClientsInfo[consumer]._partitionIds - select new { consumerGroup = consumer, partitionId = id, Task = LagInPartition(consumer, id, cancellationToken) }; - - await Task.WhenAll(tasks.Select(s => s.Task)); - - return tasks - .Select(x => new LagInformation(x.consumerGroup, x.partitionId, x.Task.Result)) - .OrderBy(x => x.PartitionId); - } - - private async Task LagInPartition(string consumerGroup, - string partitionId, CancellationToken cancellationToken = default) - { - long retVal = 0; - try - { - var partitionInfo = await _eventhubConsumerClientsInfo[consumerGroup]._consumerClient.GetPartitionPropertiesAsync( - partitionId, - cancellationToken); - // if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty - if ((partitionInfo != null) && (partitionInfo.LastEnqueuedOffset == -1)) - { - _logger.LogInformation("LagInPartition Empty partition"); - } - else - { - string checkpointName = CheckpointBlobName(consumerGroup, partitionId); - _logger.LogInformation("LagInPartition Checkpoint GetProperties: {name}", checkpointName); - - BlobProperties properties = await _checkpointContainerClient - .GetBlobClient(checkpointName) - .GetPropertiesAsync(cancellationToken: cancellationToken) - .ConfigureAwait(false); - - string strSeqNum, strOffset; - if (properties.Metadata.TryGetValue(SEQUENCE_NUMBER, out strSeqNum!) && - properties.Metadata.TryGetValue(OFFSET_KEY, out strOffset!)) - { - if (long.TryParse(strSeqNum, out long seqNum)) - { - _logger.LogInformation("LagInPartition Start: {checkpoint name} seq={seqNum} offset={offset}", checkpointName, seqNum, strOffset); - - // If checkpoint.Offset is empty that means no messages has been processed from an event hub partition - // And since partitionInfo.LastSequenceNumber = 0 for the very first message hence - // total unprocessed message will be partitionInfo.LastSequenceNumber + 1 - if (string.IsNullOrEmpty(strOffset) == true) - { - retVal = partitionInfo!.LastEnqueuedSequenceNumber + 1; - } - else - { - if (partitionInfo!.LastEnqueuedSequenceNumber >= seqNum) - { - retVal = partitionInfo.LastEnqueuedSequenceNumber - seqNum; - } - else - { - // Partition is a circular buffer, so it is possible that - // partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber - retVal = (long.MaxValue - partitionInfo.LastEnqueuedSequenceNumber) + seqNum; - - if (retVal < 0) - retVal = 0; - } - } - _logger.LogInformation("LagInPartition End: {checkpoint name} seq={seqNum} offset={offset} lag={lag}", checkpointName, seqNum, strOffset, retVal); - } - } - } - } - catch (Exception ex) - { - _logger.LogError("LagInPartition Error: {error}", ex.ToString()); - } - return retVal; - } - + var tasks = from consumer in _consumerGroups + from id in _eventhubConsumerClientsInfo[consumer]._partitionIds + select new { consumerGroup = consumer, partitionId = id, Task = LagInPartition(consumer, id, cancellationToken) }; + + await Task.WhenAll(tasks.Select(s => s.Task)); + + return tasks + .Select(x => new LagInformation(x.consumerGroup, x.partitionId, x.Task.Result)) + .OrderBy(x => x.PartitionId); + } + + private async Task LagInPartition(string consumerGroup, + string partitionId, CancellationToken cancellationToken = default) + { + long retVal = 0; + try + { + var partitionInfo = await _eventhubConsumerClientsInfo[consumerGroup]._consumerClient.GetPartitionPropertiesAsync( + partitionId, + cancellationToken); + // if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty + if ((partitionInfo != null) && (partitionInfo.LastEnqueuedOffset == -1)) + { + _logger.LogInformation("LagInPartition Empty partition"); + } + else + { + string checkpointName = CheckpointBlobName(consumerGroup, partitionId); + _logger.LogInformation("LagInPartition Checkpoint GetProperties: {name}", checkpointName); + + BlobProperties properties = await _checkpointContainerClient + .GetBlobClient(checkpointName) + .GetPropertiesAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + + string strSeqNum, strOffset; + if (properties.Metadata.TryGetValue(SEQUENCE_NUMBER, out strSeqNum!) && + properties.Metadata.TryGetValue(OFFSET_KEY, out strOffset!)) + { + if (long.TryParse(strSeqNum, out long seqNum)) + { + _logger.LogInformation("LagInPartition Start: {checkpoint name} seq={seqNum} offset={offset}", checkpointName, seqNum, strOffset); + + // If checkpoint.Offset is empty that means no messages has been processed from an event hub partition + // And since partitionInfo.LastSequenceNumber = 0 for the very first message hence + // total unprocessed message will be partitionInfo.LastSequenceNumber + 1 + if (string.IsNullOrEmpty(strOffset) == true) + { + retVal = partitionInfo!.LastEnqueuedSequenceNumber + 1; + } + else + { + if (partitionInfo!.LastEnqueuedSequenceNumber >= seqNum) + { + retVal = partitionInfo.LastEnqueuedSequenceNumber - seqNum; + } + else + { + // Partition is a circular buffer, so it is possible that + // partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber + retVal = (long.MaxValue - partitionInfo.LastEnqueuedSequenceNumber) + seqNum; + + if (retVal < 0) + retVal = 0; + } + } + _logger.LogInformation("LagInPartition End: {checkpoint name} seq={seqNum} offset={offset} lag={lag}", checkpointName, seqNum, strOffset, retVal); + } + } + } + } + catch (Exception ex) + { + _logger.LogError("LagInPartition Error: {error}", ex.ToString()); + } + return retVal; + } + private class ConsumerClientInfo { public EventHubConsumerClient _consumerClient; @@ -190,5 +190,5 @@ public ConsumerClientInfo(EventHubConsumerClient consumerClient, string[] partit _consumerClient = consumerClient; _partitionIds = partitionIds; } - } + } } \ No newline at end of file