forked from fsamin/go-wsqueue
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtopic.go
73 lines (65 loc) · 1.91 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package wsqueue
import (
"encoding/json"
"log"
"sync"
)
// Topic implements publish and subscribe semantics. When you publish a message
// it goes to all the subscribers who are interested - so zero to many
// subscribers will receive a copy of the message. Only subscribers who had an
// active subscription at the time the broker receives the message will get a
// copy of the message.
//
// Only Options and Topic take part in JSON encoding: the handler fields are
// tagged `json:"-"` and the unexported fields are ignored by encoding/json.
type Topic struct {
// Options is handed to the server's connection handler when the topic is
// registered.
Options *Options `json:"options,omitempty"`
// Topic is the topic name; it is the last segment of the route the topic is
// registered on.
Topic string `json:"topic,omitempty"`
// The three handler fields below are passed by pointer to the server's
// connection handler at registration time.
// NOTE(review): presumably invoked when a subscriber connects, disconnects,
// and sends a message respectively - confirm against createHandler.
OpenedConnectionHandler func(*Conn) `json:"-"`
ClosedConnectionHandler func(*Conn) `json:"-"`
OnMessageHandler func(*Conn, *Message) error `json:"-"`
// mutex guards wsConnections; publish holds it while iterating the map.
mutex *sync.RWMutex
// wsConnections holds the connections that receive published messages,
// keyed by connection ID.
wsConnections map[ConnID]*Conn
}
// CreateTopic builds a new Topic named topic, registers it on the server
// through RegisterTopic, and returns it.
func (s *Server) CreateTopic(topic string) *Topic {
	// The error result is discarded: newTopic, as written in this file,
	// always returns a nil error.
	created, _ := s.newTopic(topic)
	s.RegisterTopic(created)
	return created
}
// newTopic allocates a Topic named topic with its own RWMutex and an empty
// connection map. It does not register the topic on the server. The returned
// error is always nil.
func (s *Server) newTopic(topic string) (*Topic, error) {
	created := new(Topic)
	created.Topic = topic
	created.mutex = new(sync.RWMutex)
	created.wsConnections = map[ConnID]*Conn{}
	return created, nil
}
// RegisterTopic mounts t on the server's router. It logs the registration,
// builds a handler with createHandler from the topic's lock, connection map,
// callbacks and options, attaches that handler to s.Router at
// s.RoutePrefix + "/wsqueue/topic/" + t.Topic, and adds 1 to s.TopicsCounter.
func (s *Server) RegisterTopic(t *Topic) {
	route := s.RoutePrefix + "/wsqueue/topic/" + t.Topic
	log.Printf("Register queue %s on route %s", t.Topic, route)

	// The connection map and the callbacks are passed by address rather than
	// by value.
	// NOTE(review): presumably so callbacks assigned after registration are
	// still picked up - confirm in createHandler.
	topicHandler := s.createHandler(
		t.mutex,
		&t.wsConnections,
		&t.OpenedConnectionHandler,
		&t.ClosedConnectionHandler,
		&t.OnMessageHandler,
		t.Options,
	)

	s.Router.HandleFunc(route, topicHandler)
	s.TopicsCounter.Add(1)
}
// publish serializes m to JSON once and writes the payload to every
// connection currently registered on the topic.
//
// It returns the json.Marshal error, without writing to any connection, when
// m cannot be encoded. Delivery itself is best-effort: a failed write to one
// connection is logged and the remaining connections are still served, so
// write failures are not reflected in the returned error.
func (t *Topic) publish(m Message) error {
	// Encode before taking the lock so the lock is not held during
	// serialization, and so an unencodable message is rejected instead of
	// being broadcast as an empty payload.
	payload, err := json.Marshal(m)
	if err != nil {
		return err
	}

	// The exclusive lock (not RLock) is kept on purpose: besides protecting
	// the map, it prevents two concurrent publish calls from writing to the
	// same connection at once.
	// NOTE(review): gorilla/websocket allows only one concurrent writer per
	// connection - confirm that is the library behind WSConn.
	t.mutex.Lock()
	// Deferred so the topic is not left locked if a write panics.
	defer t.mutex.Unlock()

	for id, conn := range t.wsConnections {
		// 1 is the message type used by the original code; presumably the
		// text-frame type (websocket.TextMessage) - confirm against WSConn.
		if err := conn.WSConn.WriteMessage(1, payload); err != nil {
			log.Printf("wsqueue: topic %s: write to connection %v failed: %v", t.Topic, id, err)
		}
	}
	return nil
}
// Publish wraps data in a Message via newMessage and sends it to the topic's
// connections through publish. It returns the newMessage error when the
// message cannot be built; otherwise it returns whatever publish returns.
func (t *Topic) Publish(data interface{}) error {
	msg, err := newMessage(data)
	if err != nil {
		// Nothing is sent when the message cannot be constructed.
		return err
	}
	return t.publish(*msg)
}