Skip to content

Commit

Permalink
Add back V0 metrics (Azure#1650)
Browse files Browse the repository at this point in the history
  • Loading branch information
varunpuranik authored Aug 30, 2019
1 parent b872869 commit a26e4e8
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Gauge;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand Down Expand Up @@ -35,6 +37,7 @@ public ConnectionManager(
this.maxClients = Preconditions.CheckRange(maxClients, 1, nameof(maxClients));
this.credentialsCache = Preconditions.CheckNotNull(credentialsCache, nameof(credentialsCache));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
Util.Metrics.MetricsV0.RegisterGaugeCallback(() => MetricsV0.SetConnectedClientCountGauge(this));
}

public event EventHandler<IIdentity> CloudConnectionEstablished;
Expand Down Expand Up @@ -499,5 +502,21 @@ internal static void GetCloudConnection(IIdentity identity, Try<ICloudConnection
}
}
}

static class MetricsV0
{
static readonly GaugeOptions ConnectedClientGaugeOptions = new GaugeOptions
{
Name = "EdgeHubConnectedClientGauge",
MeasurementUnit = Unit.Events
};

public static void SetConnectedClientCountGauge(ConnectionManager connectionManager)
{
// Subtract EdgeHub from the list of connected clients
int connectedClients = connectionManager.GetConnectedClients().Count() - 1;
Util.Metrics.MetricsV0.SetGauge(ConnectedClientGaugeOptions, connectedClients);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Util;
Expand Down Expand Up @@ -223,15 +226,20 @@ async Task<ISinkResult<IRoutingMessage>> ProcessClientMessagesBatch(string id, L
.Select(r => this.cloudEndpoint.messageConverter.ToMessage(r))
.ToList();

if (messages.Count == 1)
using (MetricsV0.CloudLatency(id))
{
await cp.SendMessageAsync(messages[0]);
}
else
{
await cp.SendMessageBatchAsync(messages);
if (messages.Count == 1)
{
await cp.SendMessageAsync(messages[0]);
}
else
{
await cp.SendMessageBatchAsync(messages);
}
}

MetricsV0.MessageCount(id, messages.Count);

return new SinkResult<IRoutingMessage>(routingMessages);
}
catch (Exception ex)
Expand Down Expand Up @@ -365,5 +373,34 @@ internal static void InvalidMessage(string id, Exception ex)
Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message for client {id}."));
}
}

static class MetricsV0
{
static readonly CounterOptions EdgeHubToCloudMessageCountOptions = new CounterOptions
{
Name = "EdgeHubToCloudMessageSentCount",
MeasurementUnit = Unit.Events,
ResetOnReporting = true,
};

static readonly TimerOptions EdgeHubToCloudMessageLatencyOptions = new TimerOptions
{
Name = "EdgeHubToCloudMessageLatencyMs",
MeasurementUnit = Unit.None,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds
};

public static void MessageCount(string identity, int count)
=> Util.Metrics.MetricsV0.CountIncrement(GetTags(identity), EdgeHubToCloudMessageCountOptions, count);

public static IDisposable CloudLatency(string identity)
=> Util.Metrics.MetricsV0.Latency(GetTags(identity), EdgeHubToCloudMessageLatencyOptions);

static MetricTags GetTags(string id)
{
return new MetricTags("DeviceId", id);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
Expand Down Expand Up @@ -51,16 +54,21 @@ public Task ProcessDeviceMessage(IIdentity identity, IMessage message)
Preconditions.CheckNotNull(message, nameof(message));
Preconditions.CheckNotNull(identity, nameof(identity));
Events.MessageReceived(identity, message);
IRoutingMessage routingMessage = this.ProcessMessageInternal(message, true);
Metrics.AddMessageSize(routingMessage.Size(), identity.Id);
return this.router.RouteAsync(routingMessage);
MetricsV0.MessageCount(identity, 1);
using (MetricsV0.MessageLatency(identity))
{
IRoutingMessage routingMessage = this.ProcessMessageInternal(message, true);
Metrics.AddMessageSize(routingMessage.Size(), identity.Id);
return this.router.RouteAsync(routingMessage);
}
}

public Task ProcessDeviceMessageBatch(IIdentity identity, IEnumerable<IMessage> messages)
{
IList<IMessage> messagesList = messages as IList<IMessage>
?? Preconditions.CheckNotNull(messages, nameof(messages)).ToList();
Events.MessagesReceived(identity, messagesList);
MetricsV0.MessageCount(identity, messagesList.Count);

IEnumerable<IRoutingMessage> routingMessages = messagesList
.Select(
Expand Down Expand Up @@ -260,5 +268,32 @@ static class Metrics

public static void AddMessageSize(long size, string id) => MessagesHistogram.Update(size, new[] { id });
}

static class MetricsV0
{
static readonly CounterOptions EdgeHubMessageReceivedCountOptions = new CounterOptions
{
Name = "EdgeHubMessageReceivedCount",
MeasurementUnit = Unit.Events,
ResetOnReporting = true,
};

static readonly TimerOptions EdgeHubMessageLatencyOptions = new TimerOptions
{
Name = "EdgeHubMessageLatencyMs",
MeasurementUnit = Unit.None,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds
};

public static void MessageCount(IIdentity identity, long count) => Util.Metrics.MetricsV0.CountIncrement(GetTags(identity), EdgeHubMessageReceivedCountOptions, count);

public static IDisposable MessageLatency(IIdentity identity) => Util.Metrics.MetricsV0.Latency(GetTags(identity), EdgeHubMessageLatencyOptions);

static MetricTags GetTags(IIdentity identity)
{
return new MetricTags("Id", identity.Id);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Storage
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core;
Expand Down Expand Up @@ -90,20 +92,26 @@ public async Task<long> Add(string endpointId, IMessage message)
// entity store. But that should be rare enough that it might be okay. Also it is better than not being able to forward the message.
// Alternative is to add retry logic to the pump, but that is more complicated, and could affect performance.
// TODO - Need to support transactions for these operations. The underlying storage layers support it.
await this.messageEntityStore.PutOrUpdate(
edgeMessageId,
new MessageWrapper(message),
(m) =>
{
m.RefCount++;
return m;
});
using (MetricsV0.MessageStoreLatency(endpointId))
{
await this.messageEntityStore.PutOrUpdate(
edgeMessageId,
new MessageWrapper(message),
(m) =>
{
m.RefCount++;
return m;
});
}

try
{
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId));
Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount);
return offset;
using (MetricsV0.SequentialStoreLatency(endpointId))
{
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId));
Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount);
return offset;
}
}
catch (Exception)
{
Expand Down Expand Up @@ -514,5 +522,33 @@ public MessageRef(string edgeMessageId, DateTime timeStamp)

public DateTime TimeStamp { get; }
}

