Skip to content

Commit

Permalink
Refactor MassTransit receive transaction (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
glucaci authored Jan 13, 2022
1 parent 9e8d990 commit 5796716
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/Elastic.Apm.Messaging.MassTransit/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Elastic.Apm.Messaging.MassTransit
internal struct Constants
{
internal const string TraceHeaderName = "Elastic.Apm";
internal const string MessageSourceHeaderName = "Elastic.Apm.MessageSource";

internal struct DiagnosticListener
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ internal class MassTransitDiagnosticListener : IObserver<KeyValuePair<string, ob
private readonly IApmLogger _logger;
private readonly MassTransitDiagnosticOptions _options;

private readonly ConcurrentDictionary<ActivitySpanId, IExecutionSegment> _activities =
new ConcurrentDictionary<ActivitySpanId, IExecutionSegment>();
private readonly ConcurrentDictionary<ActivitySpanId, IExecutionSegment> _activities = new();

internal MassTransitDiagnosticListener(IApmAgent apmAgent, MassTransitDiagnosticOptions options)
{
Expand Down Expand Up @@ -65,14 +64,28 @@ private void HandleSendStart(Activity activity, object? context)
if (executionSegment != null && context is SendContext sendContext)
{
var spanName = $"Send {_options.GetSendLabel(sendContext)}";
var subType = sendContext.DestinationAddress.Scheme;
var subType = sendContext.GetSpanSubType();

ISpan span = executionSegment.StartSpan(
spanName,
Constants.Apm.Type,
subType,
Constants.Apm.SendAction);


span.Context.Destination = new Destination
{
Address = sendContext.DestinationAddress.AbsoluteUri,
Service = new Destination.DestinationService
{
Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}",
}
};

span.Context.Message = new Message
{
Queue = new Queue { Name = sendContext.GetDestinationAbsoluteName() }
};

sendContext.SetTracingData(span);

_activities.TryAdd(activity.SpanId, span);
Expand All @@ -91,13 +104,24 @@ private void HandleReceiveStart(Activity activity, object? context)
{
if (context is ReceiveContext receiveContext)
{
DistributedTracingData? tracingData = receiveContext.GetTracingData();
var transactionName = $"Receive {_options.GetReceiveLabel(receiveContext)}";

ITransaction transaction = _apmAgent.Tracer.StartTransaction(
transactionName,
Constants.Apm.Type,
tracingData);
ITransaction? transaction;
if (_options.InlineReceiveTransaction)
{
DistributedTracingData? tracingData = receiveContext.GetTracingData();

transaction = _apmAgent.Tracer.StartTransaction(
transactionName,
Constants.Apm.Type,
tracingData);
}
else
{
transaction = _apmAgent.Tracer.StartTransaction(
transactionName,
Constants.Apm.Type);
}

_activities.TryAdd(activity.SpanId, transaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ public class MassTransitDiagnosticOptions
context => context.DestinationAddress.AbsolutePath;

private readonly Func<ReceiveContext, string> _defaultReceiveLabel =
context => context.InputAddress.AbsolutePath;
context => $"on {context.InputAddress.AbsolutePath} from {context.GetMessageSource()}";

internal MassTransitDiagnosticOptions()
{
SendLabel = _defaultSendLabel;
ReceiveLabel = _defaultReceiveLabel;
}

internal string GetSendLabel(SendContext context) =>
internal string GetSendLabel(SendContext context) =>
GetLabel(context, SendLabel, _defaultSendLabel);

internal string GetReceiveLabel(ReceiveContext context) =>
Expand Down Expand Up @@ -49,5 +49,12 @@ private string GetLabel<T>(
/// If the return value is empty or null, it will be replace with the default label.
/// </summary>
public Func<ReceiveContext, string> ReceiveLabel { get; set; }

/// <summary>
/// True if the receive transaction should has as a parent the send span and
/// false if the receive transaction is a root transaction.
/// Default: false.
/// </summary>
public bool InlineReceiveTransaction { get; set; } = false;
}
}
33 changes: 32 additions & 1 deletion src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
using System;
using System.Collections.Generic;
using Elastic.Apm.Api;
using MassTransit;

namespace Elastic.Apm.Messaging.MassTransit
{
internal static class MassTransitExtensions
{
private static readonly Dictionary<string, string> SchemeToSubType = new()
{
{ "sb", "azureservicebus" }
};

internal static void SetTracingData(this SendContext context, ISpan span)
{
var tracingData = span.OutgoingDistributedTracingData.SerializeToString();
context.Headers.Set(Constants.TraceHeaderName, tracingData);
context.Headers.Set(
Constants.TraceHeaderName,
tracingData);
context.Headers.Set(
Constants.MessageSourceHeaderName,
context.DestinationAddress.AbsolutePath);
}

internal static DistributedTracingData? GetTracingData(this ReceiveContext context)
{
var tracingData = context.TransportHeaders.Get<string>(Constants.TraceHeaderName);
return DistributedTracingData.TryDeserializeFromString(tracingData);
}

internal static string GetMessageSource(this ReceiveContext context)
{
return context.TransportHeaders.Get<string>(Constants.MessageSourceHeaderName);
}

internal static string GetSpanSubType(this SendContext context)
{
var scheme = context.DestinationAddress.Scheme;

return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme;
}

internal static string GetDestinationAbsoluteName(this SendContext context)
{
return context.DestinationAddress.AbsolutePath
.AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1)
.ToString();
}
}
}

0 comments on commit 5796716

Please sign in to comment.