-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathddp.go
124 lines (96 loc) · 2.13 KB
/
ddp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package rc
import (
"net/url"
"github.com/gopackage/ddp"
"go.uber.org/zap"
)
// ddpClient bundles a DDP connection with the stream state and settings that
// newDDPClient was given when it created the connection.
type ddpClient struct {
// ddp is the underlying DDP client; every method on ddpClient delegates to it.
ddp *ddp.Client
// streams is the stream set built from the StreamOptions passed to newDDPClient.
streams *streams
// server is the server address exactly as supplied by the caller.
server string
// debug records whether socket logging was requested at construction time.
debug bool
// log is the logger supplied by the caller.
log *zap.SugaredLogger
}
// newDDPClient opens a DDP connection to the websocket endpoint derived from
// server and returns it wrapped in a ddpClient.
//
// The endpoint is server with its scheme replaced ("https" becomes "wss",
// anything else becomes "ws") and its path replaced by "websocket". The
// original server string is passed unchanged as the second argument to
// ddp.NewClient. When debug is true, socket logging is switched on before
// connecting. The streams are built from opts before the connection attempt.
//
// It returns an error, unwrapped, if server cannot be parsed, if the streams
// cannot be created, or if the connection attempt fails.
func newDDPClient(server string, debug bool, logger *zap.SugaredLogger, opts ...StreamOption) (*ddpClient, error) {
	endpoint, err := url.Parse(server)
	if err != nil {
		return nil, err
	}

	// Map the HTTP(S) scheme onto its websocket counterpart.
	switch endpoint.Scheme {
	case "https":
		endpoint.Scheme = "wss"
	default:
		endpoint.Scheme = "ws"
	}
	endpoint.Path = "websocket"

	conn := ddp.NewClient(endpoint.String(), server)
	if debug {
		conn.SetSocketLogActive(true)
	}

	subs, err := newStreams(opts...)
	if err != nil {
		return nil, err
	}

	err = conn.Connect()
	if err != nil {
		return nil, err
	}

	return &ddpClient{
		ddp:     conn,
		streams: subs,
		server:  server,
		debug:   debug,
		log:     logger,
	}, nil
}
// Resume calls the "login" DDP method with ld as its only argument and, if
// that succeeds, hands the DDP connection to the streams via runStreams.
// A login error is returned as-is and runStreams is not invoked.
func (d *ddpClient) Resume(ld ResumeLogin) error {
	_, err := d.call("login", ld)
	if err != nil {
		return err
	}

	// Streams are only started once the login has gone through.
	d.streams.runStreams(d.ddp)
	return nil
}
// Reconnect delegates to the underlying DDP client's Reconnect.
func (d *ddpClient) Reconnect() {
	conn := d.ddp
	conn.Reconnect()
}
// Close delegates to the underlying DDP client's Close.
func (d *ddpClient) Close() {
	conn := d.ddp
	conn.Close()
}
// call invokes the named DDP method with args on the underlying client and
// returns its result and error unchanged.
func (d *ddpClient) call(method string, args ...interface{}) (interface{}, error) {
	result, err := d.ddp.Call(method, args...)
	return result, err
}
// MessageStream returns the receive-only view of the client's allMsgs stream
// channel, which carries slices of RoomMessage.
func (c *Client) MessageStream() <-chan []RoomMessage {
	s := c.d.streams
	return s.allMsgs
}
// EventStream returns the receive-only view of the client's allEvts stream
// channel, which carries *StreamEvent values.
func (c *Client) EventStream() <-chan *StreamEvent {
	s := c.d.streams
	return s.allEvts
}
// StreamErrors returns the receive-only view of the client's allErrs stream
// channel, which carries errors.
func (c *Client) StreamErrors() <-chan error {
	s := c.d.streams
	return s.allErrs
}
// NewUpdateListener builds a ddp.UpdateListener that runs fn on every
// collection update it receives, together with the SubChannel from which the
// results can be read.
//
// The listener and the SubChannel share the same two channels, each buffered
// to hold 10 items: values returned by fn arrive on SubChannel.Updates and
// errors returned by fn arrive on SubChannel.Errors.
func NewUpdateListener(fn func(ddp.Update) (interface{}, error)) (ddp.UpdateListener, *SubChannel) {
	const backlog = 10

	var (
		updates = make(chan interface{}, backlog)
		errs    = make(chan error, backlog)
	)

	listener := &updateListener{updates: updates, errors: errs, process: fn}
	sub := &SubChannel{Updates: updates, Errors: errs}
	return listener, sub
}
// SubChannel is the consumer side of a listener created by NewUpdateListener.
// Both channels are receive-only.
type SubChannel struct {
// Updates delivers the values produced by the listener's processing function.
Updates <-chan interface{}
// Errors delivers the errors returned by the listener's processing function.
Errors <-chan error
}
// updateListener is the producer side of a subscription: NewUpdateListener
// returns it as a ddp.UpdateListener, and its CollectionUpdate method feeds the
// two channels below.
type updateListener struct {
// updates receives the values returned by process.
updates chan interface{}
// errors receives the errors returned by process.
errors chan error
// process converts a raw ddp.Update into the value published on updates.
process func(ddp.Update) (interface{}, error)
}
// CollectionUpdate runs the listener's process function on update and
// publishes the outcome: an error goes to the errors channel, a successful
// result goes to the updates channel. Exactly one of the two channels receives
// a value per call.
//
// coll, op and id are accepted to satisfy the listener interface but are not
// used. Each send blocks until the receiving channel has room.
func (ul updateListener) CollectionUpdate(coll, op, id string, update ddp.Update) {
	result, err := ul.process(update)
	if err != nil {
		ul.errors <- err
		// Stop here: the result that accompanies an error is not meaningful and
		// must not be delivered to consumers as if it were a real update.
		return
	}
	ul.updates <- result
}