diff --git a/src/Elastic.Apm.Messaging.MassTransit/Constants.cs b/src/Elastic.Apm.Messaging.MassTransit/Constants.cs index cb2063f..b94295b 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/Constants.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/Constants.cs @@ -3,6 +3,7 @@ namespace Elastic.Apm.Messaging.MassTransit internal struct Constants { internal const string TraceHeaderName = "Elastic.Apm"; + internal const string MessageSourceHeaderName = "Elastic.Apm.MessageSource"; internal struct DiagnosticListener { diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs index d4e130b..f1b77bd 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs @@ -15,8 +15,7 @@ internal class MassTransitDiagnosticListener : IObserver _activities = - new ConcurrentDictionary(); + private readonly ConcurrentDictionary _activities = new(); internal MassTransitDiagnosticListener(IApmAgent apmAgent, MassTransitDiagnosticOptions options) { @@ -65,14 +64,28 @@ private void HandleSendStart(Activity activity, object? context) if (executionSegment != null && context is SendContext sendContext) { var spanName = $"Send {_options.GetSendLabel(sendContext)}"; - var subType = sendContext.DestinationAddress.Scheme; + var subType = sendContext.GetSpanSubType(); ISpan span = executionSegment.StartSpan( spanName, Constants.Apm.Type, subType, Constants.Apm.SendAction); - + + span.Context.Destination = new Destination + { + Address = sendContext.DestinationAddress.AbsoluteUri, + Service = new Destination.DestinationService + { + Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}", + } + }; + + span.Context.Message = new Message + { + Queue = new Queue { Name = sendContext.GetDestinationAbsoluteName() } + }; + sendContext.SetTracingData(span); _activities.TryAdd(activity.SpanId, span); @@ -91,13 +104,24 @@ private void HandleReceiveStart(Activity activity, object? context) { if (context is ReceiveContext receiveContext) { - DistributedTracingData? tracingData = receiveContext.GetTracingData(); var transactionName = $"Receive {_options.GetReceiveLabel(receiveContext)}"; - ITransaction transaction = _apmAgent.Tracer.StartTransaction( - transactionName, - Constants.Apm.Type, - tracingData); + ITransaction? transaction; + if (_options.InlineReceiveTransaction) + { + DistributedTracingData? tracingData = receiveContext.GetTracingData(); + + transaction = _apmAgent.Tracer.StartTransaction( + transactionName, + Constants.Apm.Type, + tracingData); + } + else + { + transaction = _apmAgent.Tracer.StartTransaction( + transactionName, + Constants.Apm.Type); + } _activities.TryAdd(activity.SpanId, transaction); } diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs index 61bc9ee..49d48e1 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticOptions.cs @@ -9,7 +9,7 @@ public class MassTransitDiagnosticOptions context => context.DestinationAddress.AbsolutePath; private readonly Func _defaultReceiveLabel = - context => context.InputAddress.AbsolutePath; + context => $"on {context.InputAddress.AbsolutePath} from {context.GetMessageSource()}"; internal MassTransitDiagnosticOptions() { @@ -17,7 +17,7 @@ internal MassTransitDiagnosticOptions() ReceiveLabel = _defaultReceiveLabel; } - internal string GetSendLabel(SendContext context) => + internal string GetSendLabel(SendContext context) => GetLabel(context, SendLabel, _defaultSendLabel); internal string GetReceiveLabel(ReceiveContext context) => @@ -49,5 +49,12 @@ private string GetLabel( /// If the return value is empty or null, it will be replace with the default label. /// public Func ReceiveLabel { get; set; } + + /// + /// True if the receive transaction should has as a parent the send span and + /// false if the receive transaction is a root transaction. + /// Default: false. + /// + public bool InlineReceiveTransaction { get; set; } = false; } } diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs index 9a28013..11944c2 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs @@ -1,3 +1,5 @@ +using System; +using System.Collections.Generic; using Elastic.Apm.Api; using MassTransit; @@ -5,10 +7,20 @@ namespace Elastic.Apm.Messaging.MassTransit { internal static class MassTransitExtensions { + private static readonly Dictionary SchemeToSubType = new() + { + { "sb", "azureservicebus" } + }; + internal static void SetTracingData(this SendContext context, ISpan span) { var tracingData = span.OutgoingDistributedTracingData.SerializeToString(); - context.Headers.Set(Constants.TraceHeaderName, tracingData); + context.Headers.Set( + Constants.TraceHeaderName, + tracingData); + context.Headers.Set( + Constants.MessageSourceHeaderName, + context.DestinationAddress.AbsolutePath); } internal static DistributedTracingData? GetTracingData(this ReceiveContext context) @@ -16,5 +28,24 @@ internal static void SetTracingData(this SendContext context, ISpan span) var tracingData = context.TransportHeaders.Get(Constants.TraceHeaderName); return DistributedTracingData.TryDeserializeFromString(tracingData); } + + internal static string GetMessageSource(this ReceiveContext context) + { + return context.TransportHeaders.Get(Constants.MessageSourceHeaderName); + } + + internal static string GetSpanSubType(this SendContext context) + { + var scheme = context.DestinationAddress.Scheme; + + return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme; + } + + internal static string GetDestinationAbsoluteName(this SendContext context) + { + return context.DestinationAddress.AbsolutePath + .AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1) + .ToString(); + } } }