diff --git a/lib/index.js b/lib/index.js index a545ab8..8fa74f1 100644 --- a/lib/index.js +++ b/lib/index.js @@ -295,13 +295,6 @@ function readyToRead(readable) { class Event { constructor(id, event, data, retry) { - if(id instanceof Object) { - this.id = id.id; - this.event = id.event; - this.data = id.data; - this.retry = id.retry; - return this; - } this.id = id; this.event = event; this.data = data; @@ -311,10 +304,20 @@ class Event { exports.Event = Event; -const DATA_PREFIX = 'data: '; -const EVENT_PREFIX = 'event: '; -const ID_PREFIX = 'id: '; -const RETRY_PREFIX = 'retry: '; +const DATA_PREFIX = 'data:'; +const EVENT_PREFIX = 'event:'; +const ID_PREFIX = 'id:'; +const RETRY_PREFIX = 'retry:'; + +function isDigitsOnly(str) { + for (let i = 0; i < str.length; i++) { + const c = str.charAt(i); + if (c < '0' || c > '9') { + return false; + } + } + return str.length > 0; +} function tryGetEvents(head, chunk) { const all = head + chunk; @@ -329,13 +332,18 @@ function tryGetEvents(head, chunk) { const event = new Event(); lines.forEach((line) => { if (line.startsWith(DATA_PREFIX)) { - event.data = line.substring(DATA_PREFIX.length); + event.data = line.substring(DATA_PREFIX.length).trim(); } else if (line.startsWith(EVENT_PREFIX)) { - event.event = line.substring(EVENT_PREFIX.length); + event.event = line.substring(EVENT_PREFIX.length).trim(); } else if (line.startsWith(ID_PREFIX)) { - event.id = line.substring(ID_PREFIX.length); + event.id = line.substring(ID_PREFIX.length).trim(); } else if (line.startsWith(RETRY_PREFIX)) { - event.retry = parseInt(line.substring(RETRY_PREFIX.length)); + const retry = line.substring(RETRY_PREFIX.length).trim(); + if (isDigitsOnly(retry)) { + event.retry = parseInt(retry, 10); + } + } else if (line.startsWith(':')) { + // ignore the line } }); events.push(event); diff --git a/test/fixtures/test.txt b/test/fixtures/test.txt new file mode 100644 index 0000000..6769dd6 --- /dev/null +++ b/test/fixtures/test.txt @@ -0,0 +1 @@ +Hello world! \ No newline at end of file diff --git a/test/index.test.js b/test/index.test.js index 6d9152a..ee33bce 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -2,6 +2,8 @@ const http = require('http'); const zlib = require('zlib'); +const fs = require('fs'); +const path = require('path'); const assert = require('assert'); const socks = require('socksv5'); @@ -23,6 +25,15 @@ const server = http.createServer((req, res) => { res.writeHead(200, {'Content-Type': 'text/plain'}); res.end('Hello world!'); }, 200); + } else if (req.url === '/stream') { + res.writeHead(200); + const buffers = []; + req.on('data', (chunk) => { + buffers.push(chunk); + }); + req.on('end', () => { + res.end(Buffer.concat(buffers).toString()); + }); } else if (req.url === '/compression') { res.writeHead(200, { 'content-encoding': 'gzip' @@ -52,7 +63,43 @@ const server = http.createServer((req, res) => { res.end(); return; } - res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n\n`); + res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_with_no_spaces') { + const headers = { + 'Content-Type': 'text/event-stream', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache' + }; + res.writeHead(200, headers); + res.flushHeaders(); + let count = 0; + let timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry:3\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_invalid_retry') { + const headers = { + 'Content-Type': 'text/event-stream', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache' + }; + res.writeHead(200, headers); + res.flushHeaders(); + let count = 0; + let timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry: abc\n\n`); count++; }, 100); } else { @@ -80,6 +127,10 @@ function make (server) { }; } +function newEvent(d) { + return new httpx.Event(d.id, d.event, d.data, d.retry); +} + describe('httpx', () => { before((done) => { server.listen(0, done); @@ -105,6 +156,16 @@ describe('httpx', () => { assert.deepStrictEqual(result, Buffer.from('Hello world!')); }); + it('should ok with stream', async function () { + var res = await make(server)('/stream', { + method: 'POST', + data: fs.createReadStream(path.join(__dirname, './fixtures/test.txt')) + }); + assert.strictEqual(res.statusCode, 200); + var result = await httpx.read(res); + assert.deepStrictEqual(result, Buffer.from('Hello world!')); + }); + it('compression should ok', async function () { var res = await make(server)('/compression'); assert.strictEqual(res.statusCode, 200); @@ -200,34 +261,112 @@ describe('httpx', () => { events.push(event); } + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([newEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), newEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: 3, + })], events); + }); + + it('readAsSSE with no spaces should ok', async function () { + this.timeout(15000); + var res = await make(server)('/sse_with_no_spaces', {readTimeout: 5000}); + assert.strictEqual(res.statusCode, 200); + const events = []; + for await (const event of httpx.readAsSSE(res)) { + events.push(event); + } assert.strictEqual(events.length, 5); - assert.deepStrictEqual([new httpx.Event({ + assert.deepStrictEqual([newEvent({ data: '{"count":0}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":1}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":2}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":3}', event: 'flow', id: 'sse-test', retry: 3, - }), new httpx.Event({ + }), newEvent({ data: '{"count":4}', event: 'flow', id: 'sse-test', retry: 3, })], events); }); + + + it('readAsSSE with invalid retry should ok', async function () { + this.timeout(15000); + var res = await make(server)('/sse_invalid_retry', {readTimeout: 5000}); + assert.strictEqual(res.statusCode, 200); + const events = []; + for await (const event of httpx.readAsSSE(res)) { + events.push(event); + } + + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([newEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), newEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: undefined, + })], events); + }); });