diff --git a/.github/workflows/ghcr_build_push.yml b/.github/workflows/ghcr_build_push.yml new file mode 100644 index 0000000..9f7b3de --- /dev/null +++ b/.github/workflows/ghcr_build_push.yml @@ -0,0 +1,41 @@ +name: Create and publish a Docker image + +on: + release: + types: [created] + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build-and-push-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Log in to the Container registry + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + uses: docker/build-push-action@v3 + with: + context: ./custom-metrics-emitter + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index dfcfd56..c0eb359 100644 --- a/.gitignore +++ b/.gitignore @@ -1,350 +1,4 @@ -## Ignore Visual Studio temporary files, build results, and -## files generated by popular Visual Studio add-ons. -## -## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore +.vs -# User-specific files -*.rsuser -*.suo -*.user -*.userosscache -*.sln.docstates - -# User-specific files (MonoDevelop/Xamarin Studio) -*.userprefs - -# Mono auto generated files -mono_crash.* - -# Build results -[Dd]ebug/ -[Dd]ebugPublic/ -[Rr]elease/ -[Rr]eleases/ -x64/ -x86/ -[Aa][Rr][Mm]/ -[Aa][Rr][Mm]64/ -bld/ -[Bb]in/ -[Oo]bj/ -[Ll]og/ -[Ll]ogs/ - -# Visual Studio 2015/2017 cache/options directory -.vs/ -# Uncomment if you have tasks that create the project's static files in wwwroot -#wwwroot/ - -# Visual Studio 2017 auto generated files -Generated\ Files/ - -# MSTest test Results -[Tt]est[Rr]esult*/ -[Bb]uild[Ll]og.* - -# NUnit -*.VisualState.xml -TestResult.xml -nunit-*.xml - -# Build Results of an ATL Project -[Dd]ebugPS/ -[Rr]eleasePS/ -dlldata.c - -# Benchmark Results -BenchmarkDotNet.Artifacts/ - -# .NET Core -project.lock.json -project.fragment.lock.json -artifacts/ - -# StyleCop -StyleCopReport.xml - -# Files built by Visual Studio -*_i.c -*_p.c -*_h.h -*.ilk -*.meta -*.obj -*.iobj -*.pch -*.pdb -*.ipdb -*.pgc -*.pgd -*.rsp -*.sbr -*.tlb -*.tli -*.tlh -*.tmp -*.tmp_proj -*_wpftmp.csproj -*.log -*.vspscc -*.vssscc -.builds -*.pidb -*.svclog -*.scc - -# Chutzpah Test files -_Chutzpah* - -# Visual C++ cache files -ipch/ -*.aps -*.ncb -*.opendb -*.opensdf -*.sdf -*.cachefile -*.VC.db -*.VC.VC.opendb - -# Visual Studio profiler -*.psess -*.vsp -*.vspx -*.sap - -# Visual Studio Trace Files -*.e2e - -# TFS 2012 Local Workspace -$tf/ - -# Guidance Automation Toolkit -*.gpState - -# ReSharper is a .NET coding add-in -_ReSharper*/ -*.[Rr]e[Ss]harper -*.DotSettings.user - -# TeamCity is a build add-in -_TeamCity* - -# DotCover is a Code Coverage Tool -*.dotCover - -# AxoCover is a Code Coverage Tool -.axoCover/* -!.axoCover/settings.json - -# Visual Studio code coverage results -*.coverage -*.coveragexml - -# NCrunch -_NCrunch_* -.*crunch*.local.xml -nCrunchTemp_* - -# MightyMoose -*.mm.* -AutoTest.Net/ - -# Web workbench (sass) -.sass-cache/ - -# Installshield output folder -[Ee]xpress/ - -# DocProject is a documentation generator add-in -DocProject/buildhelp/ -DocProject/Help/*.HxT -DocProject/Help/*.HxC -DocProject/Help/*.hhc -DocProject/Help/*.hhk -DocProject/Help/*.hhp -DocProject/Help/Html2 -DocProject/Help/html - -# Click-Once directory -publish/ - -# Publish Web Output -*.[Pp]ublish.xml -*.azurePubxml -# Note: Comment the next line if you want to checkin your web deploy settings, -# but database connection strings (with potential passwords) will be unencrypted -*.pubxml -*.publishproj - -# Microsoft Azure Web App publish settings. Comment the next line if you want to -# checkin your Azure Web App publish settings, but sensitive information contained -# in these scripts will be unencrypted -PublishScripts/ - -# NuGet Packages -*.nupkg -# NuGet Symbol Packages -*.snupkg -# The packages folder can be ignored because of Package Restore -**/[Pp]ackages/* -# except build/, which is used as an MSBuild target. -!**/[Pp]ackages/build/ -# Uncomment if necessary however generally it will be regenerated when needed -#!**/[Pp]ackages/repositories.config -# NuGet v3's project.json files produces more ignorable files -*.nuget.props -*.nuget.targets - -# Microsoft Azure Build Output -csx/ -*.build.csdef - -# Microsoft Azure Emulator -ecf/ -rcf/ - -# Windows Store app package directories and files -AppPackages/ -BundleArtifacts/ -Package.StoreAssociation.xml -_pkginfo.txt -*.appx -*.appxbundle -*.appxupload - -# Visual Studio cache files -# files ending in .cache can be ignored -*.[Cc]ache -# but keep track of directories ending in .cache -!?*.[Cc]ache/ - -# Others -ClientBin/ -~$* -*~ -*.dbmdl -*.dbproj.schemaview -*.jfm -*.pfx -*.publishsettings -orleans.codegen.cs - -# Including strong name files can present a security risk -# (https://github.com/github/gitignore/pull/2483#issue-259490424) -#*.snk - -# Since there are multiple workflows, uncomment next line to ignore bower_components -# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) -#bower_components/ - -# RIA/Silverlight projects -Generated_Code/ - -# Backup & report files from converting an old project file -# to a newer Visual Studio version. Backup files are not needed, -# because we have git ;-) -_UpgradeReport_Files/ -Backup*/ -UpgradeLog*.XML -UpgradeLog*.htm -ServiceFabricBackup/ -*.rptproj.bak - -# SQL Server files -*.mdf -*.ldf -*.ndf - -# Business Intelligence projects -*.rdl.data -*.bim.layout -*.bim_*.settings -*.rptproj.rsuser -*- [Bb]ackup.rdl -*- [Bb]ackup ([0-9]).rdl -*- [Bb]ackup ([0-9][0-9]).rdl - -# Microsoft Fakes -FakesAssemblies/ - -# GhostDoc plugin setting file -*.GhostDoc.xml - -# Node.js Tools for Visual Studio -.ntvs_analysis.dat -node_modules/ - -# Visual Studio 6 build log -*.plg - -# Visual Studio 6 workspace options file -*.opt - -# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) -*.vbw - -# Visual Studio LightSwitch build output -**/*.HTMLClient/GeneratedArtifacts -**/*.DesktopClient/GeneratedArtifacts -**/*.DesktopClient/ModelManifest.xml -**/*.Server/GeneratedArtifacts -**/*.Server/ModelManifest.xml -_Pvt_Extensions - -# Paket dependency manager -.paket/paket.exe -paket-files/ - -# FAKE - F# Make -.fake/ - -# CodeRush personal settings -.cr/personal - -# Python Tools for Visual Studio (PTVS) -__pycache__/ -*.pyc - -# Cake - Uncomment if you are using it -# tools/** -# !tools/packages.config - -# Tabs Studio -*.tss - -# Telerik's JustMock configuration file -*.jmconfig - -# BizTalk build output -*.btp.cs -*.btm.cs -*.odx.cs -*.xsd.cs - -# OpenCover UI analysis results -OpenCover/ - -# Azure Stream Analytics local run output -ASALocalRun/ - -# MSBuild Binary and Structured Log -*.binlog - -# NVidia Nsight GPU debugger configuration file -*.nvuser - -# MFractors (Xamarin productivity tool) working folder -.mfractor/ - -# Local History for Visual Studio -.localhistory/ - -# BeatPulse healthcheck temp database -healthchecksdb - -# Backup folder for Package Reference Convert tool in Visual Studio 2017 -MigrationBackup/ - -# Ionide (cross platform F# VS Code tools) working folder -.ionide/ +deploy/bicep/param_local.json +deploy/bicep/param_local_redeploy.json diff --git a/README.md b/README.md index 364f052..40a7ed3 100644 --- a/README.md +++ b/README.md @@ -1,57 +1,145 @@ -# Project Name +--- +page_type: sample +languages: +- bicep +- csharp +- azurecli +products: +- azure-event-hubs +- azure-monitor +--- -(short, 1-3 sentenced, description of the project) +# eventhub-custom-metrics-emitter -## Features +This sample project show how to implement sending a custom metric events to Azure Event Hub. Specifically, we will calculate the unprocessed events (The Lag) of specific consumer group in event hub. More about custom metrics in Azure Monitor check [this article](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview) -This project framework provides the following features: -* Feature 1 -* Feature 2 -* ... +## Table of Contents +- [Custom-Metrics-Emitter](#custom-metrics-emitter) + - [Table of Contents](#table-of-contents) + - [Overview](#overview) + - [High-Level Solution Concept](#high-level-solution-concept) + - [Custom Metric](#custom-metric) + - [Step by step deployment](#step-by-step-deployment) + - [Pre-requisite](#pre-requisite) + - [Deploying the solution](#deploying-the-solution) + - [Configuring Bicep to deploy the solution](#configuring-bicep-to-deploy-the-solution) + - [Test Locally](#test-locally) -## Getting Started +## Overview -### Prerequisites +When using Azure Event Hub, it is important to monitor the lag of the consumer group. This is the number of events that have been sent to the event hub but have not yet been processed by the consumer group. This is important to monitor as it can indicate if the consuming applications are able to address the load of the event hub. If the lag is increasing, it may indicate that the consuming applications are not able to keep up with the load of the event hub. -(ideally very short, if any) +> This metric is not available in event hub standard metrics for now. -- OS -- Library version -- ... -### Installation +## High-Level Solution Concept -(ideally very short) +![image](design/design.png) -- npm install [package name] -- mvn install -- ... +The solution will create the required resources and deploy the solution on Azure Container App. The solution will run as a Azure Container App and will send a custom metric to Azure Monitor. The custom metric will include the following dimensions: -### Quickstart -(Add steps to get up and running quickly) +- Eventhub name (the topic) -1. git clone [repository clone url] -2. cd [repository name] -3. ... +- Consumer group name +- Partition ID -## Demo +The resources that will be created are: -A demo app is included to show how to use the project. +- Azure Contianer App Environment - this is the environment which will host the solution -To run the demo, follow these steps: +- Azure Container App - this is the container which will run the solution -(Add steps to start up the demo) +- Managed Identity - this is the identity which will be used by the container app to authenticate to Azure Event Hub and Azure Storage -1. -2. -3. +- Role assignments - these are the role assignments which will be assigned to the managed identity to allow it to read the checkpoints from Azure Storage and send the custom metric to Azure Monitor -## Resources +### Custom Metric -(Any additional resources or related projects) +Example of json schema which send a custom metric can be found [here](test/custom1.json) +As the schema also include the partition number as one of the dimensions - we can have a view of unprocessed events per partition: +![image](design/view.png) -- Link to supporting information -- Link to similar sample -- ... + + +## Step by step deployment + +This solution is aimed for customers/users who already have an Azure Event Hub and want to monitor the lag of a specific consumer group. Please follow the pre-requisites below before deploying the solution. The users running the solution will need to have the Contributor role on the resource group where the solution is deployed. (at least) + +The image for this solution was build using GitHub Action. The code is [here](.github/workflows/ghcr_build_push.yml), it leverages the ghcr as the container registry. The image is available ```docker pull ghcr.io/yodobrin/custom-metrics-emitter:latest``` + +### Pre-requisite +1. Deploy the following Azure services: + 1. Azure Event Hub + 2. Azure Storage + 3. Azure Application Insights (optional) +2. Producer and Consumer sample application for Azure Event hub [code example](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send?tabs=passwordless%2Croles-azure-portal) + +### Deploying the solution + +1. Clone this repository to a local directory + +2. Navigate to the directory where the repository was cloned, and to deploy/bicep folder + +3. Modify the params.json file to include the required parameters for the deployment (see below for more details) + +4. Run the following command to deploy the solution: + +```azcli +az deployment group create --resource-group --template-file main.bicep --parameters @param.json +``` + +5. Once completed successfully, the solution will be deployed and running. + + +#### Configuring Bicep to deploy the solution + +The solution can be deployed using Bicep, the following files are included: + +- `main.bicep` - main bicep file which deploy the solution + +- `parameters.json` - parameters file which include the required parameters for the deployment + +- `aca.bicep` - bicep file which deploy the Azure Container App + +- `roles.bicep` - bicep file which deploy the required roles for the solution + +The following parameters should be set, these are subset of the environment variables which need to be set when running the docker image. The other items are either derived or created during the deployment process. + +- `EventHubNamespace` - This is the namespace of your eventhub + +- `EventHubName` - This is the name of your eventhub you wish to monitor within the namespace + +- `CheckpointAccountName` - This is the name of the storage account where the checkpoints are stored + +- `CheckpointContainerName` - This is the name of the container within the storage account where the checkpoints are stored + +- `CustomMetricInterval` - an optional value, this is the interval in milliseconds between each metric being sent to Azure Monitor. + +- `managedIdentityName` - As part of the deployment, a user assigned managed identity will be created. This is the name of that identity. It will be used as the identity for the container app. This identity will be granted the required roles for the solution to work. These are the roles: + + - `Monitoring Metrics Publisher` role for Azure Event Hub + + - `Azure Event Hubs Data Owner` role for Azure Event Hub + + - `Storage Blob Data Reader` for Azure Storage + +## Redploying to other image tag + +When you need to update to latest or specific version of the image, you can use the following approach: + +1. Modify the `param_redeploy.json` file to include the required parameters for the deployment (Use the existing values from your resource group) + +2. Run the following command to redeploy the solution: + +```azcli +az deployment group create --resource-group --template-file redeploy.bicep --parameters @param_redeploy.json +``` + +## Test Locally +In order to run and test locally the solution, build a docker image (using this [Dockerfile](custom-metrics-emitter/Dockerfile)) and execute the following run command - fill the missing values: + +`docker run -d -e EventHubNamespace="" -e Region="" -e SubscriptionId="" -e ResourceGroup="" -e TenantId="" -e EventHubName="" -e ConsumerGroup="optional" -e CheckpointAccountName="" -e CheckpointContainerName="" -e CustomMetricInterval="" -e ManagedIdentityClientId="optional" -e APPLICATIONINSIGHTS_CONNECTION_STRING="optional" -e AZURE_TENANT_ID="optional" -e AZURE_CLIENT_ID="optional" -e AZURE_CLIENT_SECRET="optional" ` + +remark: ConsumerGroup environment variable - if `empty` then - geting all consumer group of given EventHub, else: can add specific consumer group names seprated by `;` \ No newline at end of file diff --git a/custom-metrics-emitter.sln b/custom-metrics-emitter.sln new file mode 100644 index 0000000..8cd96ca --- /dev/null +++ b/custom-metrics-emitter.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 25.0.1704.2 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "custom-metrics-emitter", "src\custom-metrics-emitter.csproj", "{104CA399-D6F1-4C36-81E7-2ED8015BE485}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {104CA399-D6F1-4C36-81E7-2ED8015BE485}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {104CA399-D6F1-4C36-81E7-2ED8015BE485}.Debug|Any CPU.Build.0 = Debug|Any CPU + {104CA399-D6F1-4C36-81E7-2ED8015BE485}.Release|Any CPU.ActiveCfg = Release|Any CPU + {104CA399-D6F1-4C36-81E7-2ED8015BE485}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {DF11A73F-ABF2-472E-AA45-604EE0B9C28C} + EndGlobalSection +EndGlobal diff --git a/deploy/bicep/aca.bicep b/deploy/bicep/aca.bicep new file mode 100644 index 0000000..6e6e40b --- /dev/null +++ b/deploy/bicep/aca.bicep @@ -0,0 +1,168 @@ + +@description('default to resource group location.') +param location string = resourceGroup().location + + + +@description('Name of the Container App Environment') +param AcaEnvName string + + + +resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2021-12-01-preview' = { + name: 'emitter-log-analytics' + location: location + properties: { + sku: { + name: 'PerGB2018' + } + } +} + + +resource containerAppEnvironment 'Microsoft.App/managedEnvironments@2022-06-01-preview' = { + name: AcaEnvName + location: location + properties: { + appLogsConfiguration: { + destination: 'log-analytics' + logAnalyticsConfiguration: { + customerId: logAnalytics.properties.customerId + sharedKey: logAnalytics.listKeys().primarySharedKey + } + } + } +} + + + +// in case using an existing log analytics workspace - this is the code to use +// resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2021-12-01-preview' existing = { +// name: 'emitter-log-analytics' +// scope: resourceGroup() +// } + +// and this is the code to use for the existing container app environment + +// resource containerAppEnvironment 'Microsoft.App/managedEnvironments@2022-06-01-preview' existing = { +// name: name +// } + + + +@description('Name of the Emitter Container App') +param EmitterImage string +@description('Name of the Emitter Registry') +param registryLoginServer string + + + +@description('Managed Identity Client Id - created in main.bicep') +param ManagedIdentityClientId string + +@description('Managed Identity Client Id - created in main.bicep') +param ManagedIdentityId string + + +@description('Event Hub Namespace - provided in the param.json file') +param EventHubNamespace string + +@description('Event Hub - provided in the param.json file') +param EventHubName string + +// consider to also pass in param file (or we should take all consumer groups) +param ConsumerGroup string = '$Default' + + +@description('Storage Account Name - provided in the param.json file') +param CheckpointAccountName string + +@description('Storage Container Name - provided in the param.json file') +param CheckpointContainerName string + +@description('Custom Metric Interval - provided in the param.json file') +param CustomMetricInterval string + + + +resource ContainerApp 'Microsoft.App/containerApps@2022-06-01-preview' = { + name: 'eh-lag-emitter' + location: location + identity: { + type: 'UserAssigned' + userAssignedIdentities: { + '${ManagedIdentityId}':{} + } + } + properties: { + managedEnvironmentId: containerAppEnvironment.id + + configuration: { + + } + template: { + containers: [ + { + name: 'emitter' + image: '${registryLoginServer}/${EmitterImage}' + resources: { + cpu: json('0.25') + memory: '0.5Gi' + } + env: [ + { + name: 'TenantId' + value: subscription().tenantId + } + { + name: 'SubscriptionId' + value: subscription().subscriptionId + } + { + name: 'ResourceGroup' + value: resourceGroup().name + } + { + name: 'Region' + value: location + } + { + name: 'EventHubNamespace' + value: EventHubNamespace + } + { + name: 'EventHubName' + value: EventHubName + } + { + name: 'ConsumerGroup' + value: ConsumerGroup + } + { + name: 'CheckpointAccountName' + value: CheckpointAccountName + } + { + name: 'CheckpointContainerName' + value: CheckpointContainerName + } + { + name: 'CustomMetricInterval' + value: CustomMetricInterval + } + { + name: 'ManagedIdentityClientId' + value: ManagedIdentityClientId + } + + ] + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } + } +} + diff --git a/deploy/bicep/main.bicep b/deploy/bicep/main.bicep new file mode 100644 index 0000000..af39818 --- /dev/null +++ b/deploy/bicep/main.bicep @@ -0,0 +1,69 @@ + +@description('The location of the resource group and the location in which all resurces would be created') +param location string = resourceGroup().location + +@description('Storage Account Name - provided in the param.json file') +param CheckpointAccountName string + +@description('Storage Container Name - provided in the param.json file') +param CheckpointContainerName string + +@description('Custom Metric Interval - provided in the param.json file') +param CustomMetricInterval string + +@description('Event Hub Namespace - provided in the param.json file') +param EventHubNamespace string + +@description('Event Hub - provided in the param.json file') +param EventHubName string + +@description('managed identity name from param.json file') +param managedIdentityName string + +@description('container app environment name from param.json file') +param AcaEnvName string + +// create a managed identity +resource mngIdentity 'Microsoft.ManagedIdentity/userAssignedIdentities@2022-01-31-preview' = { + name: managedIdentityName + location: location +} + +// when using an existing managed identity, using the 'existing' keyword will not create a new one rather it will use the existing one +// resource mngIdentity 'Microsoft.ManagedIdentity/userAssignedIdentities@2022-01-31-preview' existing = { +// name: managedIdentityName +// scope: resourceGroup() +// } + +// assign role to the managed identity using a module +module roles 'roles.bicep' = { + name: 'roles' + params: { + ManagedIdentityID: mngIdentity.properties.principalId + EventHubNamespace: EventHubNamespace + CheckpointAccountName: CheckpointAccountName + } +} + + +// create aca resource + +module ACA 'aca.bicep' = { + name: 'aca-emitter' + params: { + location: location + ManagedIdentityId: mngIdentity.id + ManagedIdentityClientId: mngIdentity.properties.clientId + EventHubNamespace: EventHubNamespace + CheckpointAccountName: CheckpointAccountName + AcaEnvName: AcaEnvName + EventHubName: EventHubName + CheckpointContainerName: CheckpointContainerName + CustomMetricInterval: CustomMetricInterval + EmitterImage: 'yodobrin/custom-metrics-emitter:0.3' + registryLoginServer: 'ghcr.io' + } +} + + + diff --git a/deploy/bicep/param.json b/deploy/bicep/param.json new file mode 100644 index 0000000..21afda6 --- /dev/null +++ b/deploy/bicep/param.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "EventHubNamespace": { + "value": "" + }, + "EventHubName":{ + "value": "" + }, + "CheckpointAccountName":{ + "value": "" + }, + "CheckpointContainerName":{ + "value": "" + }, + "CustomMetricInterval":{ + "value": "10000" + }, + "managedIdentityName":{ + "value": "" + }, + "AcaEnvName":{ + "value": "" + } + } + } \ No newline at end of file diff --git a/deploy/bicep/param_redeploy.json b/deploy/bicep/param_redeploy.json new file mode 100644 index 0000000..b3fdf2f --- /dev/null +++ b/deploy/bicep/param_redeploy.json @@ -0,0 +1,33 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "EventHubNamespace": { + "value": "" + }, + "EventHubName":{ + "value": "" + }, + "CheckpointAccountName":{ + "value": "" + }, + "CheckpointContainerName":{ + "value": "" + }, + "CustomMetricInterval":{ + "value": "10000" + }, + "managedIdentityName":{ + "value": "" + }, + "AcaEnvName":{ + "value": "< your existing ACA env name>" + }, + "EmitterImage":{ + "value": "yodobrin/custom-metrics-emitter:0.4" + }, + "ContainerAppName":{ + "value": "" + } + } + } \ No newline at end of file diff --git a/deploy/bicep/redeploy.bicep b/deploy/bicep/redeploy.bicep new file mode 100644 index 0000000..fa93a16 --- /dev/null +++ b/deploy/bicep/redeploy.bicep @@ -0,0 +1,152 @@ + + +// @description('managed identity name from param.json file') +// param managedIdentityName string + +@description('container app environment name from param.json file') +param AcaEnvName string + +@description('container app name from param.json file') +param ContainerAppName string + + +@description('Name of the Emitter Container App') +param EmitterImage string + +@description('Name of the Emitter Registry') +param registryLoginServer string = 'ghcr.io' + +@description('managed identity name from param.json file') +param managedIdentityName string + +// @description('Managed Identity Client Id - created in main.bicep') +// param ManagedIdentityClientId string + + + + +param location string = resourceGroup().location + +@description('Event Hub Namespace - provided in the param.json file') +param EventHubNamespace string + +@description('Event Hub - provided in the param.json file') +param EventHubName string + +// consider to also pass in param file (or we should take all consumer groups) +param ConsumerGroup string = '$Default' + + +@description('Storage Account Name - provided in the param.json file') +param CheckpointAccountName string + +@description('Storage Container Name - provided in the param.json file') +param CheckpointContainerName string + +@description('Custom Metric Interval - provided in the param.json file') +param CustomMetricInterval string + + + + +resource mngIdentity 'Microsoft.ManagedIdentity/userAssignedIdentities@2022-01-31-preview' existing = { + name: managedIdentityName +} + +// @description('Managed Identity Client Id - created in main.bicep') +// param ManagedIdentityId string = mngIdentity.id + +// // in case using an existing log analytics workspace - this is the code to use +// resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2021-12-01-preview' existing = { +// name: 'emitter-log-analytics' +// scope: resourceGroup() +// } + +// and this is the code to use for the existing container app environment + +resource containerAppEnvironment 'Microsoft.App/managedEnvironments@2022-06-01-preview' existing = { + name: AcaEnvName +} + + +resource ContainerApp 'Microsoft.App/containerApps@2022-06-01-preview' = { + name: ContainerAppName + location: location + identity: { + type: 'UserAssigned' + userAssignedIdentities: { + '${ mngIdentity.id}':{} + } + } + properties: { + managedEnvironmentId: containerAppEnvironment.id + + configuration: { + + } + template: { + containers: [ + { + name: 'emitter' + image: '${registryLoginServer}/${EmitterImage}' + resources: { + cpu: json('0.25') + memory: '0.5Gi' + } + env: [ + { + name: 'TenantId' + value: subscription().tenantId + } + { + name: 'SubscriptionId' + value: subscription().subscriptionId + } + { + name: 'ResourceGroup' + value: resourceGroup().name + } + { + name: 'Region' + value: location + } + { + name: 'EventHubNamespace' + value: EventHubNamespace + } + { + name: 'EventHubName' + value: EventHubName + } + { + name: 'ConsumerGroup' + value: ConsumerGroup + } + { + name: 'CheckpointAccountName' + value: CheckpointAccountName + } + { + name: 'CheckpointContainerName' + value: CheckpointContainerName + } + { + name: 'CustomMetricInterval' + value: CustomMetricInterval + } + { + name: 'ManagedIdentityClientId' + value: mngIdentity.properties.clientId + } + + ] + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } +} + +} diff --git a/deploy/bicep/roles.bicep b/deploy/bicep/roles.bicep new file mode 100644 index 0000000..6ac1c8b --- /dev/null +++ b/deploy/bicep/roles.bicep @@ -0,0 +1,52 @@ + +param EventHubNamespace string +param CheckpointAccountName string +param ManagedIdentityID string + +var roleIdMapping = { + MonitoringMetricPublisherAssignment : '3913510d-42f4-4e42-8a64-420c390055eb' + EventHubDataOwnerAssignment : 'f526a384-b230-433a-b45c-95f59c4a2dec' + StorageBlobDataReaderAssignment : '2a2b9908-6ea1-4ae2-8e65-a410df84e7d1' +} + + +// pick the existing event hub and storage account + +resource eventHub 'Microsoft.EventHub/namespaces@2021-11-01' existing = { + name: EventHubNamespace +} + +resource checkpointStorage 'Microsoft.Storage/storageAccounts@2021-06-01' existing = { + name: CheckpointAccountName +} + + +resource EventHubDataOwnerAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid(roleIdMapping.EventHubDataOwnerAssignment,ManagedIdentityID,eventHub.id) + scope: eventHub + properties: { + roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions',roleIdMapping.EventHubDataOwnerAssignment) + principalId: ManagedIdentityID + principalType: 'ServicePrincipal' + } +} + +resource StorageBlobDataReaderAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid(roleIdMapping.StorageBlobDataReaderAssignment,ManagedIdentityID,checkpointStorage.id) + scope: checkpointStorage + properties: { + roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', roleIdMapping.StorageBlobDataReaderAssignment) + principalId: ManagedIdentityID + principalType: 'ServicePrincipal' + } +} + +resource MonitoringMetricPublisherAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid(roleIdMapping.MonitoringMetricPublisherAssignment,ManagedIdentityID,eventHub.id) + scope: eventHub + properties: { + roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', roleIdMapping.MonitoringMetricPublisherAssignment) + principalId: ManagedIdentityID + principalType: 'ServicePrincipal' + } +} diff --git a/design/design.png b/design/design.png new file mode 100644 index 0000000..356bf2e Binary files /dev/null and b/design/design.png differ diff --git a/design/view.png b/design/view.png new file mode 100644 index 0000000..3c79e64 Binary files /dev/null and b/design/view.png differ diff --git a/src/.dockerignore b/src/.dockerignore new file mode 100644 index 0000000..9415088 --- /dev/null +++ b/src/.dockerignore @@ -0,0 +1 @@ +appsettings.Development.json \ No newline at end of file diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..d4ad9f9 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1,343 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ +**/Properties/launchSettings.json + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.iobj +*.pch +*.pdb +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ +/src/docker-compose.override.yml + +# OS generated files # +###################### +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +appsettings.Development.json diff --git a/src/Dockerfile b/src/Dockerfile new file mode 100644 index 0000000..df80a34 --- /dev/null +++ b/src/Dockerfile @@ -0,0 +1,18 @@ +FROM mcr.microsoft.com/dotnet/runtime:6.0 AS base +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build +WORKDIR /src +COPY ["custom-metrics-emitter.csproj", "."] +RUN dotnet restore "custom-metrics-emitter.csproj" +COPY . . +WORKDIR "/src" +RUN dotnet build "custom-metrics-emitter.csproj" -c Release -o /app/build + +FROM build AS publish +RUN dotnet publish "custom-metrics-emitter.csproj" -c Release -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "custom-metrics-emitter.dll"] diff --git a/src/Program.cs b/src/Program.cs new file mode 100644 index 0000000..5a89702 --- /dev/null +++ b/src/Program.cs @@ -0,0 +1,22 @@ +using custom_metrics_emitter; + +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureServices((hostContext, services) => + { + IConfiguration configuration = hostContext.Configuration; + + if (!string.IsNullOrEmpty(configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"])) + { + services.AddApplicationInsightsTelemetryWorkerService(); + } + services.AddHostedService(); + services.AddLogging(opt => + { + opt.AddSimpleConsole(config => config.TimestampFormat = "[HH:mm:ss]"); + }); + + }) + .Build(); + +await host.RunAsync(); + diff --git a/src/Worker.cs b/src/Worker.cs new file mode 100644 index 0000000..fc183ab --- /dev/null +++ b/src/Worker.cs @@ -0,0 +1,90 @@ +namespace custom_metrics_emitter; + +using Azure.Identity; + +public class Worker : BackgroundService +{ + private readonly ILogger _logger = default!; + private readonly EmitterConfig _cfg = default!; + private readonly EventHubEmitter _ehEmitter = default!; + + public Worker(ILogger logger, IConfiguration configuration) + { + try + { + _logger = logger; + + _cfg = new( + EventHubNamespace: configuration.Require("EventHubNamespace"), + EventHubName: configuration.Require("EventHubName"), + ConsumerGroup: configuration.Optional("ConsumerGroup"), + CheckpointAccountName: configuration.Require("CheckpointAccountName"), + CheckpointContainerName: configuration.Require("CheckpointContainerName"), + Region: configuration.Require("Region"), + TenantId: configuration.Require("TenantId"), + SubscriptionId: configuration.Require("SubscriptionId"), + ResourceGroup: configuration.Require("ResourceGroup"), + ManagedIdentityClientId: configuration.Optional("ManagedIdentityClientId"), + CustomMetricInterval: configuration.GetIntOrDefault("CustomMetricInterval", defaulT: 10_000)); + + var defaultCredential = string.IsNullOrEmpty(_cfg.ManagedIdentityClientId) + ? new DefaultAzureCredential() + : new DefaultAzureCredential(options: new() { ManagedIdentityClientId = _cfg.ManagedIdentityClientId }); + + _ehEmitter = new(_logger, _cfg, defaultCredential); + } + catch(Exception ex) + { + logger.LogError("{error}", ex.ToString()); + } + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + _logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow); + var res = await _ehEmitter.ReadFromBlobStorageAndPublishToAzureMonitorAsync(cancellationToken); + + if (res.IsSuccessStatusCode) + { + _logger.LogInformation("Send Custom Metric end with status: {status}", res.StatusCode); + } + else + { + _logger.LogError("Error sending custom event with status: {status}", res.StatusCode); + } + + await Task.Delay(_cfg.CustomMetricInterval, cancellationToken); + } + } + catch (Exception ex) + { + _logger.LogError("{error}", ex.ToString()); + } + } +} + +/// +/// A helper class to make configuration parsing more fluent. +/// +internal static class IConfigurationExtensions +{ + internal static int GetIntOrDefault(this IConfiguration cfg, string name, int defaulT) => + !string.IsNullOrEmpty(cfg.GetValue(name)) && int.TryParse(cfg.GetValue(name), out int value) ? value : defaulT; + + internal static string Optional(this IConfiguration cfg, string name) => + cfg.GetValue(name) ?? string.Empty; + + internal static string Require(this IConfiguration cfg, string name) + { + var val = cfg.Optional(name); + if (string.IsNullOrEmpty(val)) + { + throw new ArgumentException($"Configuration error, missing key {name}", nameof(cfg)); + } + return val; + } +} \ No newline at end of file diff --git a/src/appsettings.json b/src/appsettings.json new file mode 100644 index 0000000..3b96c53 --- /dev/null +++ b/src/appsettings.json @@ -0,0 +1,12 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Trace" + }, + "ApplicationInsights": { + "LogLevel": { + "Default": "Trace" + } + } + } +} \ No newline at end of file diff --git a/src/custom-metrics-emitter.csproj b/src/custom-metrics-emitter.csproj new file mode 100644 index 0000000..882f791 --- /dev/null +++ b/src/custom-metrics-emitter.csproj @@ -0,0 +1,26 @@ + + + net6.0 + enable + enable + dotnet-custom_metrics_emitter-3dc3a13e-0171-4a69-85b3-c2b100c2669a + custom_metrics_emitter + ../docker-compose.dcproj + + + + + + + + + + + + + + + + + + diff --git a/src/emitters/EmitterConfig.cs b/src/emitters/EmitterConfig.cs new file mode 100644 index 0000000..8af0b58 --- /dev/null +++ b/src/emitters/EmitterConfig.cs @@ -0,0 +1,14 @@ +namespace custom_metrics_emitter; + +public record EmitterConfig( + string Region, + string SubscriptionId, + string ResourceGroup, + string TenantId, + string EventHubNamespace, + string EventHubName, + string ConsumerGroup, + string CheckpointAccountName, + string CheckpointContainerName, + int CustomMetricInterval, + string ManagedIdentityClientId); \ No newline at end of file diff --git a/src/emitters/EmitterHelper.cs b/src/emitters/EmitterHelper.cs new file mode 100644 index 0000000..35fcf50 --- /dev/null +++ b/src/emitters/EmitterHelper.cs @@ -0,0 +1,185 @@ +namespace custom_metrics_emitter.emitters; + +using Azure.Core; +using Azure.Identity; +using Microsoft.Azure.Amqp.Framing; +using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data.SqlTypes; +using System.Net; +using System.Net.Http.Headers; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Xml.Linq; + +public record struct AccessTokenAndExpiration(bool isExpired, string token); + +public class EmitterHelper +{ + private static readonly HttpClient _httpClient = new(); + private readonly ILogger _logger; + private readonly TokenStore _TokenStore; + + public EmitterHelper(ILogger logger, DefaultAzureCredential defaultAzureCredential) + { + _logger = logger; + _TokenStore = new TokenStore( + defaultAzureCredential); + } + + public async Task SendCustomMetric( + string? region, string? resourceId, EmitterSchema metricToSend, + CancellationToken cancellationToken = default) + { + if ((region != null) && (resourceId != null)) + { + var record = await _TokenStore.RefreshAzureMonitorCredentialOnDemandAsync(cancellationToken); + _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", record.token); + _httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + + string uri = $"https://{region}.monitoring.azure.com{resourceId}/metrics"; + string jsonString = JsonSerializer.Serialize(metricToSend, _jsonOptions); + + StringContent content = new( + content: jsonString, + encoding: Encoding.UTF8, + mediaType: "application/json"); + + _logger.LogInformation("SendCustomMetric:{uri} with payload:{payload}", uri, jsonString); + + return await _httpClient.PostAsync(uri, content, cancellationToken); + } + + return new HttpResponseMessage(HttpStatusCode.LengthRequired); + } + + public string[] GetAllConsumerGroup(string eventhubNamespace, string eventhub) + { + var ehRecord = _TokenStore.RefreshAzureEventHubCredentialOnDemand(); + _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ehRecord.token); + + string uri = $"https://{eventhubNamespace}.servicebus.windows.net/{eventhub}/consumergroups?timeout=60&api-version=2014-01"; + + _logger.LogInformation("GetAllConsumerGroup:{uri}", uri); + var response = _httpClient.GetAsync(uri).Result.Content.ReadAsStringAsync().Result; + var doc = XDocument.Parse(response); + var entries = from item in doc.Root!. + Descendants(). + Where(i => i.Name.LocalName == "entry"). + Descendants(). + Where(j => j.Name.LocalName == "title") + select item.Value; + + return entries.ToArray(); + } + + public ValueTask RefreshAzureEventHubCredentialOnDemandAsync(CancellationToken cancellationToken = default) + { + return _TokenStore.RefreshAzureEventHubCredentialOnDemandAsync(cancellationToken); + } + + public ValueTask RefreshCredentialOnDemandAsync(string audience, + CancellationToken cancellationToken = default) + { + return _TokenStore.RefreshCredentialOnDemand(audience, cancellationToken); + } + + private static JsonSerializerOptions _jsonOptions = CreateJsonOptions(); + + internal static JsonSerializerOptions CreateJsonOptions() + { + JsonSerializerOptions options = new() { WriteIndented = false }; + options.Converters.Add(new SortableDateTimeConverter()); + return options; + } + + private class SortableDateTimeConverter : JsonConverter + { + private const string format = "s"; //SortableDateTimePattern yyyy'-'MM'-'dd'T'HH':'mm':'ss + + public override void Write(Utf8JsonWriter writer, DateTime date, JsonSerializerOptions options) + { + writer.WriteStringValue(date.ToUniversalTime().ToString(format)); + } + public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + return DateTime.ParseExact(reader.GetString()!, format, provider: null); + } + } + + private class TokenStore + { + private static readonly string MONITOR_SCOPE = "https://monitor.azure.com/.default"; + private static readonly string EVENTHUBS_SCOPE = "https://eventhubs.azure.net/.default"; + + private readonly DefaultAzureCredential _defaultAzureCredential; + private ConcurrentDictionary _scopeAndTokens = new(); + + + public TokenStore(DefaultAzureCredential defaultAzureCredential) + { + (_defaultAzureCredential) = (defaultAzureCredential); + RefreshAzureMonitorCredentialOnDemand(); + RefreshAzureEventHubCredentialOnDemand(); + } + + public ValueTask RefreshAzureMonitorCredentialOnDemandAsync(CancellationToken cancellationToken = default) + { + return RefreshCredentialOnDemand(MONITOR_SCOPE, cancellationToken); + } + + public ValueTask RefreshAzureEventHubCredentialOnDemandAsync(CancellationToken cancellationToken = default) + { + return RefreshCredentialOnDemand(EVENTHUBS_SCOPE, cancellationToken); + } + + public AccessTokenAndExpiration RefreshAzureMonitorCredentialOnDemand(CancellationToken cancellationToken = default) + { + return RefreshCredentialOnDemand(MONITOR_SCOPE, cancellationToken).Result; + } + + public AccessTokenAndExpiration RefreshAzureEventHubCredentialOnDemand(CancellationToken cancellationToken = default) + { + return RefreshCredentialOnDemand(EVENTHUBS_SCOPE, cancellationToken).Result; + } + + public async ValueTask RefreshCredentialOnDemand(string scope, CancellationToken cancellationToken = default) + { + bool needsNewToken(TimeSpan safetyInterval) + { + AccessToken? token; + if (_scopeAndTokens.TryGetValue(scope, out token)) + { + if (!token.HasValue) return true; + var timeUntilExpiry = token!.Value.ExpiresOn.Subtract(DateTimeOffset.UtcNow); + return timeUntilExpiry < safetyInterval; + } + return true; + } + + var isExpired = needsNewToken(safetyInterval: TimeSpan.FromMinutes(5.0)); + + if (isExpired) + { + var newToken = await _defaultAzureCredential.GetTokenAsync( + requestContext: new TokenRequestContext(new[] { scope }), + cancellationToken: cancellationToken); + + AccessToken? token; + if (_scopeAndTokens.TryGetValue(scope, out token) == false) + { + _scopeAndTokens.TryAdd(scope, newToken); + } + else + { + _scopeAndTokens[scope] = newToken; + } + } + + return new AccessTokenAndExpiration(isExpired, _scopeAndTokens[scope]!.Value.Token); + } + } +} diff --git a/src/emitters/EmitterSchema.cs b/src/emitters/EmitterSchema.cs new file mode 100644 index 0000000..88fbfac --- /dev/null +++ b/src/emitters/EmitterSchema.cs @@ -0,0 +1,29 @@ +namespace custom_metrics_emitter.emitters; + +using System.Text.Json.Serialization; + +// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api + +#pragma warning disable IDE1006 // Naming Styles + +public record EmitterSchema( + DateTime time, + CustomMetricData? data); + +public record CustomMetricData( + CustomMetricBaseData? baseData); + +public record CustomMetricBaseData( + string? metric, + string? Namespace, + IEnumerable? dimNames, + IEnumerable? series); + +public record CustomMetricBaseDataSeriesItem( + IEnumerable? dimValues, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? min, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? max, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? sum, + [property: JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] long? count); + +#pragma warning restore IDE1006 // Naming Styles diff --git a/src/emitters/EventHubEmitter.cs b/src/emitters/EventHubEmitter.cs new file mode 100644 index 0000000..b35d898 --- /dev/null +++ b/src/emitters/EventHubEmitter.cs @@ -0,0 +1,194 @@ +namespace custom_metrics_emitter; + +using Azure.Core; +using Azure.Identity; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using custom_metrics_emitter.emitters; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; +using System.Collections.Concurrent; + +internal record LagInformation(string ConsumerName, string PartitionId, long Lag); + +public class EventHubEmitter +{ + private const string LAG_METRIC_NAME = "Lag"; + private const string EVENT_HUB_CUSTOM_METRIC_NAMESPACE = "Event Hub custom metrics"; + + // Implementation details from the EventHub .NET SDK + private const string SEQUENCE_NUMBER = "sequenceNumber"; + private const string OFFSET_KEY = "offset"; + private readonly string _prefix; + private string CheckpointBlobName(string consumerGroup, string partitionId) => $"{_prefix}/{consumerGroup.ToLowerInvariant()}/checkpoint/{partitionId}"; + + private const string SERVICE_BUS_HOST_SUFFIX = ".servicebus.windows.net"; + private const string STORAGE_HOST_SUFFIX = ".blob.core.windows.net"; + + private readonly ILogger _logger; + private readonly EmitterConfig _cfg; + private readonly string _eventhubresourceId; + + private readonly EmitterHelper _emitter; + private readonly BlobContainerClient _checkpointContainerClient = default!; + private readonly Dictionary _eventhubConsumerClientsInfo = new(); + private readonly string[] _consumerGroups = default!; + + public EventHubEmitter(ILogger logger, EmitterConfig config, DefaultAzureCredential defaultCredential) + { + + (_logger, _cfg) = (logger, config); + + _emitter = new EmitterHelper(_logger, defaultCredential); + + if (string.IsNullOrEmpty(config.ConsumerGroup)) + { + _consumerGroups = _emitter.GetAllConsumerGroup(_cfg.EventHubNamespace, _cfg.EventHubName); + } + else + { + _consumerGroups = config.ConsumerGroup.Split(';'); + } + + _eventhubresourceId = $"/subscriptions/{_cfg.SubscriptionId}/resourceGroups/{_cfg.ResourceGroup}/providers/Microsoft.EventHub/namespaces/{_cfg.EventHubNamespace}"; + _prefix = $"{_cfg.EventHubNamespace.ToLowerInvariant()}{SERVICE_BUS_HOST_SUFFIX}/{_cfg.EventHubName.ToLowerInvariant()}"; + + + _checkpointContainerClient = new BlobContainerClient( + blobContainerUri: new($"https://{_cfg.CheckpointAccountName}{STORAGE_HOST_SUFFIX}/{_cfg.CheckpointContainerName}"), + credential: defaultCredential); + + //init eventhubConsumerClients per consumer group + foreach (string cGroup in _consumerGroups) + { + var client = new EventHubConsumerClient( + consumerGroup: cGroup, + fullyQualifiedNamespace: $"{_cfg.EventHubNamespace.ToLowerInvariant()}{SERVICE_BUS_HOST_SUFFIX}", + eventHubName: _cfg.EventHubName, + credential: defaultCredential); + + var partitions = client.GetPartitionIdsAsync().Result; + + _eventhubConsumerClientsInfo.TryAdd(cGroup, + new(consumerClient: client, partitionIds: partitions)); + } + } + + public async Task ReadFromBlobStorageAndPublishToAzureMonitorAsync(CancellationToken cancellationToken = default) + { + var totalLag = await GetLagAsync(cancellationToken); + + var emitterdata = new EmitterSchema( + time: DateTime.UtcNow, + data: new CustomMetricData( + baseData: new CustomMetricBaseData( + metric: LAG_METRIC_NAME, + Namespace: EVENT_HUB_CUSTOM_METRIC_NAMESPACE, + dimNames: new[] { "EventHubName", "ConsumerGroup", "PartitionId" }, + series: totalLag.Select((lagInfo, idx) => + new CustomMetricBaseDataSeriesItem( + dimValues: new[] { _cfg.EventHubName, lagInfo.ConsumerName, lagInfo.PartitionId }, + min: null, max: null, + count: idx + 1, + sum: lagInfo.Lag))))); + + return await _emitter.SendCustomMetric( + region: _cfg.Region, + resourceId: _eventhubresourceId, + metricToSend: emitterdata, + cancellationToken: cancellationToken); + } + + private async Task> GetLagAsync(CancellationToken cancellationToken = default) + { + // Query all partitions in parallel + var tasks = from consumer in _consumerGroups + from id in _eventhubConsumerClientsInfo[consumer]._partitionIds + select new { consumerGroup = consumer, partitionId = id, Task = LagInPartition(consumer, id, cancellationToken) }; + + await Task.WhenAll(tasks.Select(s => s.Task)); + + return tasks + .Select(x => new LagInformation(x.consumerGroup, x.partitionId, x.Task.Result)) + .OrderBy(x => x.PartitionId); + } + + private async Task LagInPartition(string consumerGroup, + string partitionId, CancellationToken cancellationToken = default) + { + long retVal = 0; + try + { + var partitionInfo = await _eventhubConsumerClientsInfo[consumerGroup]._consumerClient.GetPartitionPropertiesAsync( + partitionId, + cancellationToken); + // if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty + if ((partitionInfo != null) && (partitionInfo.LastEnqueuedOffset == -1)) + { + _logger.LogInformation("LagInPartition Empty partition"); + } + else + { + string checkpointName = CheckpointBlobName(consumerGroup, partitionId); + _logger.LogInformation("LagInPartition Checkpoint GetProperties: {name}", checkpointName); + + BlobProperties properties = await _checkpointContainerClient + .GetBlobClient(checkpointName) + .GetPropertiesAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + + string strSeqNum, strOffset; + if (properties.Metadata.TryGetValue(SEQUENCE_NUMBER, out strSeqNum!) && + properties.Metadata.TryGetValue(OFFSET_KEY, out strOffset!)) + { + if (long.TryParse(strSeqNum, out long seqNum)) + { + _logger.LogInformation("LagInPartition Start: {checkpoint name} seq={seqNum} offset={offset}", checkpointName, seqNum, strOffset); + + // If checkpoint.Offset is empty that means no messages has been processed from an event hub partition + // And since partitionInfo.LastSequenceNumber = 0 for the very first message hence + // total unprocessed message will be partitionInfo.LastSequenceNumber + 1 + if (string.IsNullOrEmpty(strOffset) == true) + { + retVal = partitionInfo!.LastEnqueuedSequenceNumber + 1; + } + else + { + if (partitionInfo!.LastEnqueuedSequenceNumber >= seqNum) + { + retVal = partitionInfo.LastEnqueuedSequenceNumber - seqNum; + } + else + { + // Partition is a circular buffer, so it is possible that + // partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber + retVal = (long.MaxValue - partitionInfo.LastEnqueuedSequenceNumber) + seqNum; + + if (retVal < 0) + retVal = 0; + } + } + _logger.LogInformation("LagInPartition End: {checkpoint name} seq={seqNum} offset={offset} lag={lag}", checkpointName, seqNum, strOffset, retVal); + } + } + } + } + catch (Exception ex) + { + _logger.LogError("LagInPartition Error: {error}", ex.ToString()); + } + return retVal; + } + + private class ConsumerClientInfo + { + public EventHubConsumerClient _consumerClient; + public string[] _partitionIds; + + public ConsumerClientInfo(EventHubConsumerClient consumerClient, string[] partitionIds) + { + _consumerClient = consumerClient; + _partitionIds = partitionIds; + } + } +} \ No newline at end of file diff --git a/test/custom1.json b/test/custom1.json new file mode 100644 index 0000000..113cb41 --- /dev/null +++ b/test/custom1.json @@ -0,0 +1,26 @@ +{ + "time": "2023-01-17T11:26:20", + "data": { + "baseData": { + "metric": "Lag", + "namespace": "Eventhub custom metrics", + "dimNames": [ + "EventHubName", + "ConsumerGroup", + "PartitionId" + ], + "series": [ + { + "dimValues": [ + "EventHub1", + "Consumer1", + "0" + ], + "sum": 20, + "count": 1 + } + ] + } + } +} +