From 35b89507b51a98e579c9343b36c708cb5e208bfd Mon Sep 17 00:00:00 2001 From: Gabriel Lucaci Date: Wed, 19 Jan 2022 13:51:40 +0100 Subject: [PATCH] MassTransit refine send and receive activities (#23) --- .../MassTransitDiagnosticListener.cs | 79 ++++++++++++++++--- .../MassTransitDiagnosticOptions.cs | 8 +- .../MassTransitExtensions.cs | 14 ++++ 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs index f1b77bd..70f53fb 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Linq; using Elastic.Apm.Api; using Elastic.Apm.Logging; using MassTransit; @@ -16,6 +17,7 @@ internal class MassTransitDiagnosticListener : IObserver _activities = new(); + private readonly ConcurrentDictionary _multipleActivities = new(); internal MassTransitDiagnosticListener(IApmAgent apmAgent, MassTransitDiagnosticOptions options) { @@ -60,24 +62,32 @@ private void HandleSendStart(Activity activity, object? context) { try { - IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment(); - if (executionSegment != null && context is SendContext sendContext) + if (context is SendContext sendContext) { - var spanName = $"Send {_options.GetSendLabel(sendContext)}"; - var subType = sendContext.GetSpanSubType(); + var hasTransaction = true; + var name = $"Send {_options.GetSendLabel(sendContext)}"; + + IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment(); + if (executionSegment == null) + { + executionSegment = _apmAgent.Tracer + .StartTransaction(name, Constants.Apm.Type); + hasTransaction = false; + } + var subType = sendContext.GetSpanSubType(); ISpan span = executionSegment.StartSpan( - spanName, + hasTransaction ? name : "Sending", Constants.Apm.Type, subType, Constants.Apm.SendAction); span.Context.Destination = new Destination { - Address = sendContext.DestinationAddress.AbsoluteUri, + Address = sendContext.DestinationAddress.AbsoluteUri, Service = new Destination.DestinationService { - Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}", + Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}" } }; @@ -88,7 +98,14 @@ private void HandleSendStart(Activity activity, object? context) sendContext.SetTracingData(span); - _activities.TryAdd(activity.SpanId, span); + if (hasTransaction) + { + _activities.TryAdd(activity.SpanId, span); + } + else + { + _multipleActivities.TryAdd(activity.SpanId, new[] { span, executionSegment }); + } } } catch (Exception ex) @@ -123,7 +140,28 @@ private void HandleReceiveStart(Activity activity, object? context) Constants.Apm.Type); } - _activities.TryAdd(activity.SpanId, transaction); + var subType = receiveContext.GetSpanSubType(); + ISpan span = transaction.StartSpan( + "Receiving", + Constants.Apm.Type, + subType, + Constants.Apm.SendAction); + + span.Context.Destination = new Destination + { + Address = receiveContext.InputAddress.AbsoluteUri, + Service = new Destination.DestinationService + { + Resource = $"{subType}{receiveContext.InputAddress.AbsolutePath}" + } + }; + + span.Context.Message = new Message + { + Queue = new Queue { Name = receiveContext.GetInputAbsoluteName() } + }; + + _multipleActivities.TryAdd(activity.SpanId, new[] { (IExecutionSegment)span, transaction }); } } catch (Exception ex) @@ -135,11 +173,26 @@ private void HandleReceiveStart(Activity activity, object? context) private void HandleStop(ActivitySpanId? spanId, TimeSpan duration) { - if (spanId.HasValue && - _activities.TryRemove(spanId.Value, out IExecutionSegment? executionSegment)) + if (spanId.HasValue) { - executionSegment.Duration = duration.TotalMilliseconds; - executionSegment.End(); + if (_activities.Any() && + _activities.TryRemove(spanId.Value, out IExecutionSegment? executionSegment) && + executionSegment != null) + { + executionSegment.Duration = duration.TotalMilliseconds; + executionSegment.End(); + } + + if (_multipleActivities.Any() && + _multipleActivities.TryRemove(spanId.Value, out IExecutionSegment[]? executionSegments) && + executionSegments != null) + { + for (var i = 0; i < executionSegments.Length; i++) + { + executionSegments[i].Duration = duration.TotalMilliseconds; + executionSegments[i].End(); + } + } } } diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs index 49d48e1..5049263 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs @@ -9,7 +9,13 @@ public class MassTransitDiagnosticOptions context => context.DestinationAddress.AbsolutePath; private readonly Func _defaultReceiveLabel = - context => $"on {context.InputAddress.AbsolutePath} from {context.GetMessageSource()}"; + context => + { + var messageSource = context.GetMessageSource(); + return string.IsNullOrEmpty(messageSource) + ? $"on {context.InputAddress.AbsolutePath}" + : $"on {context.InputAddress.AbsolutePath} from {messageSource}"; + }; internal MassTransitDiagnosticOptions() { diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs index 11944c2..52c0101 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs @@ -41,11 +41,25 @@ internal static string GetSpanSubType(this SendContext context) return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme; } + internal static string GetSpanSubType(this ReceiveContext context) + { + var scheme = context.InputAddress.Scheme; + + return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme; + } + internal static string GetDestinationAbsoluteName(this SendContext context) { return context.DestinationAddress.AbsolutePath .AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1) .ToString(); } + + internal static string GetInputAbsoluteName(this ReceiveContext context) + { + return context.InputAddress.AbsolutePath + .AsSpan(1, context.InputAddress.AbsolutePath.Length - 1) + .ToString(); + } } }