From 20aa527d4af536ab4dc563b8e9547e410a621821 Mon Sep 17 00:00:00 2001 From: Gabriel Lucaci Date: Thu, 20 Jan 2022 17:52:57 +0100 Subject: [PATCH] MassTransit support request/response inline transaction (#24) --- .../Constants.cs | 10 ++- .../MassTransitDiagnosticListener.cs | 72 +++++++++++++++++- .../MassTransitExtensions.cs | 76 ++++++++++++++++--- 3 files changed, 141 insertions(+), 17 deletions(-) diff --git a/src/Elastic.Apm.Messaging.MassTransit/Constants.cs b/src/Elastic.Apm.Messaging.MassTransit/Constants.cs index b94295b..d1c5e38 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/Constants.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/Constants.cs @@ -2,8 +2,11 @@ namespace Elastic.Apm.Messaging.MassTransit { internal struct Constants { - internal const string TraceHeaderName = "Elastic.Apm"; - internal const string MessageSourceHeaderName = "Elastic.Apm.MessageSource"; + internal const string TraceHeader = "Elastic.Apm"; + internal const string ReceiveResponseHeader = "Elastic.Apm.ReceiveResponse"; + internal const string MessageSourceHeader = "Elastic.Apm.MessageSource"; + internal const string MessageResponseHeader = "Elastic.Apm.MessageResponse"; + internal const string AcceptTypeHeader = "MT-Request-AcceptType"; internal struct DiagnosticListener { @@ -14,6 +17,8 @@ internal struct Events { internal const string SendStart = "MassTransit.Transport.Send.Start"; internal const string SendStop = "MassTransit.Transport.Send.Stop"; + internal const string ConsumeStart = "MassTransit.Consumer.Consume.Start"; + internal const string ConsumeStop = "MassTransit.Consumer.Consume.Stop"; internal const string ReceiveStart = "MassTransit.Transport.Receive.Start"; internal const string ReceiveStop = "MassTransit.Transport.Receive.Stop"; } @@ -22,6 +27,7 @@ internal struct Apm { internal const string Type = "messaging"; internal const string SendAction = "send"; + internal const string ConsumeAction = "consume"; } } } diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs index 70f53fb..8a052e8 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitDiagnosticListener.cs @@ -47,6 +47,12 @@ public void OnNext(KeyValuePair value) case Constants.Events.ReceiveStop: HandleStop(activity.ParentSpanId, activity.Parent!.Duration); return; + case Constants.Events.ConsumeStart: + HandleConsumeStart(activity, value.Value); + return; + case Constants.Events.ConsumeStop: + HandleStop(activity.SpanId, activity.Duration); + return; } } @@ -67,6 +73,12 @@ private void HandleSendStart(Activity activity, object? context) var hasTransaction = true; var name = $"Send {_options.GetSendLabel(sendContext)}"; + var isSendingResponse = sendContext.Headers.TryGetMessageResponse(out var messageResponse); + if (isSendingResponse) + { + name = $"Respond {messageResponse}"; + } + IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment(); if (executionSegment == null) { @@ -82,21 +94,22 @@ private void HandleSendStart(Activity activity, object? context) subType, Constants.Apm.SendAction); + Uri? address = isSendingResponse ? sendContext.SourceAddress : sendContext.DestinationAddress; span.Context.Destination = new Destination { Address = sendContext.DestinationAddress.AbsoluteUri, Service = new Destination.DestinationService { - Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}" + Resource = $"{subType}{address.AbsolutePath}" } }; span.Context.Message = new Message { - Queue = new Queue { Name = sendContext.GetDestinationAbsoluteName() } + Queue = new Queue { Name = address.GetAbsoluteName() } }; - sendContext.SetTracingData(span); + sendContext.SetTracingData(span, isSendingResponse); if (hasTransaction) { @@ -123,8 +136,15 @@ private void HandleReceiveStart(Activity activity, object? context) { var transactionName = $"Receive {_options.GetReceiveLabel(receiveContext)}"; + var isReceivingResponse = receiveContext.TryGetMessageResponse(out var messageResponse); + if (isReceivingResponse) + { + transactionName = $"Receive response {messageResponse}"; + } + ITransaction? transaction; - if (_options.InlineReceiveTransaction) + var inline = _options.InlineReceiveTransaction || receiveContext.WaitForResponse(); + if (inline) { DistributedTracingData? tracingData = receiveContext.GetTracingData(); @@ -171,6 +191,50 @@ private void HandleReceiveStart(Activity activity, object? context) } } + private void HandleConsumeStart(Activity activity, object? context) + { + try + { + if (context is ConsumeContext consumeContext) + { + var consumerType = activity.Tags.FirstOrDefault(t => t.Key == "consumer-type").Value; + var name = string.IsNullOrEmpty(consumerType) ? "Consume" : $"Consume by {consumerType}"; + + IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment(); + if (executionSegment != null) + { + var subType = consumeContext.ReceiveContext.GetSpanSubType(); + ISpan span = executionSegment.StartSpan( + name, + Constants.Apm.Type, + subType, + Constants.Apm.ConsumeAction); + + span.Context.Destination = new Destination + { + Address = consumeContext.ReceiveContext.InputAddress.AbsoluteUri, + Service = new Destination.DestinationService + { + Resource = $"{subType}{consumeContext.ReceiveContext.InputAddress.AbsolutePath}" + } + }; + + span.Context.Message = new Message + { + Queue = new Queue { Name = consumeContext.ReceiveContext.GetInputAbsoluteName() } + }; + + _activities.TryAdd(activity.SpanId, span); + } + } + } + catch (Exception ex) + { + var message = $"{Constants.Events.ConsumeStart} instrumentation failed."; + _logger.Log(LogLevel.Error, message, ex, default); + } + } + private void HandleStop(ActivitySpanId? spanId, TimeSpan duration) { if (spanId.HasValue) diff --git a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs index 52c0101..4c3e61b 100644 --- a/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs +++ b/src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; using Elastic.Apm.Api; using MassTransit; @@ -12,26 +14,80 @@ internal static class MassTransitExtensions { "sb", "azureservicebus" } }; - internal static void SetTracingData(this SendContext context, ISpan span) + internal static void SetTracingData(this SendContext context, ISpan span, bool isResponse) { var tracingData = span.OutgoingDistributedTracingData.SerializeToString(); context.Headers.Set( - Constants.TraceHeaderName, + Constants.TraceHeader, tracingData); context.Headers.Set( - Constants.MessageSourceHeaderName, + Constants.MessageSourceHeader, context.DestinationAddress.AbsolutePath); + context.Headers.Set( + Constants.ReceiveResponseHeader, + $"{isResponse}", + true); + + if (context.Headers.TryGetHeader(Constants.AcceptTypeHeader, out var value) && + value is IList values) + { + var acceptType = values.FirstOrDefault(); + if (!string.IsNullOrEmpty(acceptType) && + Uri.TryCreate(acceptType, UriKind.RelativeOrAbsolute, out Uri? acceptTypeUrn)) + { + context.Headers.Set(Constants.MessageResponseHeader, acceptTypeUrn.AbsolutePath); + } + } + + if (isResponse) + { + context.Headers.Set( + Constants.MessageResponseHeader, + context.Headers.Get(Constants.MessageResponseHeader), + true); + } } internal static DistributedTracingData? GetTracingData(this ReceiveContext context) { - var tracingData = context.TransportHeaders.Get(Constants.TraceHeaderName); + var tracingData = context.TransportHeaders.Get(Constants.TraceHeader); return DistributedTracingData.TryDeserializeFromString(tracingData); } + internal static bool WaitForResponse(this ReceiveContext context) + { + return context.TransportHeaders.TryGetHeader(Constants.MessageResponseHeader, out _); + } + + internal static bool TryGetMessageResponse(this Headers headers, [NotNullWhen(true)]out string? value) + { + value = default; + var hasHeader = headers.TryGetHeader(Constants.MessageResponseHeader, out var rawValue); + if (hasHeader) + { + value = rawValue as string; + } + + return hasHeader; + } + + internal static bool TryGetMessageResponse(this ReceiveContext context, [NotNullWhen(true)] out string? value) + { + value = default; + var hasReceiveResponse = context.TransportHeaders.TryGetHeader(Constants.ReceiveResponseHeader, out var rawReceiveResponse); + if (hasReceiveResponse && + rawReceiveResponse is string receiveResponse && + receiveResponse.Equals("True", StringComparison.InvariantCultureIgnoreCase)) + { + return TryGetMessageResponse(context.TransportHeaders, out value); + } + + return false; + } + internal static string GetMessageSource(this ReceiveContext context) { - return context.TransportHeaders.Get(Constants.MessageSourceHeaderName); + return context.TransportHeaders.Get(Constants.MessageSourceHeader); } internal static string GetSpanSubType(this SendContext context) @@ -48,18 +104,16 @@ internal static string GetSpanSubType(this ReceiveContext context) return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme; } - internal static string GetDestinationAbsoluteName(this SendContext context) + internal static string GetAbsoluteName(this Uri address) { - return context.DestinationAddress.AbsolutePath - .AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1) + return address.AbsolutePath + .AsSpan(1, address.AbsolutePath.Length - 1) .ToString(); } internal static string GetInputAbsoluteName(this ReceiveContext context) { - return context.InputAddress.AbsolutePath - .AsSpan(1, context.InputAddress.AbsolutePath.Length - 1) - .ToString(); + return context.InputAddress.GetAbsoluteName(); } } }