-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
430 lines (363 loc) · 12.1 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
'use strict'
const debug = require('debug')('topico')
const pubsub = require('mitt')()
const uid = require('ulid').ulid
/**
 * The names of every valid topic, in the order they were registered. The
 * topic enumeration is generated from this list, and `validate()` checks
 * incoming topics against it. `addTopic()` appends to this array at runtime,
 * so it only ever grows.
 * @type {Array}
 */
const validTopics = [
/**
 * This topic is intended for general announcements regarding expected
 * activity. It should not be used for reporting errors or other unplanned
 * events.
 */
'INFO',
/**
 * This topic is intended for information about any errors that occur.
 */
'ERROR'
]
/**
 * The enumeration of valid topics: one key per topic name, each holding that
 * topic's Symbol. It starts out empty and is replaced with a new frozen
 * object (built by `createTopicEnumeration()`) whenever it is rebuilt.
 * @type {Object}
 */
let topicEnum = {}
/**
 * The number of milliseconds to wait before "timing out" a pending request.
 * It is read by `request()` as the watchdog delay and can be changed through
 * the `requestTTL` accessor on `TopicalPubSub`.
 * @type {Number}
 */
let requestTTL = 4200
/**
 * Builds the topic "enumeration": a frozen dictionary that maps each topic
 * name to a Symbol whose description is that name.
 *
 * Symbols already present in `existingEnum` are reused, so values handed out
 * earlier stay identical after the enumeration is rebuilt. Names that are not
 * yet known receive a brand-new Symbol.
 *
 * @param {Object} existingEnum The enumeration currently in use (an empty
 *                              object when there is none yet).
 *
 * @param {Array} topicNames    The complete list of topic names that the new
 *                              enumeration must contain.
 *
 * @return {Object} A frozen object with one key per entry of `topicNames`.
 */
function createTopicEnumeration (existingEnum, topicNames) {
  const hasOwn = Object.prototype.hasOwnProperty
  const symbolsByName = {}

  topicNames.forEach((topicName) => {
    const alreadyKnown = hasOwn.call(existingEnum, topicName)
    symbolsByName[topicName] = alreadyKnown
      ? existingEnum[topicName]
      : Symbol(topicName)
  })

  return Object.freeze(symbolsByName)
}
/**
 * Returns the name of the topic if it is found in the list of valid topics,
 * otherwise `null`.
 *
 * The check is made on the string form of the value: a topic is accepted
 * when `topic.toString()` equals `Symbol(<name>)` for one of the registered
 * names.
 *
 * @param {Symbol} topic The value to check.
 *
 * @return {String?} The matching topic name, or `null` when there is none.
 */
function validate (topic) {
  // Falsy values (undefined, null, '', 0, ...) can never be a topic.
  if (!topic) {
    return null
  }

  const rendered = topic.toString()
  if (!(rendered.length > 0)) {
    return null
  }

  for (const candidate of validTopics) {
    if (rendered === `Symbol(${candidate})`) {
      return candidate
    }
  }

  return null
}
/**
 * A publish/subscribe hub whose channels are limited to a known set of
 * topics. Topics are identified by the Symbols exposed through `topics`.
 *
 * The topic list, the listeners and the request timeout all live at module
 * level, so they are shared by every instance of this class.
 */
class TopicalPubSub {
  /**
   * Builds the shared topic enumeration from the list of valid topics.
   */
  constructor () {
    // Hand over the current enumeration (empty the first time) so that
    // constructing another instance never replaces Symbols that callers may
    // already be holding on to.
    topicEnum = createTopicEnumeration(topicEnum, validTopics)
  }

  /**
   * Returns the current enumeration of valid topics.
   *
   * @return {Object}
   */
  get topics () {
    return topicEnum
  }

  /**
   * The number of milliseconds `request()` waits for a response before the
   * returned Promise is rejected.
   *
   * @return {Number}
   */
  get requestTTL () {
    return requestTTL
  }

