From e865f8258aef8c896b0fdf9da37a111298078668 Mon Sep 17 00:00:00 2001 From: Jackson Tian Date: Thu, 22 Feb 2024 15:21:31 +0800 Subject: [PATCH] lib: fix sse implemention --- lib/index.js | 38 ++++++++------ test/index.test.js | 128 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 146 insertions(+), 20 deletions(-) diff --git a/lib/index.js b/lib/index.js index a545ab8..8fa74f1 100644 --- a/lib/index.js +++ b/lib/index.js @@ -295,13 +295,6 @@ function readyToRead(readable) { class Event { constructor(id, event, data, retry) { - if(id instanceof Object) { - this.id = id.id; - this.event = id.event; - this.data = id.data; - this.retry = id.retry; - return this; - } this.id = id; this.event = event; this.data = data; @@ -311,10 +304,20 @@ class Event { exports.Event = Event; -const DATA_PREFIX = 'data: '; -const EVENT_PREFIX = 'event: '; -const ID_PREFIX = 'id: '; -const RETRY_PREFIX = 'retry: '; +const DATA_PREFIX = 'data:'; +const EVENT_PREFIX = 'event:'; +const ID_PREFIX = 'id:'; +const RETRY_PREFIX = 'retry:'; + +function isDigitsOnly(str) { + for (let i = 0; i < str.length; i++) { + const c = str.charAt(i); + if (c < '0' || c > '9') { + return false; + } + } + return str.length > 0; +} function tryGetEvents(head, chunk) { const all = head + chunk; @@ -329,13 +332,18 @@ function tryGetEvents(head, chunk) { const event = new Event(); lines.forEach((line) => { if (line.startsWith(DATA_PREFIX)) { - event.data = line.substring(DATA_PREFIX.length); + event.data = line.substring(DATA_PREFIX.length).trim(); } else if (line.startsWith(EVENT_PREFIX)) { - event.event = line.substring(EVENT_PREFIX.length); + event.event = line.substring(EVENT_PREFIX.length).trim(); } else if (line.startsWith(ID_PREFIX)) { - event.id = line.substring(ID_PREFIX.length); + event.id = line.substring(ID_PREFIX.length).trim(); } else if (line.startsWith(RETRY_PREFIX)) { - event.retry = parseInt(line.substring(RETRY_PREFIX.length)); + const retry = line.substring(RETRY_PREFIX.length).trim(); + if (isDigitsOnly(retry)) { + event.retry = parseInt(retry, 10); + } + } else if (line.startsWith(':')) { + // ignore the line } }); events.push(event); diff --git a/test/index.test.js b/test/index.test.js index 6d9152a..2788fe9 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -55,6 +55,42 @@ const server = http.createServer((req, res) => { res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n\n`); count++; }, 100); + } else if (req.url === '/sse_with_no_spaces') { + const headers = { + 'Content-Type': 'text/event-stream', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache' + }; + res.writeHead(200, headers); + res.flushHeaders(); + let count = 0; + let timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry:3\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_invalid_retry') { + const headers = { + 'Content-Type': 'text/event-stream', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache' + }; + res.writeHead(200, headers); + res.flushHeaders(); + let count = 0; + let timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry: abc\n\n`); + count++; + }, 100); } else { res.writeHead(200, {'Content-Type': 'text/plain'}); res.end('Hello world!'); @@ -80,6 +116,10 @@ function make (server) { }; } +function newEvent(d) { + return new httpx.Event(d.id, d.event, d.data, d.retry); +} + describe('httpx', () => { before((done) => { server.listen(0, done); @@ -200,34 +240,112 @@ describe('httpx', () => { events.push(event); } + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([newEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: 3, + })], events); + }); + + it('readAsSSE with no spaces should ok', async function () { + this.timeout(15000); + var res = await make(server)('/sse_with_no_spaces', {readTimeout: 5000}); + assert.strictEqual(res.statusCode, 200); + const events = []; + for await (const event of httpx.readAsSSE(res)) { + events.push(event); + } assert.strictEqual(events.length, 5); - assert.deepStrictEqual([new httpx.Event({ + assert.deepStrictEqual([newEvent({ data: '{"count":0}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":1}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":2}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":3}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":4}', event: 'flow', id: 'sse-test', retry: 3, })], events); }); + + + it('readAsSSE with invalid retry should ok', async function () { + this.timeout(15000); + var res = await make(server)('/sse_invalid_retry', {readTimeout: 5000}); + assert.strictEqual(res.statusCode, 200); + const events = []; + for await (const event of httpx.readAsSSE(res)) { + events.push(event); + } + + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([newEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: undefined, + })], events); + }); });