-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathendpoint.go
83 lines (69 loc) · 1.74 KB
/
endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package grinta
import (
"context"
"io"
"sync"
"github.com/raskyld/grinta/pkg/flow"
)
// Endpoint is a named target that can be resolved on a `Fabric`. Its owner
// calls `Accept` to take delivery of inbound `Flow`s and `Close` (from the
// embedded io.Closer) when it is done with the endpoint.
type Endpoint interface {
	io.Closer

	// Name reports the endpoint's name.
	Name() string

	// Accept blocks until a flow establishment request is received or the
	// context is canceled. On success the `RawReceiver` is always set,
	// while the `RawSender` MAY be non-nil when the flow is bidirectional.
	Accept(context.Context) (flow.RawReceiver, flow.RawSender, error)
}
// Compile-time check that *endpoint satisfies Endpoint.
var _ Endpoint = (*endpoint)(nil)

// endpoint is the Endpoint implementation defined in this file.
type endpoint struct {
	// name is the value returned by Name.
	name string

	// epGC is the callback close invokes when the close was not planned by
	// the garbage collector.
	epGC func(*endpoint)

	// flowCh carries inbound flows to Accept; close closes it.
	flowCh chan flow.Raw

	// lk serializes close and guards closed.
	lk     sync.Mutex
	closed bool

	// closeCh is closed by close before it waits on writers.
	closeCh chan struct{}

	// writers is waited on by close before flowCh is closed; presumably it
	// tracks in-flight senders on flowCh (they are not visible in this file).
	writers sync.WaitGroup
}
// newEndpoint builds an open endpoint called name.
//
// garbageCollectorCallback is stored on the endpoint and invoked by close
// unless that close was planned by the garbage collector. The flow channel
// is buffered for up to 64 pending flows.
func newEndpoint(name string, garbageCollectorCallback func(*endpoint)) *endpoint {
	ep := new(endpoint)
	ep.name = name
	ep.epGC = garbageCollectorCallback
	ep.flowCh = make(chan flow.Raw, 64)
	ep.closeCh = make(chan struct{}, 1)
	return ep
}
// Name returns the name the endpoint was created with.
func (ep *endpoint) Name() string {
return ep.name
}
// Accept waits for the next inbound flow or for ctx to be done, whichever
// happens first.
//
// It returns ctx.Err() when the context ends, and ErrEndpointClosed when the
// value received from the flow channel has no RawReceiver, which is what a
// receive on the closed channel yields. Otherwise it returns the flow's
// receiver and its sender, the latter possibly nil.
func (ep *endpoint) Accept(ctx context.Context) (flow.RawReceiver, flow.RawSender, error) {
	var incoming flow.Raw

	select {
	case incoming = <-ep.flowCh:
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}

	// A closed flowCh hands out the zero flow.Raw, recognizable by its nil
	// receiver.
	if incoming.RawReceiver == nil {
		return nil, nil, ErrEndpointClosed
	}
	return incoming.RawReceiver, incoming.RawSender, nil
}
// Close closes the endpoint by calling close(false), i.e. a close that was
// not planned by the garbage collector, so the garbage-collector callback is
// invoked if the endpoint was still open. It always returns nil.
func (ep *endpoint) Close() error {
return ep.close(false)
}
// close shuts the endpoint down; only the first call does any work.
//
// A close may be initiated by the Endpoint Garbage Collector itself, so
// plannedForGc tells us whether the garbage-collector callback still needs
// to be called: it is skipped when plannedForGc is true and invoked
// otherwise. close always returns nil.
func (ep *endpoint) close(plannedForGc bool) error {
	ep.lk.Lock()
	defer ep.lk.Unlock()

	if ep.closed {
		// Someone already closed this endpoint.
		return nil
	}
	ep.closed = true

	// Keep this order: closeCh is closed first, then writers are awaited,
	// and only then is flowCh closed. Presumably this keeps a writer from
	// sending on a closed flowCh (the writers are not visible in this file).
	close(ep.closeCh)
	ep.writers.Wait()
	close(ep.flowCh)

	if plannedForGc {
		return nil
	}
	// The callback runs while lk is still held.
	ep.epGC(ep)
	return nil
}