Skip to content

Commit

Permalink
MassTransit refine send and receive activities (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
glucaci authored Jan 19, 2022
1 parent b89dde6 commit 35b8950
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using Elastic.Apm.Api;
using Elastic.Apm.Logging;
using MassTransit;
Expand All @@ -16,6 +17,7 @@ internal class MassTransitDiagnosticListener : IObserver<KeyValuePair<string, ob
private readonly MassTransitDiagnosticOptions _options;

private readonly ConcurrentDictionary<ActivitySpanId, IExecutionSegment> _activities = new();
private readonly ConcurrentDictionary<ActivitySpanId, IExecutionSegment[]> _multipleActivities = new();

internal MassTransitDiagnosticListener(IApmAgent apmAgent, MassTransitDiagnosticOptions options)
{
Expand Down Expand Up @@ -60,24 +62,32 @@ private void HandleSendStart(Activity activity, object? context)
{
try
{
IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment();
if (executionSegment != null && context is SendContext sendContext)
if (context is SendContext sendContext)
{
var spanName = $"Send {_options.GetSendLabel(sendContext)}";
var subType = sendContext.GetSpanSubType();
var hasTransaction = true;
var name = $"Send {_options.GetSendLabel(sendContext)}";

IExecutionSegment? executionSegment = _apmAgent.Tracer.GetExecutionSegment();
if (executionSegment == null)
{
executionSegment = _apmAgent.Tracer
.StartTransaction(name, Constants.Apm.Type);
hasTransaction = false;
}

var subType = sendContext.GetSpanSubType();
ISpan span = executionSegment.StartSpan(
spanName,
hasTransaction ? name : "Sending",
Constants.Apm.Type,
subType,
Constants.Apm.SendAction);

span.Context.Destination = new Destination
{
Address = sendContext.DestinationAddress.AbsoluteUri,
Address = sendContext.DestinationAddress.AbsoluteUri,
Service = new Destination.DestinationService
{
Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}",
Resource = $"{subType}{sendContext.DestinationAddress.AbsolutePath}"
}
};

Expand All @@ -88,7 +98,14 @@ private void HandleSendStart(Activity activity, object? context)

sendContext.SetTracingData(span);

_activities.TryAdd(activity.SpanId, span);
if (hasTransaction)
{
_activities.TryAdd(activity.SpanId, span);
}
else
{
_multipleActivities.TryAdd(activity.SpanId, new[] { span, executionSegment });
}
}
}
catch (Exception ex)
Expand Down Expand Up @@ -123,7 +140,28 @@ private void HandleReceiveStart(Activity activity, object? context)
Constants.Apm.Type);
}

_activities.TryAdd(activity.SpanId, transaction);
var subType = receiveContext.GetSpanSubType();
ISpan span = transaction.StartSpan(
"Receiving",
Constants.Apm.Type,
subType,
Constants.Apm.SendAction);

span.Context.Destination = new Destination
{
Address = receiveContext.InputAddress.AbsoluteUri,
Service = new Destination.DestinationService
{
Resource = $"{subType}{receiveContext.InputAddress.AbsolutePath}"
}
};

span.Context.Message = new Message
{
Queue = new Queue { Name = receiveContext.GetInputAbsoluteName() }
};

_multipleActivities.TryAdd(activity.SpanId, new[] { (IExecutionSegment)span, transaction });
}
}
catch (Exception ex)
Expand All @@ -135,11 +173,26 @@ private void HandleReceiveStart(Activity activity, object? context)

private void HandleStop(ActivitySpanId? spanId, TimeSpan duration)
{
if (spanId.HasValue &&
_activities.TryRemove(spanId.Value, out IExecutionSegment? executionSegment))
if (spanId.HasValue)
{
executionSegment.Duration = duration.TotalMilliseconds;
executionSegment.End();
if (_activities.Any() &&
_activities.TryRemove(spanId.Value, out IExecutionSegment? executionSegment) &&
executionSegment != null)
{
executionSegment.Duration = duration.TotalMilliseconds;
executionSegment.End();
}

if (_multipleActivities.Any() &&
_multipleActivities.TryRemove(spanId.Value, out IExecutionSegment[]? executionSegments) &&
executionSegments != null)
{
for (var i = 0; i < executionSegments.Length; i++)
{
executionSegments[i].Duration = duration.TotalMilliseconds;
executionSegments[i].End();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ public class MassTransitDiagnosticOptions
context => context.DestinationAddress.AbsolutePath;

private readonly Func<ReceiveContext, string> _defaultReceiveLabel =
context => $"on {context.InputAddress.AbsolutePath} from {context.GetMessageSource()}";
context =>
{
var messageSource = context.GetMessageSource();
return string.IsNullOrEmpty(messageSource)
? $"on {context.InputAddress.AbsolutePath}"
: $"on {context.InputAddress.AbsolutePath} from {messageSource}";
};

internal MassTransitDiagnosticOptions()
{
Expand Down
14 changes: 14 additions & 0 deletions src/Elastic.Apm.Messaging.MassTransit/MassTransitExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,25 @@ internal static string GetSpanSubType(this SendContext context)
return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme;
}

internal static string GetSpanSubType(this ReceiveContext context)
{
var scheme = context.InputAddress.Scheme;

return SchemeToSubType.TryGetValue(scheme, out var value) ? value : scheme;
}

internal static string GetDestinationAbsoluteName(this SendContext context)
{
return context.DestinationAddress.AbsolutePath
.AsSpan(1, context.DestinationAddress.AbsolutePath.Length - 1)
.ToString();
}

internal static string GetInputAbsoluteName(this ReceiveContext context)
{
return context.InputAddress.AbsolutePath
.AsSpan(1, context.InputAddress.AbsolutePath.Length - 1)
.ToString();
}
}
}

0 comments on commit 35b8950

Please sign in to comment.