static class MetricsV0
{
static readonly TimerOptions MessageEntityStorePutOrUpdateLatencyOptions = new TimerOptions
{
Name = "MessageEntityStorePutOrUpdateLatencyMs",
MeasurementUnit = Unit.None,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds
};

static readonly TimerOptions SequentialStoreAppendLatencyOptions = new TimerOptions
{
Name = "SequentialStoreAppendLatencyMs",
MeasurementUnit = Unit.None,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds
};

public static IDisposable MessageStoreLatency(string identity) => Util.Metrics.MetricsV0.Latency(GetTags(identity), MessageEntityStorePutOrUpdateLatencyOptions);

public static IDisposable SequentialStoreLatency(string identity) => Util.Metrics.MetricsV0.Latency(GetTags(identity), SequentialStoreAppendLatencyOptions);

internal static MetricTags GetTags(string id)
{
return new MetricTags("EndpointId", id);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
var metricsProvider = container.Resolve<IMetricsProvider>();
Metrics.Init(metricsProvider, metricsListener, logger);

// Init V0 Metrics
MetricsV0.BuildMetricsCollector(configuration);

// EdgeHub and CloudConnectionProvider have a circular dependency. So need to Bind the EdgeHub to the CloudConnectionProvider.
IEdgeHub edgeHub = await container.Resolve<Task<IEdgeHub>>();
ICloudConnectionProvider cloudConnectionProvider = await container.Resolve<Task<ICloudConnectionProvider>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
Expand Down Expand Up @@ -54,11 +57,15 @@ public async Task Invoke(IMessage message)
throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed.");
}

long offset = await this.messageStore.Add(this.Endpoint.Id, message);
this.checkpointer.Propose(message);
Events.AddMessageSuccess(this, offset);
using (MetricsV0.StoreLatency(this.Endpoint.Id))
{
long offset = await this.messageStore.Add(this.Endpoint.Id, message);
this.checkpointer.Propose(message);
Events.AddMessageSuccess(this, offset);
}

this.hasMessagesInQueue.Set();
MetricsV0.StoredCountIncrement(this.Endpoint.Id);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -133,6 +140,7 @@ async Task SendMessagesPump()
{
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
MetricsV0.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
}
else
{
Expand Down Expand Up @@ -322,5 +330,39 @@ public static void ErrorInPopulatePump(Exception ex)
Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump");
}
}

static class MetricsV0
{
static readonly CounterOptions EndpointMessageStoredCountOptions = new CounterOptions
{
Name = "EndpointMessageStoredCount",
MeasurementUnit = Unit.Events
};

static readonly CounterOptions EndpointMessageDrainedCountOptions = new CounterOptions
{
Name = "EndpointMessageDrainedCount",
MeasurementUnit = Unit.Events
};

static readonly TimerOptions EndpointMessageLatencyOptions = new TimerOptions
{
Name = "EndpointMessageStoredLatencyMs",
MeasurementUnit = Unit.None,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds
};

public static void StoredCountIncrement(string identity) => Edge.Util.Metrics.MetricsV0.CountIncrement(GetTags(identity), EndpointMessageStoredCountOptions, 1);

public static void DrainedCountIncrement(string identity, long amount) => Edge.Util.Metrics.MetricsV0.CountIncrement(GetTags(identity), EndpointMessageDrainedCountOptions, amount);

public static IDisposable StoreLatency(string identity) => Edge.Util.Metrics.MetricsV0.Latency(GetTags(identity), EndpointMessageLatencyOptions);

internal static MetricTags GetTags(string id)
{
return new MetricTags("EndpointId", id);
}
}
}
}
Loading

0 comments on commit a26e4e8

Please sign in to comment.