Skip to content

Commit

Permalink
chore: migrate wasi-sockets tcp to worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
manekinekko authored and guybedford committed Dec 15, 2023
1 parent 056e065 commit 823fa9b
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 235 deletions.
31 changes: 29 additions & 2 deletions packages/preview2-shim/lib/io/calls.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,20 @@ export const CLOCKS_NOW = ++call_id << 24;
export const CLOCKS_DURATION_SUBSCRIBE = ++call_id << 24;
export const CLOCKS_INSTANT_SUBSCRIBE = ++call_id << 24;

// Sockets
// Sockets (TCP)
export const SOCKET_TCP_CREATE_HANDLE = ++call_id << 24;
export const SOCKET_TCP_BIND = ++call_id << 24;
export const SOCKET_TCP_CONNECT = ++call_id << 24;
export const SOCKET_TCP_LISTEN = ++call_id << 24;
export const SOCKET_TCP_ACCEPT = ++call_id << 24;
export const SOCKET_TCP_GET_LOCAL_ADDRESS = ++call_id << 24;
export const SOCKET_TCP_GET_REMOTE_ADDRESS = ++call_id << 24;
export const SOCKET_TCP_SHUTDOWN = ++call_id << 24;
export const SOCKET_TCP_SET_KEEP_ALIVE = ++call_id << 24;
export const SOCKET_TCP_DISPOSE = ++call_id << 24;
export const SOCKET_TCP_CREATE_INPUT_STREAM = ++call_id << 24;
export const SOCKET_TCP_CREATE_OUTPUT_STREAM = ++call_id << 24;
// Sockets (UDP)
export const SOCKET_UDP_CREATE_HANDLE = ++call_id << 24;
export const SOCKET_UDP_BIND = ++call_id << 24;
export const SOCKET_UDP_CONNECT = ++call_id << 24;
Expand Down Expand Up @@ -142,7 +155,20 @@ export const callMap = {
[CLOCKS_DURATION_SUBSCRIBE]: "CLOCKS_DURATION_SUBSCRIBE",
[CLOCKS_INSTANT_SUBSCRIBE]: "CLOCKS_INSTANT_SUBSCRIBE",

// Sockets
// Sockets TCP
[SOCKET_TCP_CREATE_HANDLE]: "SOCKET_TCP_CREATE_HANDLE",
[SOCKET_TCP_BIND]: "SOCKET_TCP_BIND",
[SOCKET_TCP_CONNECT]: "SOCKET_TCP_CONNECT",
[SOCKET_TCP_LISTEN]: "SOCKET_TCP_LISTEN",
[SOCKET_TCP_ACCEPT]: "SOCKET_TCP_ACCEPT",
[SOCKET_TCP_GET_LOCAL_ADDRESS]: "SOCKET_TCP_GET_LOCAL_ADDRESS",
[SOCKET_TCP_GET_REMOTE_ADDRESS]: "SOCKET_TCP_GET_REMOTE_ADDRESS",
[SOCKET_TCP_SHUTDOWN]: "SOCKET_TCP_SHUTDOWN",
[SOCKET_TCP_SET_KEEP_ALIVE]: "SOCKET_TCP_SET_KEEP_ALIVE",
[SOCKET_TCP_DISPOSE]: "SOCKET_TCP_DISPOSE",
[SOCKET_TCP_CREATE_INPUT_STREAM]: "SOCKET_TCP_CREATE_INPUT_STREAM",
[SOCKET_TCP_CREATE_OUTPUT_STREAM]: "SOCKET_TCP_CREATE_OUTPUT_STREAM",
// Sockets UDP
[SOCKET_UDP_CREATE_HANDLE]: "SOCKET_UDP_CREATE_HANDLE",
[SOCKET_UDP_BIND]: "SOCKET_UDP_BIND",
[SOCKET_UDP_CONNECT]: "SOCKET_UDP_CONNECT",
Expand All @@ -158,6 +184,7 @@ export const callMap = {
[SOCKET_UDP_GET_SEND_BUFFER_SIZE]: "SOCKET_UDP_GET_SEND_BUFFER_SIZE",
[SOCKET_UDP_SET_SEND_BUFFER_SIZE]: "SOCKET_UDP_SET_SEND_BUFFER_SIZE",
[SOCKET_UDP_SET_UNICAST_HOP_LIMIT]: "SOCKET_UDP_SET_UNICAST_HOP_LIMIT",
// Socket DNS
[SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST",
[SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST",
[SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST",
Expand Down
144 changes: 144 additions & 0 deletions packages/preview2-shim/lib/io/worker-socket-tcp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc
const {
TCP,
constants: TCPConstants,
TCPConnectWrap,
} = process.binding("tcp_wrap");
const { ShutdownWrap } = process.binding('stream_wrap');

/** @type {Map<number, NodeJS.Socket>} */
export const openedSockets = new Map();

let socketCnt = 0;

export function getSocketOrThrow(socketId) {
const socket = openedSockets.get(socketId);
if (!socket) throw "invalid-state";
return socket;
}

//-----------------------------------------------------

/**
* @param {IpAddressFamily} addressFamily
* @returns {NodeJS.Socket}
*/
export function createTcpSocket() {
const socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER);
openedSockets.set(++socketCnt, socket);
return Promise.resolve(socketCnt);
}

export function socketTcpBind(id, payload) {
const { localAddress, localPort, family } = payload;
process._rawDebug("socketTcpBind", {
localAddress, localPort, family
});

const socket = getSocketOrThrow(id);

let bind = "bind"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
bind = "bind6";
}

return socket[bind](localAddress, localPort);
}

export function socketTcpConnect(id, payload) {
const socket = getSocketOrThrow(id);
const { remoteAddress, remotePort, localAddress, localPort, family } =
payload;

process._rawDebug("socketTcpConnect", {
remoteAddress,
remotePort,
localAddress,
localPort,
family
});

return new Promise((resolve) => {
const _onClientConnectComplete = (err) => {
if (err) resolve(err);
resolve(0);
};
const connectReq = new TCPConnectWrap();
connectReq.oncomplete = _onClientConnectComplete;
connectReq.address = remoteAddress;
connectReq.port = remotePort;
connectReq.localAddress = localAddress;
connectReq.localPort = localPort;
let connect = "connect"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
connect = "connect6";
}

socket.onread = (_buffer) => {
// TODO: handle data received from the server
};
socket.readStart();

const err = socket[connect](connectReq, remoteAddress, remotePort);
resolve(err);
});
}

export function socketTcpListen(id, payload) {
const socket = getSocketOrThrow(id);
const { backlogSize } = payload;
process._rawDebug("socketTcpListen", {
backlogSize
});
return socket.listen(backlogSize);
}

export function socketTcpAccept(_id, _payload) {}

export function socketTcpGetLocalAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getsockname(out);
return out;
}

export function socketTcpGetRemoteAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getpeername(out);
return out;
}

