Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate wasi-sockets tcp to worker thread #316

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions packages/preview2-shim/lib/io/calls.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,20 @@ export const CLOCKS_NOW = ++call_id << 24;
export const CLOCKS_DURATION_SUBSCRIBE = ++call_id << 24;
export const CLOCKS_INSTANT_SUBSCRIBE = ++call_id << 24;

// Sockets
// Sockets (TCP)
export const SOCKET_TCP_CREATE_HANDLE = ++call_id << 24;
export const SOCKET_TCP_BIND = ++call_id << 24;
export const SOCKET_TCP_CONNECT = ++call_id << 24;
export const SOCKET_TCP_LISTEN = ++call_id << 24;
export const SOCKET_TCP_ACCEPT = ++call_id << 24;
export const SOCKET_TCP_GET_LOCAL_ADDRESS = ++call_id << 24;
export const SOCKET_TCP_GET_REMOTE_ADDRESS = ++call_id << 24;
export const SOCKET_TCP_SHUTDOWN = ++call_id << 24;
export const SOCKET_TCP_SET_KEEP_ALIVE = ++call_id << 24;
export const SOCKET_TCP_DISPOSE = ++call_id << 24;
export const SOCKET_TCP_CREATE_INPUT_STREAM = ++call_id << 24;
export const SOCKET_TCP_CREATE_OUTPUT_STREAM = ++call_id << 24;
// Sockets (UDP)
export const SOCKET_UDP_CREATE_HANDLE = ++call_id << 24;
export const SOCKET_UDP_BIND = ++call_id << 24;
export const SOCKET_UDP_CONNECT = ++call_id << 24;
Expand Down Expand Up @@ -142,7 +155,20 @@ export const callMap = {
[CLOCKS_DURATION_SUBSCRIBE]: "CLOCKS_DURATION_SUBSCRIBE",
[CLOCKS_INSTANT_SUBSCRIBE]: "CLOCKS_INSTANT_SUBSCRIBE",

// Sockets
// Sockets TCP
[SOCKET_TCP_CREATE_HANDLE]: "SOCKET_TCP_CREATE_HANDLE",
[SOCKET_TCP_BIND]: "SOCKET_TCP_BIND",
[SOCKET_TCP_CONNECT]: "SOCKET_TCP_CONNECT",
[SOCKET_TCP_LISTEN]: "SOCKET_TCP_LISTEN",
[SOCKET_TCP_ACCEPT]: "SOCKET_TCP_ACCEPT",
[SOCKET_TCP_GET_LOCAL_ADDRESS]: "SOCKET_TCP_GET_LOCAL_ADDRESS",
[SOCKET_TCP_GET_REMOTE_ADDRESS]: "SOCKET_TCP_GET_REMOTE_ADDRESS",
[SOCKET_TCP_SHUTDOWN]: "SOCKET_TCP_SHUTDOWN",
[SOCKET_TCP_SET_KEEP_ALIVE]: "SOCKET_TCP_SET_KEEP_ALIVE",
[SOCKET_TCP_DISPOSE]: "SOCKET_TCP_DISPOSE",
[SOCKET_TCP_CREATE_INPUT_STREAM]: "SOCKET_TCP_CREATE_INPUT_STREAM",
[SOCKET_TCP_CREATE_OUTPUT_STREAM]: "SOCKET_TCP_CREATE_OUTPUT_STREAM",
// Sockets UDP
[SOCKET_UDP_CREATE_HANDLE]: "SOCKET_UDP_CREATE_HANDLE",
[SOCKET_UDP_BIND]: "SOCKET_UDP_BIND",
[SOCKET_UDP_CONNECT]: "SOCKET_UDP_CONNECT",
Expand All @@ -158,6 +184,7 @@ export const callMap = {
[SOCKET_UDP_GET_SEND_BUFFER_SIZE]: "SOCKET_UDP_GET_SEND_BUFFER_SIZE",
[SOCKET_UDP_SET_SEND_BUFFER_SIZE]: "SOCKET_UDP_SET_SEND_BUFFER_SIZE",
[SOCKET_UDP_SET_UNICAST_HOP_LIMIT]: "SOCKET_UDP_SET_UNICAST_HOP_LIMIT",
// Socket DNS
[SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST",
[SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST",
[SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST",
Expand Down
143 changes: 143 additions & 0 deletions packages/preview2-shim/lib/io/worker-socket-tcp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc
const {
TCP,
constants: TCPConstants,
TCPConnectWrap,
} = process.binding("tcp_wrap");

/** @type {Map<number, NodeJS.Socket>} */
export const openedSockets = new Map();

let socketCnt = 0;

export function getSocketOrThrow(socketId) {
const socket = openedSockets.get(socketId);
if (!socket) throw "invalid-state";
return socket;
}

//-----------------------------------------------------

/**
* @param {IpAddressFamily} addressFamily
* @returns {NodeJS.Socket}
*/
export function createTcpSocket() {
const socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER);
openedSockets.set(++socketCnt, socket);
return Promise.resolve(socketCnt);
}

export function socketTcpBind(id, payload) {
const { localAddress, localPort, family } = payload;
process._rawDebug("socketTcpBind", {
localAddress, localPort, family
});

const socket = getSocketOrThrow(id);

let bind = "bind"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
bind = "bind6";
}

return socket[bind](localAddress, localPort);
}

export function socketTcpConnect(id, payload) {
const socket = getSocketOrThrow(id);
const { remoteAddress, remotePort, localAddress, localPort, family } =
payload;

process._rawDebug("socketTcpConnect", {
remoteAddress,
remotePort,
localAddress,
localPort,
family
});

return new Promise((resolve) => {
const _onClientConnectComplete = (err) => {
if (err) resolve(err);
resolve(0);
};
const connectReq = new TCPConnectWrap();
connectReq.oncomplete = _onClientConnectComplete;
connectReq.address = remoteAddress;
connectReq.port = remotePort;
connectReq.localAddress = localAddress;
connectReq.localPort = localPort;
let connect = "connect"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
connect = "connect6";
}

socket.onread = (_buffer) => {
// TODO: handle data received from the server
};
socket.readStart();

const err = socket[connect](connectReq, remoteAddress, remotePort);
resolve(err);
});
}

export function socketTcpListen(id, payload) {
const socket = getSocketOrThrow(id);
const { backlogSize } = payload;
process._rawDebug("socketTcpListen", {
backlogSize
});
return socket.listen(backlogSize);
}

export function socketTcpAccept(id, payload) {}

export function socketTcpGetLocalAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getsockname(out);
return out;
}

export function socketTcpGetRemoteAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getpeername(out);
return out;
}