  /**
   * Changes the number of milliseconds `request()` waits for a response.
   * Requests that are already pending keep the limit they started with.
   *
   * @param {Number} value A finite, non-negative number of milliseconds.
   *
   * @throws {Error} If `value` is not a number.
   * @throws {RangeError} If `value` is `NaN`, infinite or negative.
   */
  set requestTTL (value) {
    if (typeof value !== 'number') {
      throw new Error('The new value for "requestTTL" must be numeric.')
    }
    // `typeof NaN` and `typeof Infinity` are both 'number', but neither is a
    // usable timer delay (and a negative delay makes no sense either).
    if (!Number.isFinite(value) || value < 0) {
      throw new RangeError('The new value for "requestTTL" must be a finite, non-negative number.')
    }
    requestTTL = value
  }

  /**
   * Adds one or more topics to the enumeration.
   *
   * The call is all-or-nothing: when any of the supplied names is invalid,
   * an error is thrown and no topic is added.
   *
   * @param {Array|String} newTopicNames The name(s) of the topic(s) to
   *                                     add. Each name will be converted
   *                                     to uppercase, and if the enum
   *                                     already contains the name, then
   *                                     it will not be re-added.
   *
   * @return {undefined}
   *
   * @throws {Error} If a name is missing or is not a string.
   */
  addTopic (newTopicNames) {
    // `Array.from` also turns holes of a sparse array into `undefined`, so
    // they are rejected below instead of being skipped silently.
    const names = Array.isArray(newTopicNames)
      ? Array.from(newTopicNames)
      : [newTopicNames]

    // Validate and format every name BEFORE touching the shared list, so a
    // bad entry cannot leave the list and the enumeration out of sync.
    const formattedNames = names.map((name) => {
      if (typeof name !== 'string') {
        throw new Error('The "name" parameter for "addTopic()" is required and must be a string (or an array of strings).')
      }
      return name.toUpperCase()
    })

    formattedNames.forEach((formattedName) => {
      // Avoid adding duplicate entries.
      if (!validTopics.includes(formattedName)) {
        debug('Adding new topic "%s"', formattedName)
        validTopics.push(formattedName)
      }
    })

    topicEnum = createTopicEnumeration(topicEnum, validTopics)
  }

  /**
   * Adds a subscription for a particular topic.
   *
   * @param {Symbol} topic       One of `TopicalPubSub.prototype.topics`.
   *
   * @param {Function} callback  The function to call when data is published.
   *
   * @return {undefined}
   *
   * @throws {TypeError} If `topic` is not a valid topic or `callback` is not
   *                     a function.
   */
  listen (topic, callback) {
    const topicName = validate(topic)
    if (!topicName) {
      throw new TypeError('The "topic" parameter for "listen()" is required and must be a value from "topics".')
    }
    if (typeof callback !== 'function') {
      throw new TypeError('The "callback" parameter for "listen()" is required and must be a function.')
    }
    pubsub.on(topicName, callback)
    debug('Registered listener on topic "%s"', topicName)
  }

  /**
   * Adds a one-time subscription for a particular topic. The callback is
   * invoked for the next payload published on the topic and never again.
   *
   * @param {Symbol} topic       One of `TopicalPubSub.prototype.topics`.
   *
   * @param {Function} callback  The function to call when data is published.
   *
   * @return {undefined}
   *
   * @throws {TypeError} If `topic` is not a valid topic or `callback` is not
   *                     a function.
   */
  listenOnce (topic, callback) {
    const topicName = validate(topic)
    if (!topicName) {
      throw new TypeError('The "topic" parameter for "listenOnce()" is required and must be a value from "topics".')
    }
    if (typeof callback !== 'function') {
      throw new TypeError('The "callback" parameter for "listenOnce()" is required and must be a function.')
    }
    /**
     * Register a private wrapper instead of the callback itself. The wrapper
     * unsubscribes itself before delegating, which leaves the caller's
     * function untouched (it can still be used with `listen()`), and the
     * `fired` flag keeps a nested `say()` from delivering a second payload.
     */
    let fired = false
    const onceWrapper = (payload) => {
      if (fired) {
        return
      }
      fired = true
      pubsub.off(topicName, onceWrapper)
      callback(payload)
    }
    pubsub.on(topicName, onceWrapper)
    debug('Registered one-time listener on topic "%s"', topicName)
  }

