forked from fsamin/go-wsqueue
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.go
172 lines (149 loc) · 4.36 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package wsqueue
import (
"encoding/json"
"expvar"
"fmt"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
)
//Logfunc is a function that logs the provided message with optional
//fmt.Sprintf-style arguments. By default, logs to the default log.Logger.
//setting it to nil can be used to disable logging for this package.
//This doesn’t enforce a coupling with any specific external package
//and is already widely supported by existing loggers.
var Logfunc = log.Printf
//Warnfunc is a function that logs the provided message with optional
//fmt.Sprintf-style arguments. By default, logs to the default log.Logger.
//setting it to nil can be used to disable logging for this package.
//This doesn’t enforce a coupling with any specific external package
//and is already widely supported by existing loggers.
var Warnfunc = log.Printf
//Server is a server
type Server struct {
Router *mux.Router
RoutePrefix string
QueuesCounter *expvar.Int
TopicsCounter *expvar.Int
ClientsCounter *expvar.Int
MessagesCounter *expvar.Int
}
//StorageDriver is in-memory Stack or Redis server
type StorageDriver interface {
Open(options *Options)
Push(data interface{})
Pop() interface{}
}
//Options is options on topic or queues
type Options struct {
ACL ACL `json:"acl,omitempty"`
Storage StorageOptions `json:"storage,omitempty"`
}
//StorageOptions is a collection of options, see storage documentation
type StorageOptions map[string]interface{}
//ConnID a a connection ID
type ConnID string
//Conn is a conn
type Conn struct {
ID ConnID
WSConn *websocket.Conn
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
//NewServer init a new WSQueue server
func NewServer(router *mux.Router, routePrefix string) *Server {
s := &Server{
Router: router,
RoutePrefix: routePrefix,
}
router.HandleFunc(routePrefix+"/vars", varsHandler)
if routePrefix != "" {
routePrefix = "." + routePrefix
}
s.QueuesCounter = expvar.NewInt("wsqueue" + routePrefix + ".stats.queues.counter")
s.TopicsCounter = expvar.NewInt("wsqueue" + routePrefix + ".stats.topics.counter")
s.ClientsCounter = expvar.NewInt("wsqueue" + routePrefix + ".stats.clients.counter")
s.MessagesCounter = expvar.NewInt("wsqueue" + routePrefix + ".stats.messages.counter")
return s
}
func (s *Server) createHandler(
mutex *sync.RWMutex,
wsConnections *map[ConnID]*Conn,
openedConnectionCallback *func(*Conn),
closedConnectionCallback *func(*Conn),
onMessageCallback *func(*Conn, *Message) error,
options *Options,
) func(
w http.ResponseWriter,
r *http.Request,
) {
return func(w http.ResponseWriter, r *http.Request) {
if options != nil && len(options.ACL) > 0 {
if !checkACL(options.ACL, w, r) {
Warnfunc("Not Authorized by ACL")
w.Write([]byte("Not Authorized by ACL"))
return
}
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Warnfunc("Cannot upgrade connection %s", err.Error())
w.Write([]byte(fmt.Sprintf("Cannot upgrade connection %s", err.Error())))
w.WriteHeader(426)
return
}
mutex.Lock()
conn := &Conn{
ID: ConnID(uuid.NewV4().String()),
WSConn: c,
}
if (*wsConnections)[conn.ID] != nil {
(*wsConnections)[conn.ID].WSConn.Close()
(*closedConnectionCallback)((*wsConnections)[conn.ID])
}
(*wsConnections)[conn.ID] = conn
mutex.Unlock()
if (*openedConnectionCallback) != nil {
go (*openedConnectionCallback)(conn)
}
s.ClientsCounter.Add(1)
defer c.Close()
for {
_, message, err := c.ReadMessage()
if err != nil {
mutex.Lock()
delete(*wsConnections, conn.ID)
mutex.Unlock()
if (*closedConnectionCallback) != nil {
(*closedConnectionCallback)(conn)
}
s.ClientsCounter.Add(-1)
break
}
if (*onMessageCallback) != nil {
var parsedMessage Message
if e := json.Unmarshal(message, &parsedMessage); err != nil {
Warnfunc("Cannot Unmarshall message", e.Error())
}
(*onMessageCallback)(conn, &parsedMessage)
}
}
}
}
func varsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}