export function socketTcpShutdown(id, payload) {
const socket = getSocketOrThrow(id);
const { shutdownType } = payload;

return new Promise((resolve) => {
const req = new ShutdownWrap();
req.oncomplete = () => {
process._rawDebug("shutdown complete");
resolve(0);
};
req.handle = socket;
req.callback = () => {
process._rawDebug("shutdown callback");
resolve(0);
};
const err = socket.shutdown(req);
resolve(err);
});
}

export function socketTcpSetKeepAlive(id, payload) {
const socket = getSocketOrThrow(id);
const { enable } = payload;

return socket.setKeepAlive(enable);
}

export function socketTcpDispose(id) {
const socket = getSocketOrThrow(id);
socket.close();
openedSockets.delete(id);
return 0;
}
2 changes: 2 additions & 0 deletions packages/preview2-shim/lib/io/worker-socket-udp.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export function enqueueReceivedSocketDatagram(socketInfo, { data, rinfo }) {
queue.push(chunk);
}

//-----------------------------------------------------

/**
* @param {IpAddressFamily} addressFamily
* @returns {NodeJS.Socket}
Expand Down
99 changes: 81 additions & 18 deletions packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,42 @@ import {
STDOUT,
HTTP_SERVER_SET_OUTGOING_RESPONSE,
HTTP_SERVER_CLEAR_OUTGOING_RESPONSE,
SOCKET_TCP_CREATE_HANDLE,
SOCKET_TCP_BIND,
SOCKET_TCP_CONNECT,
SOCKET_TCP_LISTEN,
SOCKET_TCP_GET_LOCAL_ADDRESS,
SOCKET_TCP_GET_REMOTE_ADDRESS,
SOCKET_TCP_DISPOSE,
SOCKET_TCP_ACCEPT,
SOCKET_TCP_SHUTDOWN,
SOCKET_TCP_SET_KEEP_ALIVE,
SOCKET_TCP_CREATE_OUTPUT_STREAM,
SOCKET_TCP_CREATE_INPUT_STREAM,
} from "./calls.js";
import { SocketUdpReceive, createUdpSocket, getSocketOrThrow, socketUdpBind, socketUdpCheckSend, socketUdpConnect, socketUdpDisconnect, socketUdpDispose, socketUdpSend } from "./worker-socket-udp.js";
import {
SocketUdpReceive,
createUdpSocket,
getSocketOrThrow,
socketUdpBind,
socketUdpCheckSend,
socketUdpConnect,
socketUdpDisconnect,
socketUdpDispose,
socketUdpSend,
} from "./worker-socket-udp.js";
import {
createTcpSocket,
socketTcpAccept,
socketTcpBind,
socketTcpConnect,
socketTcpDispose,
socketTcpGetLocalAddress,
socketTcpGetRemoteAddress,
socketTcpListen,
socketTcpSetKeepAlive,
socketTcpShutdown,
} from "./worker-socket-tcp.js";

let streamCnt = 0,
pollCnt = 0;
Expand Down Expand Up @@ -251,29 +285,67 @@ function handle(call, id, payload) {
case HTTP_SERVER_CLEAR_OUTGOING_RESPONSE:
return clearOutgoingResponse(id);

// Sockets
// Sockets TCP
case SOCKET_TCP_CREATE_HANDLE:
return createFuture(createTcpSocket());

case SOCKET_TCP_BIND:
return socketTcpBind(id, payload);

case SOCKET_TCP_CONNECT:
return socketTcpConnect(id, payload);

case SOCKET_TCP_LISTEN:
return socketTcpListen(id, payload);

case SOCKET_TCP_ACCEPT:
return socketTcpAccept(id, payload);

case SOCKET_TCP_GET_LOCAL_ADDRESS:
return socketTcpGetLocalAddress(id);

case SOCKET_TCP_GET_REMOTE_ADDRESS:
return socketTcpGetRemoteAddress(id);

case SOCKET_TCP_SHUTDOWN:
return socketTcpShutdown(id, payload);

case SOCKET_TCP_SET_KEEP_ALIVE:
return socketTcpSetKeepAlive(id, payload);

case SOCKET_TCP_DISPOSE:
return socketTcpDispose(id);

case SOCKET_TCP_CREATE_INPUT_STREAM:
// TODO: implement a proper input stream
return createStream(new PassThrough());

case SOCKET_TCP_CREATE_OUTPUT_STREAM:
// TODO: implement a proper output stream
return createStream(new PassThrough());

// Sockets UDP
case SOCKET_UDP_CREATE_HANDLE: {
const { addressFamily, reuseAddr } = payload;
return createFuture(createUdpSocket(addressFamily, reuseAddr));
}

case SOCKET_UDP_BIND:
case SOCKET_UDP_BIND:
return socketUdpBind(id, payload);

case SOCKET_UDP_CHECK_SEND:
case SOCKET_UDP_CHECK_SEND:
return socketUdpCheckSend(id);

case SOCKET_UDP_SEND:
case SOCKET_UDP_SEND:
return socketUdpSend(id, payload);

case SOCKET_UDP_RECEIVE:
case SOCKET_UDP_RECEIVE:
return SocketUdpReceive(id, payload);

case SOCKET_UDP_CONNECT:
case SOCKET_UDP_CONNECT:
return socketUdpConnect(id, payload);

case SOCKET_UDP_DISCONNECT:
case SOCKET_UDP_DISCONNECT:
return socketUdpDisconnect(id);

case SOCKET_UDP_GET_LOCAL_ADDRESS: {
Expand Down Expand Up @@ -350,16 +422,7 @@ function handle(call, id, payload) {
}
}

case OUTPUT_STREAM_CREATE | SOCKET: {
// TODO: implement
break;
}
case INPUT_STREAM_CREATE | SOCKET: {
// TODO: implement
break;
}

case SOCKET_UDP_DISPOSE:
case SOCKET_UDP_DISPOSE:
return socketUdpDispose(id);

// Stdio
Expand Down
Loading
Loading