Skip to content

Commit

Permalink
lib: fix sse implemention
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonTian committed Feb 22, 2024
1 parent 4fbce33 commit e865f82
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 20 deletions.
38 changes: 23 additions & 15 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ function readyToRead(readable) {

class Event {
constructor(id, event, data, retry) {
if(id instanceof Object) {
this.id = id.id;
this.event = id.event;
this.data = id.data;
this.retry = id.retry;
return this;
}
this.id = id;
this.event = event;
this.data = data;
Expand All @@ -311,10 +304,20 @@ class Event {

exports.Event = Event;

const DATA_PREFIX = 'data: ';
const EVENT_PREFIX = 'event: ';
const ID_PREFIX = 'id: ';
const RETRY_PREFIX = 'retry: ';
const DATA_PREFIX = 'data:';
const EVENT_PREFIX = 'event:';
const ID_PREFIX = 'id:';
const RETRY_PREFIX = 'retry:';

function isDigitsOnly(str) {
for (let i = 0; i < str.length; i++) {
const c = str.charAt(i);
if (c < '0' || c > '9') {
return false;
}
}
return str.length > 0;
}

function tryGetEvents(head, chunk) {
const all = head + chunk;
Expand All @@ -329,13 +332,18 @@ function tryGetEvents(head, chunk) {
const event = new Event();
lines.forEach((line) => {
if (line.startsWith(DATA_PREFIX)) {
event.data = line.substring(DATA_PREFIX.length);
event.data = line.substring(DATA_PREFIX.length).trim();
} else if (line.startsWith(EVENT_PREFIX)) {
event.event = line.substring(EVENT_PREFIX.length);
event.event = line.substring(EVENT_PREFIX.length).trim();
} else if (line.startsWith(ID_PREFIX)) {
event.id = line.substring(ID_PREFIX.length);
event.id = line.substring(ID_PREFIX.length).trim();
} else if (line.startsWith(RETRY_PREFIX)) {
event.retry = parseInt(line.substring(RETRY_PREFIX.length));
const retry = line.substring(RETRY_PREFIX.length).trim();
if (isDigitsOnly(retry)) {
event.retry = parseInt(retry, 10);
}
} else if (line.startsWith(':')) {
// ignore the line
}
});
events.push(event);
Expand Down
128 changes: 123 additions & 5 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,42 @@ const server = http.createServer((req, res) => {
res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_with_no_spaces') {
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache'
};
res.writeHead(200, headers);
res.flushHeaders();
let count = 0;
let timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry:3\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_invalid_retry') {
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache'
};
res.writeHead(200, headers);
res.flushHeaders();
let count = 0;
let timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry: abc\n\n`);
count++;
}, 100);
} else {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello world!');
Expand All @@ -80,6 +116,10 @@ function make (server) {
};
}

function newEvent(d) {
return new httpx.Event(d.id, d.event, d.data, d.retry);
}

describe('httpx', () => {
before((done) => {
server.listen(0, done);
Expand Down Expand Up @@ -200,34 +240,112 @@ describe('httpx', () => {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});

it('readAsSSE with no spaces should ok', async function () {
this.timeout(15000);
var res = await make(server)('/sse_with_no_spaces', {readTimeout: 5000});
assert.strictEqual(res.statusCode, 200);
const events = [];
for await (const event of httpx.readAsSSE(res)) {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([new httpx.Event({
assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});


it('readAsSSE with invalid retry should ok', async function () {
this.timeout(15000);
var res = await make(server)('/sse_invalid_retry', {readTimeout: 5000});
assert.strictEqual(res.statusCode, 200);
const events = [];
for await (const event of httpx.readAsSSE(res)) {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: undefined,
})], events);
});
});

0 comments on commit e865f82

Please sign in to comment.