forked from fsamin/go-wsqueue
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.go
201 lines (184 loc) · 4.73 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package wsqueue
import (
"encoding/json"
"errors"
"sync"
"time"
)
const (
//MaxUint is the maximum uint on your platform
maxUint = ^uint(0)
//MinUint is the min uint on your platform
minUint = 0
//MaxInt is the max int on your platform
maxInt = int(maxUint >> 1)
//MinInt is the min int on your platform
minInt = -maxInt - 1
)
//Queue implements load balancer semantics. A single message will be received by
// exactly one consumer. If there are no consumers available at the time the
// message is sent it will be kept until a consumer is available that can process
// the message. If a consumer receives a message and does not acknowledge it
// before closing then the message will be redelivered to another consumer.
// A queue can have many consumers with messages load balanced across the
// available consumers.
type Queue struct {
Options *Options `json:"options,omitempty"`
Queue string `json:"topic,omitempty"`
newConsumerHandler func(*Conn)
consumerExitedHandler func(*Conn)
ackHandler func(*Conn, *Message) error
mutex *sync.RWMutex
wsConnections map[ConnID]*Conn
acks map[*Message]bool
lb *loadBalancer
store StorageDriver
stopQueue chan bool
}
//CreateQueue create queue
func (s *Server) CreateQueue(name string, bufferSize int) *Queue {
q, _ := s.newQueue(name, bufferSize)
s.RegisterQueue(q)
return q
}
func (s *Server) newQueue(name string, bufferSize int) (*Queue, error) {
q := &Queue{
Queue: name,
mutex: &sync.RWMutex{},
wsConnections: make(map[ConnID]*Conn),
acks: make(map[*Message]bool),
}
q.lb = &loadBalancer{queue: q, counter: make(map[ConnID]int)}
q.newConsumerHandler = newConsumerHandler(q)
q.consumerExitedHandler = consumerExitedHandler(q)
q.ackHandler = ackHandler(q)
q.store = NewStack()
q.stopQueue = make(chan bool, 1)
q.Options = &Options{Storage: StorageOptions{"capacity": bufferSize}}
return q, nil
}
//RegisterQueue register
func (s *Server) RegisterQueue(q *Queue) {
Logfunc("Register queue %s on route %s", q.Queue, s.RoutePrefix+"/wsqueue/queue/"+q.Queue)
handler := s.createHandler(
q.mutex,
&q.wsConnections,
&q.newConsumerHandler,
&q.consumerExitedHandler,
&q.ackHandler,
q.Options,
)
q.store.Open(q.Options)
q.handle(100)
s.Router.HandleFunc(s.RoutePrefix+"/wsqueue/queue/"+q.Queue, handler)
s.QueuesCounter.Add(1)
}
type loadBalancer struct {
queue *Queue
counter map[ConnID]int
}
func (lb *loadBalancer) next() (*ConnID, error) {
lb.queue.mutex.Lock()
defer lb.queue.mutex.Unlock()
if len(lb.queue.wsConnections) == 0 {
return nil, errors.New("No connection available")
}
var minCounter = maxInt
for id := range lb.queue.wsConnections {
counter := lb.counter[id]
if counter < minCounter {
minCounter = counter
}
}
for id := range lb.queue.wsConnections {
c := lb.counter[id]
if c == minCounter {
c++
lb.counter[id] = c
return &id, nil
}
}
return nil, errors.New("No connection available")
}
//Send send a message
func (q *Queue) Send(data interface{}) error {
m, e := newMessage(data)
if e != nil {
return e
}
if len(q.wsConnections) == 0 {
Logfunc("No consumer, pushing message to stack")
q.store.Push(m)
return nil
}
q.send(m)
return nil
}
func (q *Queue) send(m *Message) {
connID, err := q.lb.next()
if err != nil {
Warnfunc("Error while sending to %s : %s", *connID, err.Error())
q.store.Push(m)
} else {
conn := q.wsConnections[*connID]
q.acks[m] = false
b, _ := json.Marshal(m)
q.mutex.Lock()
err := conn.WSConn.WriteMessage(1, b)
q.mutex.Unlock()
if err != nil {
Logfunc("Error while sending to %s : %s", *connID, err.Error())
q.store.Push(m)
}
}
}
func (q *Queue) handle(interval int64) {
var cont = true
go func(c *bool) {
for *c {
time.Sleep(time.Duration(interval) * time.Millisecond)
if len(q.wsConnections) > 0 {
data := q.store.Pop()
if data != nil {
m, b := data.(*Message)
if !b {
Warnfunc("Cannot cast %s to message", data)
}
q.send(m)
}
}
}
}(&cont)
go func(c *bool) {
var b bool
//FIXME: test
b = <-q.stopQueue
cont = *c && b
}(&cont)
}
func newConsumerHandler(q *Queue) func(*Conn) {
return func(c *Conn) {
q.mutex.Lock()
q.lb.counter[c.ID] = 0
//Reinitiatlisation du load balancer
for id := range q.lb.counter {
q.lb.counter[id] = 0
}
q.mutex.Unlock()
}
}
func consumerExitedHandler(q *Queue) func(*Conn) {
return func(c *Conn) {
q.mutex.Lock()
delete(q.lb.counter, c.ID)
q.mutex.Unlock()
}
}
func ackHandler(q *Queue) func(*Conn, *Message) error {
return func(c *Conn, m *Message) error {
q.mutex.Lock()
q.acks[m] = true
q.mutex.Unlock()
return nil
}
}