Skip to content

Commit

Permalink
MassTransit support request/response inline transaction (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
glucaci authored Jan 20, 2022
1 parent 35b8950 commit 20aa527
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 17 deletions.
10 changes: 8 additions & 2 deletions src/Elastic.Apm.Messaging.MassTransit/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ namespace Elastic.Apm.Messaging.MassTransit
{
internal struct Constants
{
internal const string TraceHeaderName = "Elastic.Apm";
internal const string MessageSourceHeaderName = "Elastic.Apm.MessageSource";
internal const string TraceHeader = "Elastic.Apm";
internal const string ReceiveResponseHeader = "Elastic.Apm.ReceiveResponse";
internal const string MessageSourceHeader = "Elastic.Apm.MessageSource";
internal const string MessageResponseHeader = "Elastic.Apm.MessageResponse";
internal const string AcceptTypeHeader = "MT-Request-AcceptType";

internal struct DiagnosticListener
{
Expand All @@ -14,6 +17,8 @@ internal struct Events
{
internal const string SendStart = "MassTransit.Transport.Send.Start";
internal const string SendStop = "MassTransit.Transport.Send.Stop";
internal const string ConsumeStart = "MassTransit.Consumer.Consume.Start";
internal const string ConsumeStop = "MassTransit.Consumer.Consume.Stop";
internal const string ReceiveStart = "MassTransit.Transport.Receive.Start";
internal const string ReceiveStop = "MassTransit.Transport.Receive.Stop";
}
Expand All @@ -22,6 +27,7 @@ internal struct Apm
{
internal const string Type = "messaging";
internal const string SendAction = "send";
internal const string ConsumeAction = "consume";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void OnNext(KeyValuePair<string, object?> value)
case Constants.Events.ReceiveStop:
HandleStop(activity.ParentSpanId, activity.Parent!.Duration);
return;
case Constants.Events.ConsumeStart:
HandleConsumeStart(activity, value.Value);
return;
case Constants.Events.ConsumeStop:
HandleStop(activity.SpanId, activity.Duration);
return;
}
}

Expand All @@ -67,6 +73,12 @@ private void HandleSendStart(Activity activity, object? context)
var hasTransaction = true;
var name = $"Send {_options.GetSendLabel(sendContext)}";

var isSendingResponse = sendContext.Headers.TryGetMessageResponse(out var messageResponse);
if (isSendingResponse)
{
name = $"Respond {messageResponse}";
}

IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment();
if (executionSegment == null)
{
Expand All @@ -82,21 +94,22 @@ private void HandleSendStart(Activity activity, object? context)
subType,
Constants.Apm.SendAction);

Uri? address = isSendingResponse ? sendContext.SourceAddress : sendContext.DestinationAddress;
span.Context.Destination = new Destination
{
Address = sendContext.DestinationAddress.AbsoluteUri,
Service = new Destination.DestinationService
{
Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}"
Resource = $"{subType}{address.AbsolutePath}"
}
};

span.Context.Message = new Message
{
Queue = new Queue { Name = sendContext.GetDestinationAbsoluteName() }
Queue = new Queue { Name = address.GetAbsoluteName() }
};

sendContext.SetTracingData(span);
sendContext.SetTracingData(span, isSendingResponse);

if (hasTransaction)
{
Expand All @@ -123,8 +136,15 @@ private void HandleReceiveStart(Activity activity, object? context)
{
var transactionName = $"Receive {_options.GetReceiveLabel(receiveContext)}";

var isReceivingResponse = receiveContext.TryGetMessageResponse(out var messageResponse);
if (isReceivingResponse)
{
transactionName = $"Receive response {messageResponse}";
}

ITransaction? transaction;
if (_options.InlineReceiveTransaction)
var inline = _options.InlineReceiveTransaction || receiveContext.WaitForResponse();
if (inline)
{
DistributedTracingData? tracingData = receiveContext.GetTracingData();

Expand Down Expand Up @@ -171,6 +191,50 @@ private void HandleReceiveStart(Activity activity, object? context)
}
}

private void HandleConsumeStart(Activity activity, object? context)
{
try
{
if (context is ConsumeContext consumeContext)
{
var consumerType = activity.Tags.FirstOrDefault(t => t.Key == "consumer-type").Value;
var name = string.IsNullOrEmpty(consumerType) ? "Consume" : $"Consume by {consumerType}";

IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment();
if (executionSegment != null)
{
var subType = consumeContext.ReceiveContext.GetSpanSubType();
ISpan span = executionSegment.StartSpan(
name,
Constants.Apm.Type,
subType,
Constants.Apm.ConsumeAction);

span.Context.Destination = new Destination
{
Address = consumeContext.ReceiveContext.InputAddress.AbsoluteUri,
Service = new Destination.DestinationService
{
Resource = $"{subType}{consumeContext.ReceiveContext.InputAddress.AbsolutePath}"
}
};

span.Context.Message = new Message
{
Queue = new Queue { Name = consumeContext.ReceiveContext.GetInputAbsoluteName() }
};

_activities.TryAdd(activity.SpanId, span);
}
}
}
catch (Exception ex)
{
var message = $"{Constants.Events.ConsumeStart} instrumentation failed.";
_logger.Log(LogLevel.Error, message, ex, default);
}
}

private void HandleStop(ActivitySpanId? spanId, TimeSpan duration)
{
if (spanId.HasValue)
Expand Down
76 changes: 65 additions & 11 deletions src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using Elastic.Apm.Api;
using MassTransit;

Expand All @@ -12,26 +14,80 @@ internal static class MassTransitExtensions
{ "sb", "azureservicebus" }
};

internal static void SetTracingData(this SendContext context, ISpan span)
internal static void SetTracingData(this SendContext context, ISpan span, bool isResponse)
{
var tracingData = span.OutgoingDistributedTracingData.SerializeToString();
context.Headers.Set(
Constants.TraceHeaderName,
Constants.TraceHeader,
tracingData);
context.Headers.Set(
Constants.MessageSourceHeaderName,
Constants.MessageSourceHeader,
context.DestinationAddress.AbsolutePath);
context.Headers.Set(
Constants.ReceiveResponseHeader,
$"{isResponse}",
true);

if (context.Headers.TryGetHeader(Constants.AcceptTypeHeader, out var value) &&
value is IList<string> values)
{
var acceptType = values.FirstOrDefault();
if (!string.IsNullOrEmpty(acceptType) &&
Uri.TryCreate(acceptType, UriKind.RelativeOrAbsolute, out Uri? acceptTypeUrn))
{
context.Headers.Set(Constants.MessageResponseHeader, acceptTypeUrn.AbsolutePath);
}
}

if (isResponse)
{
context.Headers.Set(
Constants.MessageResponseHeader,
context.Headers.Get<string>(Constants.MessageResponseHeader),
true);
}
}

internal static DistributedTracingData? GetTracingData(this ReceiveContext context)
{
var tracingData = context.TransportHeaders.Get<string>(Constants.TraceHeaderName);
var tracingData = context.TransportHeaders.Get<string>(Constants.TraceHeader);
return DistributedTracingData.TryDeserializeFromString(tracingData);
}

internal static bool WaitForResponse(this ReceiveContext context)
{
return context.TransportHeaders.TryGetHeader(Constants.MessageResponseHeader, out _);
}

internal static bool TryGetMessageResponse(this Headers headers, [NotNullWhen(true)]out string? value)
{
value = default;
var hasHeader = headers.TryGetHeader(Constants.MessageResponseHeader, out var rawValue);
if (hasHeader)
{
value = rawValue as string;
}

return hasHeader;
}

internal static bool TryGetMessageResponse(this ReceiveContext context, [NotNullWhen(true)] out string? value)
{
value = default;
var hasReceiveResponse = context.TransportHeaders.TryGetHeader(Constants.ReceiveResponseHeader, out var rawReceiveResponse);
if (hasReceiveResponse &&
rawReceiveResponse is string receiveResponse &&
receiveResponse.Equals("True", StringComparison.InvariantCultureIgnoreCase))
{
return TryGetMessageResponse(context.TransportHeaders, out value);
}

return false;
}

internal static string GetMessageSource(this ReceiveContext context)
{
return context.TransportHeaders.Get<string>(Constants.MessageSourceHeaderName);
return context.TransportHeaders.Get<string>(Constants.MessageSourceHeader);
}

internal static string GetSpanSubType(this SendContext context)
Expand All @@ -48,18 +104,16 @@ internal static string GetSpanSubType(this ReceiveContext context)
return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme;
}

internal static string GetDestinationAbsoluteName(this SendContext context)
internal static string GetAbsoluteName(this Uri address)
{
return context.DestinationAddress.AbsolutePath
.AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1)
return address.AbsolutePath
.AsSpan(1, address.AbsolutePath.Length - 1)
.ToString();
}

internal static string GetInputAbsoluteName(this ReceiveContext context)
{
return context.InputAddress.AbsolutePath
.AsSpan(1, context.InputAddress.AbsolutePath.Length - 1)
.ToString();
return context.InputAddress.GetAbsoluteName();
}
}
}

0 comments on commit 20aa527

Please sign in to comment.