export function socketTcpShutdown(id, _payload) {
const socket = getSocketOrThrow(id);
// const { shutdownType } = payload;

return new Promise((resolve) => {
const req = new ShutdownWrap();
req.oncomplete = () => {
process._rawDebug("shutdown complete");
resolve(0);
};
req.handle = socket;
req.callback = () => {
process._rawDebug("shutdown callback");
resolve(0);
};
const err = socket.shutdown(req);
resolve(err);
});
}

export function socketTcpSetKeepAlive(id, payload) {
const socket = getSocketOrThrow(id);
const { enable } = payload;

return socket.setKeepAlive(enable);
}

export function socketTcpDispose(id) {
const socket = getSocketOrThrow(id);
socket.close();
openedSockets.delete(id);
return 0;
}
2 changes: 2 additions & 0 deletions packages/preview2-shim/lib/io/worker-socket-udp.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export function enqueueReceivedSocketDatagram(socketInfo, { data, rinfo }) {
queue.push(chunk);
}

//-----------------------------------------------------

/**
* @param {IpAddressFamily} addressFamily
* @returns {NodeJS.Socket}
Expand Down
100 changes: 81 additions & 19 deletions packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import {
POLL_POLLABLE_BLOCK,
POLL_POLLABLE_READY,
POLL_POLL_LIST,
SOCKET,
SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST,
SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST,
SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST,
Expand All @@ -72,8 +71,42 @@ import {
STDOUT,
HTTP_SERVER_SET_OUTGOING_RESPONSE,
HTTP_SERVER_CLEAR_OUTGOING_RESPONSE,
SOCKET_TCP_CREATE_HANDLE,
SOCKET_TCP_BIND,
SOCKET_TCP_CONNECT,
SOCKET_TCP_LISTEN,
SOCKET_TCP_GET_LOCAL_ADDRESS,
SOCKET_TCP_GET_REMOTE_ADDRESS,
SOCKET_TCP_DISPOSE,
SOCKET_TCP_ACCEPT,
SOCKET_TCP_SHUTDOWN,
SOCKET_TCP_SET_KEEP_ALIVE,
SOCKET_TCP_CREATE_OUTPUT_STREAM,
SOCKET_TCP_CREATE_INPUT_STREAM,
} from "./calls.js";
import { SocketUdpReceive, createUdpSocket, getSocketOrThrow, socketUdpBind, socketUdpCheckSend, socketUdpConnect, socketUdpDisconnect, socketUdpDispose, socketUdpSend } from "./worker-socket-udp.js";
import {
SocketUdpReceive,
createUdpSocket,
getSocketOrThrow,
socketUdpBind,
socketUdpCheckSend,
socketUdpConnect,
socketUdpDisconnect,
socketUdpDispose,
socketUdpSend,
} from "./worker-socket-udp.js";
import {
createTcpSocket,
socketTcpAccept,
socketTcpBind,
socketTcpConnect,
socketTcpDispose,
socketTcpGetLocalAddress,
socketTcpGetRemoteAddress,
socketTcpListen,
socketTcpSetKeepAlive,
socketTcpShutdown,
} from "./worker-socket-tcp.js";

let streamCnt = 0,
pollCnt = 0;
Expand Down Expand Up @@ -251,29 +284,67 @@ function handle(call, id, payload) {
case HTTP_SERVER_CLEAR_OUTGOING_RESPONSE:
return clearOutgoingResponse(id);

// Sockets
// Sockets TCP
case SOCKET_TCP_CREATE_HANDLE:
return createFuture(createTcpSocket());

case SOCKET_TCP_BIND:
return socketTcpBind(id, payload);

case SOCKET_TCP_CONNECT:
return socketTcpConnect(id, payload);

case SOCKET_TCP_LISTEN:
return socketTcpListen(id, payload);

case SOCKET_TCP_ACCEPT:
return socketTcpAccept(id, payload);

case SOCKET_TCP_GET_LOCAL_ADDRESS:
return socketTcpGetLocalAddress(id);

case SOCKET_TCP_GET_REMOTE_ADDRESS:
return socketTcpGetRemoteAddress(id);

case SOCKET_TCP_SHUTDOWN:
return socketTcpShutdown(id, payload);

case SOCKET_TCP_SET_KEEP_ALIVE:
return socketTcpSetKeepAlive(id, payload);

case SOCKET_TCP_DISPOSE:
return socketTcpDispose(id);

case SOCKET_TCP_CREATE_INPUT_STREAM:
// TODO: implement a proper input stream
return createStream(new PassThrough());

case SOCKET_TCP_CREATE_OUTPUT_STREAM:
// TODO: implement a proper output stream
return createStream(new PassThrough());

// Sockets UDP
case SOCKET_UDP_CREATE_HANDLE: {
const { addressFamily, reuseAddr } = payload;
return createFuture(createUdpSocket(addressFamily, reuseAddr));
}

case SOCKET_UDP_BIND:
case SOCKET_UDP_BIND:
return socketUdpBind(id, payload);

case SOCKET_UDP_CHECK_SEND:
case SOCKET_UDP_CHECK_SEND:
return socketUdpCheckSend(id);

case SOCKET_UDP_SEND:
case SOCKET_UDP_SEND:
return socketUdpSend(id, payload);

case SOCKET_UDP_RECEIVE:
case SOCKET_UDP_RECEIVE:
return SocketUdpReceive(id, payload);

case SOCKET_UDP_CONNECT:
case SOCKET_UDP_CONNECT:
return socketUdpConnect(id, payload);

case SOCKET_UDP_DISCONNECT:
case SOCKET_UDP_DISCONNECT:
return socketUdpDisconnect(id);

case SOCKET_UDP_GET_LOCAL_ADDRESS: {
Expand Down Expand Up @@ -350,16 +421,7 @@ function handle(call, id, payload) {
}
}

case OUTPUT_STREAM_CREATE | SOCKET: {
// TODO: implement
break;
}
case INPUT_STREAM_CREATE | SOCKET: {
// TODO: implement
break;
}

case SOCKET_UDP_DISPOSE:
case SOCKET_UDP_DISPOSE:
return socketUdpDispose(id);

// Stdio
Expand Down
Loading

0 comments on commit 823fa9b

Please sign in to comment.