  /**
   * Adds a subscription for a particular topic that will automatically cancel
   * immediately after the specified primitive value (or regular expression) is
   * received (or matched).
   *
   * @param {Symbol} topic       One of `TopicalPubSub.prototype.topics`.
   *
   * @param {any} value          The primitive value to watch for, or a
   *                             regular expression to match against.
   *
   * @param {Function} callback  The function to call when data is published.
   *                             If `value` is a regular expression, then
   *                             the callback will receive whatever payload
   *                             matched the expression; otherwise it is
   *                             called without arguments. The callback is
   *                             executed asynchronously.
   *
   * @return {undefined}
   *
   * @throws {TypeError} If `topic` is not a valid topic, `value` is neither
   *                     a primitive nor a RegExp, or `callback` is not a
   *                     function.
   */
  listenFor (topic, value, callback) {
    const topicName = validate(topic)
    if (!topicName) {
      throw new TypeError('The "topic" parameter for "listenFor()" is required and must be a value from "topics".')
    }
    /**
     * A list of all primitive value types. Found at
     * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Data_structures
     * @type {Array}
     */
    const primitives = [
      'boolean',
      'number',
      'string',
      'bigint',
      'symbol'
    ]
    if (value == null || (!primitives.includes(typeof value) && !(value instanceof RegExp))) {
      throw new TypeError('The "value" parameter for "listenFor()" is required and must be either a primitive value or an instance of RegExp.')
    }
    if (typeof callback !== 'function') {
      throw new TypeError('The "callback" parameter for "listenFor()" is required and must be a function.')
    }
    const fn = (payload) => {
      let matched = false
      if (value instanceof RegExp && value.test(payload)) {
        matched = true
        process.nextTick(() => {
          callback(payload)
        })
      } else if (payload === value) {
        matched = true
        process.nextTick(callback)
      }
      // The subscription ends as soon as the awaited value has shown up.
      if (matched) { pubsub.off(topicName, fn) }
    }
    pubsub.on(topicName, fn)
    debug('Registered conditional listener on topic "%s"', topicName)
  }

  /**
   * Publishes data for a particular topic. All subscribers will be notified.
   *
   * @param {Symbol} topic  One of `TopicalPubSub.prototype.topics`.
   *
   * @param {any} data      The value to publish.
   *
   * @return {undefined}
   *
   * @throws {TypeError} If `topic` is not a valid topic.
   */
  say (topic, data) {
    const topicName = validate(topic)
    if (!topicName) {
      throw new TypeError('The "topic" parameter for "say()" is required and must be a value from "topics".')
    }
    debug('Saying %o on topic "%s"', data, topicName)
    // One-time listeners unsubscribe themselves when they run, so there is
    // nothing to clean up after emitting.
    pubsub.emit(topicName, data)
  }

  /**
   * Removes all registered listeners for the specified topic.
   *
   * @param {Symbol} topic  One of `TopicalPubSub.prototype.topics`.
   *
   * @return {undefined}
   *
   * @throws {TypeError} If `topic` is not a valid topic.
   */
  cancel (topic) {
    const topicName = validate(topic)
    if (!topicName) {
      throw new TypeError('The "topic" parameter for "cancel()" is required and must be a value from "topics".')
    }
    debug('Dropping all listeners on topic "%s"', topicName)
    pubsub.all.delete(topicName)
  }

  /**
   * Removes all registered listeners on all topics. Response handlers of
   * pending requests are stored in the same registry, so they are dropped
   * as well.
   *
   * @return {undefined}
   */
  cancelAll () {
    debug('Dropping all listeners on all topics')
    pubsub.all.clear()
  }

