-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathExtensions.LinkTo.cs
52 lines (46 loc) · 1.62 KB
/
Extensions.LinkTo.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
using Open.Threading.Tasks;
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Open.Threading.Dataflow;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "Required to differentiate between Actions and Tasks.")]
public static partial class DataFlowExtensions
{
    /// <summary>
    /// Links <paramref name="producer"/> to a newly created <see cref="ActionBlock{T}"/>
    /// that invokes the synchronous <paramref name="consumer"/> delegate.
    /// </summary>
    /// <typeparam name="T">The type of item offered by the producer.</typeparam>
    /// <param name="producer">The source block to link from.</param>
    /// <param name="consumer">The action handed to the new <see cref="ActionBlock{T}"/>.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the underlying link operation.</returns>
    /// <remarks>
    /// The created <see cref="ActionBlock{T}"/> is not returned to the caller,
    /// so its completion cannot be observed through this method.
    /// </remarks>
    public static IDisposable LinkTo<T>(this ISourceBlock<T> producer,
        Action<T> consumer) => producer.LinkTo(new ActionBlock<T>(consumer));

    /// <summary>
    /// Links <paramref name="producer"/> to a newly created <see cref="ActionBlock{T}"/>
    /// that invokes the task-returning <paramref name="consumer"/> delegate.
    /// </summary>
    /// <typeparam name="T">The type of item offered by the producer.</typeparam>
    /// <param name="producer">The source block to link from.</param>
    /// <param name="consumer">The asynchronous function handed to the new <see cref="ActionBlock{T}"/>.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the underlying link operation.</returns>
    /// <remarks>
    /// The method itself is synchronous; the 'Async' suffix only distinguishes the
    /// <see cref="Func{T, TResult}"/> overload from the <see cref="Action{T}"/> one.
    /// The created <see cref="ActionBlock{T}"/> is not returned to the caller.
    /// </remarks>
    public static IDisposable LinkToAsync<T>(this ISourceBlock<T> producer,
        Func<T, Task> consumer) => producer.LinkTo(new ActionBlock<T>(consumer));

    /// <summary>
    /// Links <paramref name="producer"/> to <paramref name="consumer"/> using link options
    /// that have <see cref="DataflowLinkOptions.PropagateCompletion"/> set to <see langword="true"/>.
    /// </summary>
    /// <typeparam name="T">The type of item offered by the producer.</typeparam>
    /// <param name="producer">The source block to link from.</param>
    /// <param name="consumer">The target block to link to.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the underlying link operation.</returns>
    public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> producer,
        ITargetBlock<T> consumer) => producer.LinkTo(consumer, new DataflowLinkOptions() { PropagateCompletion = true });

    /// <summary>
    /// Registers a fault handler on the completion of <paramref name="source"/> that faults
    /// every non-null block in <paramref name="targets"/>.
    /// </summary>
    /// <typeparam name="T">The concrete block type, returned unchanged to allow chaining.</typeparam>
    /// <param name="source">The block whose completion is observed.</param>
    /// <param name="targets">
    /// The blocks to fault. Null entries are skipped; a null or empty array makes this call a no-op.
    /// </param>
    /// <returns>The same <paramref name="source"/> instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <remarks>
    /// The handler is registered through the external <c>OnFaulted</c> extension.
    /// NOTE(review): presumably it only runs when the task faults and receives the task's
    /// aggregate exception - confirm against Open.Threading.Tasks.
    /// </remarks>
    public static T PropagateFaultsTo<T>(this T source, params IDataflowBlock[] targets)
        where T : IDataflowBlock
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        // Nothing to notify, so there is no reason to register a handler.
        if (targets is null || targets.Length == 0)
        {
            return source;
        }

        _ = source.Completion.OnFaulted(ex =>
        {
            // Fault rejects a null exception, so fall back to the reported
            // exception itself when it does not wrap anything.
            var error = ex.InnerException ?? ex;
            foreach (var target in targets.Where(t => t is not null))
            {
                target.Fault(error);
            }
        });

        return source;
    }

    /// <summary>
    /// Registers a continuation on the completion of <paramref name="source"/> that forwards the
    /// outcome to every non-null block in <paramref name="targets"/>: a faulted source faults the
    /// targets, any other outcome (including cancellation) completes them.
    /// </summary>
    /// <typeparam name="T">The concrete block type, returned unchanged to allow chaining.</typeparam>
    /// <param name="source">The block whose completion is observed.</param>
    /// <param name="targets">
    /// The blocks to complete or fault. Null entries are skipped; a null or empty array makes this call a no-op.
    /// </param>
    /// <returns>The same <paramref name="source"/> instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static T PropagateCompletionTo<T>(this T source, params IDataflowBlock[] targets)
        where T : IDataflowBlock
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        // Nothing to notify, so there is no reason to register a continuation.
        if (targets is null || targets.Length == 0)
        {
            return source;
        }

        _ = source.Completion.ContinueWith(task =>
        {
            // Decide once, outside the loop: null means "complete normally".
            // The first wrapped exception is forwarded; the aggregate itself is
            // only used as a fallback so Fault never receives null.
            var aggregate = task.IsFaulted ? task.Exception : null;
            var error = aggregate?.InnerException ?? aggregate;

            foreach (var target in targets.Where(t => t is not null))
            {
                if (error is null)
                {
                    target.Complete();
                }
                else
                {
                    target.Fault(error);
                }
            }
        },
        // Explicit scheduler: without it the continuation is bound to the ambient
        // TaskScheduler.Current of whoever called this method (CA2008).
        TaskScheduler.Default);

        return source;
    }
}