-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
147 lines (124 loc) · 2.95 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Package pubsub implements an unbounded channel for the pub/sub pattern.
package pubsub
import (
"context"
"sync"
)
// Subscriber is a subscriber that subscribes to a Pipe. A zero-value Subscriber
// is a valid Subscriber.
type Subscriber[T any] struct {
mu sync.RWMutex
subs map[chan<- T]pipeSub[T]
}
type pipeSub[T any] struct {
queue chan<- T // unbounded FIFO queue
stop chan struct{}
}
// NewSubscriber creates a new Subscriber.
func NewSubscriber[T any]() *Subscriber[T] {
return &Subscriber[T]{}
}
// Listen starts broadcasting messages received from the given src channel.
// It blocks until the src channel is closed or ctx is canceled.
func (s *Subscriber[T]) Listen(ctx context.Context, src <-chan T) error {
s.mu.Lock()
if s.subs == nil {
s.subs = make(map[chan<- T]pipeSub[T])
}
s.mu.Unlock()
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-src:
if !ok {
return nil
}
s.publish(ctx, msg)
}
}
}
func (s *Subscriber[T]) publish(ctx context.Context, value T) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, sub := range s.subs {
// We're ok with waiting for sub.queue to accept the value, since
// they go to unbounded queues.
select {
case <-ctx.Done():
return
case <-sub.stop:
break
case sub.queue <- value:
// ok
}
}
}
// FilterFunc is a filter function for any type.
// If the function returns true, the message will be sent to the subscriber.
// If the function is nil, all messages will be sent to the subscriber.
type FilterFunc[T any] func(T) bool
// Subscribe subscribes ch to incoming messages from the given recipient.
// Calls to Subscribe should always be paired with Unsubscribe. It is
// recommended to use defer.
//
// Subscribe panics if it's called on a Subscriber w/ a src that's already
// closed.
func (s *Subscriber[T]) Subscribe(ch chan<- T, filter FilterFunc[T]) {
if filter == nil {
filter = func(T) bool { return true }
}
s.mu.Lock()
defer s.mu.Unlock()
if s.subs == nil {
s.subs = make(map[chan<- T]pipeSub[T])
}
_, ok := s.subs[ch]
if ok {
panic("twipi: channel already subscribed")
}
queue := make(chan T, 1)
stop := make(chan struct{})
s.subs[ch] = pipeSub[T]{
queue: queue,
stop: stop,
}
go func() {
var dst chan<- T
pending := NewQueue[T]()
for {
select {
case msg := <-queue:
if !filter(msg) {
break
}
pending.Enqueue(msg)
dst = ch
case dst <- pending.PendingOrZero():
pending.Dequeue()
// If the pending queue is drained, then stop sending.
if pending.IsEmpty() {
dst = nil
}
case <-stop:
close(ch)
return
}
}
}()
}
// Unsubscribe unsubscribes ch from incoming messages from all its recipients.
// Once unsubscribed, ch will be closed.
func (s *Subscriber[T]) Unsubscribe(ch chan<- T) {
s.mu.Lock()
defer s.mu.Unlock()
s.unsub(ch)
}
func (s *Subscriber[T]) unsub(ch chan<- T) {
sub, ok := s.subs[ch]
if !ok {
return
}
close(sub.stop)
delete(s.subs, ch)
}