  /**
   * Request a specific piece of information from a subscriber. The subscriber
   * must reply using the `respond` method, not `say`. The promise will resolve
   * with the value sent to `respond`.
   *
   * Subscribers receive an object of the form `{ trackingNo, query }` and
   * must pass that `trackingNo` back to `respond`.
   *
   * @param {Symbol} topic  One of `TopicalPubSub.prototype.topics`.
   *
   * @param {any} query     A value that identifies the information being
   *                        sought.
   *
   * @return {Promise} Resolves with the answer. Rejects with a `TypeError`
   *                   for an invalid topic, with an `Error` when no response
   *                   arrives within `requestTTL` milliseconds, or with
   *                   whatever a subscriber threw while being notified.
   */
  request (topic, query) {
    return new Promise((resolve, reject) => {
      const topicName = validate(topic)
      if (!topicName) {
        throw new TypeError('The "topic" parameter for "request()" is required and must be a value from "topics".')
      }
      /**
       * The "tracking number" is a unique identifier that belongs to this
       * request, and only this request. It is used to match up the response.
       * @type {String}
       */
      const trackingNo = uid()
      /**
       * This is used to report the amount of time spent waiting for the
       * response. Should be useful for troubleshooting or analyzing the
       * performance of other parts of code.
       * @type {Number}
       */
      const startTime = Date.now()
      /**
       * The time limit that applies to this request. It is captured once so
       * that the timer and the log message agree even if `requestTTL` is
       * changed while the request is pending.
       * @type {Number}
       */
      const ttl = requestTTL
      /**
       * This Promise will only be resolved when the matching response is
       * received. If (for whatever reason) the response never arrives, then
       * the caller would be left waiting forever.
       *
       * This "watchdog" is responsible for preventing such a situation. If the
       * response doesn't arrive within the time limit (the module-level
       * `requestTTL`), then the watchdog will cause the Promise to be
       * rejected.
       * @type {Timeout}
       */
      const watchdog = global.setTimeout(
        () => {
          debug('Failed to receive response to %s within %d sec', trackingNo, (ttl / 1000))
          // The response handler is useless from now on; drop it so that
          // timed-out requests do not pile up in the listener registry.
          pubsub.all.delete(trackingNo)
          reject(new Error('No response received within the required time limit.'))
        },
        ttl
      )
      debug('Submitting request for %o with tracking number %s', query, trackingNo)
      // the handler needs to be set before calling `say` (otherwise the
      // sequence of events won't work out right)
      pubsub.on(trackingNo, (answer) => {
        global.clearTimeout(watchdog)
        const seconds = (Date.now() - startTime) / 1000
        const elapsedTime = (seconds < 1 ? 'less than 1 sec' : `${seconds} sec`)
        debug('%s had a cycle time of %s', trackingNo, elapsedTime)
        resolve(answer)
        // remove the listener after we're done here, because it can never be
        // called again
        process.nextTick(() => {
          pubsub.all.delete(trackingNo)
        })
      })
      try {
        this.say(topic, { trackingNo: trackingNo, query: query })
      } catch (err) {
        // A subscriber threw while being notified. Rethrowing rejects the
        // Promise; release the timer and the response handler first so
        // neither of them outlives the failed request.
        global.clearTimeout(watchdog)
        pubsub.all.delete(trackingNo)
        throw err
      }
    })
  }

  /**
   * Responds to a request for a specific piece of information.
   *
   * @param {any} trackingNo  The value published as part of the original
   *                          request. This is required in order to match
   *                          everything up correctly.
   *
   * @param {any} answer      The information that was requested.
   *
   * @return {undefined}
   */
  respond (trackingNo, answer) {
    debug('Received response for %s: %o', trackingNo, answer)
    pubsub.emit(trackingNo, answer)
  }
}
/**
 * The module exports a single `TopicalPubSub` instance, created when this
 * file is loaded, rather than the class itself.
 */
const instance = new TopicalPubSub()
module.exports = instance