Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: fix sse implemention #118

Merged
merged 2 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ function readyToRead(readable) {

class Event {
constructor(id, event, data, retry) {
if(id instanceof Object) {
this.id = id.id;
this.event = id.event;
this.data = id.data;
this.retry = id.retry;
return this;
}
this.id = id;
this.event = event;
this.data = data;
Expand All @@ -311,10 +304,20 @@ class Event {

exports.Event = Event;

const DATA_PREFIX = 'data: ';
const EVENT_PREFIX = 'event: ';
const ID_PREFIX = 'id: ';
const RETRY_PREFIX = 'retry: ';
const DATA_PREFIX = 'data:';
const EVENT_PREFIX = 'event:';
const ID_PREFIX = 'id:';
const RETRY_PREFIX = 'retry:';

function isDigitsOnly(str) {
for (let i = 0; i < str.length; i++) {
const c = str.charAt(i);
if (c < '0' || c > '9') {
return false;
}
}
return str.length > 0;
}

function tryGetEvents(head, chunk) {
const all = head + chunk;
Expand All @@ -329,13 +332,18 @@ function tryGetEvents(head, chunk) {
const event = new Event();
lines.forEach((line) => {
if (line.startsWith(DATA_PREFIX)) {
event.data = line.substring(DATA_PREFIX.length);
event.data = line.substring(DATA_PREFIX.length).trim();
} else if (line.startsWith(EVENT_PREFIX)) {
event.event = line.substring(EVENT_PREFIX.length);
event.event = line.substring(EVENT_PREFIX.length).trim();
} else if (line.startsWith(ID_PREFIX)) {
event.id = line.substring(ID_PREFIX.length);
event.id = line.substring(ID_PREFIX.length).trim();
} else if (line.startsWith(RETRY_PREFIX)) {
event.retry = parseInt(line.substring(RETRY_PREFIX.length));
const retry = line.substring(RETRY_PREFIX.length).trim();
if (isDigitsOnly(retry)) {
event.retry = parseInt(retry, 10);
}
} else if (line.startsWith(':')) {
// ignore the line
}
});
events.push(event);
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello world!
151 changes: 145 additions & 6 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const http = require('http');
const zlib = require('zlib');
const fs = require('fs');
const path = require('path');
const assert = require('assert');

const socks = require('socksv5');
Expand All @@ -23,6 +25,15 @@ const server = http.createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello world!');
}, 200);
} else if (req.url === '/stream') {
res.writeHead(200);
const buffers = [];
req.on('data', (chunk) => {
buffers.push(chunk);
});
req.on('end', () => {
res.end(Buffer.concat(buffers).toString());
});
} else if (req.url === '/compression') {
res.writeHead(200, {
'content-encoding': 'gzip'
Expand Down Expand Up @@ -52,7 +63,43 @@ const server = http.createServer((req, res) => {
res.end();
return;
}
res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n\n`);
res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_with_no_spaces') {
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache'
};
res.writeHead(200, headers);
res.flushHeaders();
let count = 0;
let timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry:3\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_invalid_retry') {
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache'
};
res.writeHead(200, headers);
res.flushHeaders();
let count = 0;
let timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({count: count})}\nevent:flow\nid:sse-test\nretry: abc\n\n`);
count++;
}, 100);
} else {
Expand Down Expand Up @@ -80,6 +127,10 @@ function make (server) {
};
}

function newEvent(d) {
return new httpx.Event(d.id, d.event, d.data, d.retry);
}

describe('httpx', () => {
before((done) => {
server.listen(0, done);
Expand All @@ -105,6 +156,16 @@ describe('httpx', () => {
assert.deepStrictEqual(result, Buffer.from('Hello world!'));
});

it('should ok with stream', async function () {
var res = await make(server)('/stream', {
method: 'POST',
data: fs.createReadStream(path.join(__dirname, './fixtures/test.txt'))
});
assert.strictEqual(res.statusCode, 200);
var result = await httpx.read(res);
assert.deepStrictEqual(result, Buffer.from('Hello world!'));
});

it('compression should ok', async function () {
var res = await make(server)('/compression');
assert.strictEqual(res.statusCode, 200);
Expand Down Expand Up @@ -200,34 +261,112 @@ describe('httpx', () => {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});

it('readAsSSE with no spaces should ok', async function () {
this.timeout(15000);
var res = await make(server)('/sse_with_no_spaces', {readTimeout: 5000});
assert.strictEqual(res.statusCode, 200);
const events = [];
for await (const event of httpx.readAsSSE(res)) {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([new httpx.Event({
assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new httpx.Event({
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});


it('readAsSSE with invalid retry should ok', async function () {
this.timeout(15000);
var res = await make(server)('/sse_invalid_retry', {readTimeout: 5000});
assert.strictEqual(res.statusCode, 200);
const events = [];
for await (const event of httpx.readAsSSE(res)) {
events.push(event);
}

assert.strictEqual(events.length, 5);

assert.deepStrictEqual([newEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), newEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: undefined,
})], events);
});
});
Loading