From 81b049a6dfdea2fa1866a36ee66d3f7187040264 Mon Sep 17 00:00:00 2001 From: Wassim Chegham Date: Fri, 15 Dec 2023 17:08:15 +0100 Subject: [PATCH] chore: migrate wasi-sockets tcp to worker thread --- packages/preview2-shim/lib/io/calls.js | 31 +- .../preview2-shim/lib/io/worker-socket-tcp.js | 143 ++++++ .../preview2-shim/lib/io/worker-socket-udp.js | 2 + .../preview2-shim/lib/io/worker-thread.js | 99 +++- .../lib/nodejs/sockets/tcp-socket-impl.js | 444 ++++++++++-------- .../lib/nodejs/sockets/wasi-sockets.js | 32 +- 6 files changed, 518 insertions(+), 233 deletions(-) create mode 100644 packages/preview2-shim/lib/io/worker-socket-tcp.js diff --git a/packages/preview2-shim/lib/io/calls.js b/packages/preview2-shim/lib/io/calls.js index 966ef4b2f..98a80ccce 100644 --- a/packages/preview2-shim/lib/io/calls.js +++ b/packages/preview2-shim/lib/io/calls.js @@ -55,7 +55,20 @@ export const CLOCKS_NOW = ++call_id << 24; export const CLOCKS_DURATION_SUBSCRIBE = ++call_id << 24; export const CLOCKS_INSTANT_SUBSCRIBE = ++call_id << 24; -// Sockets +// Sockets (TCP) +export const SOCKET_TCP_CREATE_HANDLE = ++call_id << 24; +export const SOCKET_TCP_BIND = ++call_id << 24; +export const SOCKET_TCP_CONNECT = ++call_id << 24; +export const SOCKET_TCP_LISTEN = ++call_id << 24; +export const SOCKET_TCP_ACCEPT = ++call_id << 24; +export const SOCKET_TCP_GET_LOCAL_ADDRESS = ++call_id << 24; +export const SOCKET_TCP_GET_REMOTE_ADDRESS = ++call_id << 24; +export const SOCKET_TCP_SHUTDOWN = ++call_id << 24; +export const SOCKET_TCP_SET_KEEP_ALIVE = ++call_id << 24; +export const SOCKET_TCP_DISPOSE = ++call_id << 24; +export const SOCKET_TCP_CREATE_INPUT_STREAM = ++call_id << 24; +export const SOCKET_TCP_CREATE_OUTPUT_STREAM = ++call_id << 24; +// Sockets (UDP) export const SOCKET_UDP_CREATE_HANDLE = ++call_id << 24; export const SOCKET_UDP_BIND = ++call_id << 24; export const SOCKET_UDP_CONNECT = ++call_id << 24; @@ -142,7 +155,20 @@ export const callMap = { [CLOCKS_DURATION_SUBSCRIBE]: "CLOCKS_DURATION_SUBSCRIBE", [CLOCKS_INSTANT_SUBSCRIBE]: "CLOCKS_INSTANT_SUBSCRIBE", - // Sockets + // Sockets TCP + [SOCKET_TCP_CREATE_HANDLE]: "SOCKET_TCP_CREATE_HANDLE", + [SOCKET_TCP_BIND]: "SOCKET_TCP_BIND", + [SOCKET_TCP_CONNECT]: "SOCKET_TCP_CONNECT", + [SOCKET_TCP_LISTEN]: "SOCKET_TCP_LISTEN", + [SOCKET_TCP_ACCEPT]: "SOCKET_TCP_ACCEPT", + [SOCKET_TCP_GET_LOCAL_ADDRESS]: "SOCKET_TCP_GET_LOCAL_ADDRESS", + [SOCKET_TCP_GET_REMOTE_ADDRESS]: "SOCKET_TCP_GET_REMOTE_ADDRESS", + [SOCKET_TCP_SHUTDOWN]: "SOCKET_TCP_SHUTDOWN", + [SOCKET_TCP_SET_KEEP_ALIVE]: "SOCKET_TCP_SET_KEEP_ALIVE", + [SOCKET_TCP_DISPOSE]: "SOCKET_TCP_DISPOSE", + [SOCKET_TCP_CREATE_INPUT_STREAM]: "SOCKET_TCP_CREATE_INPUT_STREAM", + [SOCKET_TCP_CREATE_OUTPUT_STREAM]: "SOCKET_TCP_CREATE_OUTPUT_STREAM", + // Sockets UDP [SOCKET_UDP_CREATE_HANDLE]: "SOCKET_UDP_CREATE_HANDLE", [SOCKET_UDP_BIND]: "SOCKET_UDP_BIND", [SOCKET_UDP_CONNECT]: "SOCKET_UDP_CONNECT", @@ -158,6 +184,7 @@ export const callMap = { [SOCKET_UDP_GET_SEND_BUFFER_SIZE]: "SOCKET_UDP_GET_SEND_BUFFER_SIZE", [SOCKET_UDP_SET_SEND_BUFFER_SIZE]: "SOCKET_UDP_SET_SEND_BUFFER_SIZE", [SOCKET_UDP_SET_UNICAST_HOP_LIMIT]: "SOCKET_UDP_SET_UNICAST_HOP_LIMIT", + // Socket DNS [SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST", [SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST", [SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST]: "SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST", diff --git a/packages/preview2-shim/lib/io/worker-socket-tcp.js b/packages/preview2-shim/lib/io/worker-socket-tcp.js new file mode 100644 index 000000000..bbffc49d2 --- /dev/null +++ b/packages/preview2-shim/lib/io/worker-socket-tcp.js @@ -0,0 +1,143 @@ +// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc +const { + TCP, + constants: TCPConstants, + TCPConnectWrap, +} = process.binding("tcp_wrap"); + +/** @type {Map} */ +export const openedSockets = new Map(); + +let socketCnt = 0; + +export function getSocketOrThrow(socketId) { + const socket = openedSockets.get(socketId); + if (!socket) throw "invalid-state"; + return socket; +} + +//----------------------------------------------------- + +/** + * @param {IpAddressFamily} addressFamily + * @returns {NodeJS.Socket} + */ +export function createTcpSocket() { + const socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER); + openedSockets.set(++socketCnt, socket); + return Promise.resolve(socketCnt); +} + +export function socketTcpBind(id, payload) { + const { localAddress, localPort, family } = payload; + process._rawDebug("socketTcpBind", { + localAddress, localPort, family +}); + + const socket = getSocketOrThrow(id); + + let bind = "bind"; // ipv4 + if (family.toLocaleLowerCase() === "ipv6") { + bind = "bind6"; + } + + return socket[bind](localAddress, localPort); +} + +export function socketTcpConnect(id, payload) { + const socket = getSocketOrThrow(id); + const { remoteAddress, remotePort, localAddress, localPort, family } = + payload; + + process._rawDebug("socketTcpConnect", { + remoteAddress, + remotePort, + localAddress, + localPort, + family + }); + + return new Promise((resolve) => { + const _onClientConnectComplete = (err) => { + if (err) resolve(err); + resolve(0); + }; + const connectReq = new TCPConnectWrap(); + connectReq.oncomplete = _onClientConnectComplete; + connectReq.address = remoteAddress; + connectReq.port = remotePort; + connectReq.localAddress = localAddress; + connectReq.localPort = localPort; + let connect = "connect"; // ipv4 + if (family.toLocaleLowerCase() === "ipv6") { + connect = "connect6"; + } + + socket.onread = (_buffer) => { + // TODO: handle data received from the server + }; + socket.readStart(); + + const err = socket[connect](connectReq, remoteAddress, remotePort); + resolve(err); + }); +} + +export function socketTcpListen(id, payload) { + const socket = getSocketOrThrow(id); + const { backlogSize } = payload; + process._rawDebug("socketTcpListen", { + backlogSize + }); + return socket.listen(backlogSize); +} + +export function socketTcpAccept(id, payload) {} + +export function socketTcpGetLocalAddress(id) { + const socket = getSocketOrThrow(id); + const out = {}; + socket.getsockname(out); + return out; +} + +export function socketTcpGetRemoteAddress(id) { + const socket = getSocketOrThrow(id); + const out = {}; + socket.getpeername(out); + return out; +} + +export function socketTcpShutdown(id, payload) { + const socket = getSocketOrThrow(id); + const { shutdownType } = payload; + + return new Promise((resolve) => { + const req = new ShutdownWrap(); + req.oncomplete = () => { + process._rawDebug("shutdown complete"); + resolve(0); + }; + req.handle = socket; + req.callback = () => { + process._rawDebug("shutdown callback"); + resolve(0); + }; + const err = socket.shutdown(req); + resolve(err); + }); +} + +export function socketTcpSetKeepAlive(id, payload) { + const socket = getSocketOrThrow(id); + const { enable } = payload; + + return socket.setKeepAlive(enable); +} + +export function socketTcpDispose(id) { + const socket = getSocketOrThrow(id); + socket.close(); + openedSockets.delete(id); + return 0; +} diff --git a/packages/preview2-shim/lib/io/worker-socket-udp.js b/packages/preview2-shim/lib/io/worker-socket-udp.js index e71f31cad..e72313782 100644 --- a/packages/preview2-shim/lib/io/worker-socket-udp.js +++ b/packages/preview2-shim/lib/io/worker-socket-udp.js @@ -58,6 +58,8 @@ export function enqueueReceivedSocketDatagram(socketInfo, { data, rinfo }) { queue.push(chunk); } +//----------------------------------------------------- + /** * @param {IpAddressFamily} addressFamily * @returns {NodeJS.Socket} diff --git a/packages/preview2-shim/lib/io/worker-thread.js b/packages/preview2-shim/lib/io/worker-thread.js index a0c52e522..2a22d0330 100644 --- a/packages/preview2-shim/lib/io/worker-thread.js +++ b/packages/preview2-shim/lib/io/worker-thread.js @@ -72,8 +72,42 @@ import { STDOUT, HTTP_SERVER_SET_OUTGOING_RESPONSE, HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, + SOCKET_TCP_CREATE_HANDLE, + SOCKET_TCP_BIND, + SOCKET_TCP_CONNECT, + SOCKET_TCP_LISTEN, + SOCKET_TCP_GET_LOCAL_ADDRESS, + SOCKET_TCP_GET_REMOTE_ADDRESS, + SOCKET_TCP_DISPOSE, + SOCKET_TCP_ACCEPT, + SOCKET_TCP_SHUTDOWN, + SOCKET_TCP_SET_KEEP_ALIVE, + SOCKET_TCP_CREATE_OUTPUT_STREAM, + SOCKET_TCP_CREATE_INPUT_STREAM, } from "./calls.js"; -import { SocketUdpReceive, createUdpSocket, getSocketOrThrow, socketUdpBind, socketUdpCheckSend, socketUdpConnect, socketUdpDisconnect, socketUdpDispose, socketUdpSend } from "./worker-socket-udp.js"; +import { + SocketUdpReceive, + createUdpSocket, + getSocketOrThrow, + socketUdpBind, + socketUdpCheckSend, + socketUdpConnect, + socketUdpDisconnect, + socketUdpDispose, + socketUdpSend, +} from "./worker-socket-udp.js"; +import { + createTcpSocket, + socketTcpAccept, + socketTcpBind, + socketTcpConnect, + socketTcpDispose, + socketTcpGetLocalAddress, + socketTcpGetRemoteAddress, + socketTcpListen, + socketTcpSetKeepAlive, + socketTcpShutdown, +} from "./worker-socket-tcp.js"; let streamCnt = 0, pollCnt = 0; @@ -247,29 +281,67 @@ function handle(call, id, payload) { case HTTP_SERVER_CLEAR_OUTGOING_RESPONSE: return clearOutgoingResponse(id); - // Sockets + // Sockets TCP + case SOCKET_TCP_CREATE_HANDLE: + return createFuture(createTcpSocket()); + + case SOCKET_TCP_BIND: + return socketTcpBind(id, payload); + + case SOCKET_TCP_CONNECT: + return socketTcpConnect(id, payload); + + case SOCKET_TCP_LISTEN: + return socketTcpListen(id, payload); + + case SOCKET_TCP_ACCEPT: + return socketTcpAccept(id, payload); + + case SOCKET_TCP_GET_LOCAL_ADDRESS: + return socketTcpGetLocalAddress(id); + + case SOCKET_TCP_GET_REMOTE_ADDRESS: + return socketTcpGetRemoteAddress(id); + + case SOCKET_TCP_SHUTDOWN: + return socketTcpShutdown(id, payload); + case SOCKET_TCP_SET_KEEP_ALIVE: + return socketTcpSetKeepAlive(id, payload); + + case SOCKET_TCP_DISPOSE: + return socketTcpDispose(id); + + case SOCKET_TCP_CREATE_INPUT_STREAM: + // TODO: implement a proper input stream + return createStream(new PassThrough()); + + case SOCKET_TCP_CREATE_OUTPUT_STREAM: + // TODO: implement a proper output stream + return createStream(new PassThrough()); + + // Sockets UDP case SOCKET_UDP_CREATE_HANDLE: { const { addressFamily, reuseAddr } = payload; return createFuture(createUdpSocket(addressFamily, reuseAddr)); } - case SOCKET_UDP_BIND: + case SOCKET_UDP_BIND: return socketUdpBind(id, payload); - case SOCKET_UDP_CHECK_SEND: + case SOCKET_UDP_CHECK_SEND: return socketUdpCheckSend(id); - case SOCKET_UDP_SEND: + case SOCKET_UDP_SEND: return socketUdpSend(id, payload); - case SOCKET_UDP_RECEIVE: + case SOCKET_UDP_RECEIVE: return SocketUdpReceive(id, payload); - case SOCKET_UDP_CONNECT: + case SOCKET_UDP_CONNECT: return socketUdpConnect(id, payload); - case SOCKET_UDP_DISCONNECT: + case SOCKET_UDP_DISCONNECT: return socketUdpDisconnect(id); case SOCKET_UDP_GET_LOCAL_ADDRESS: { @@ -346,16 +418,7 @@ function handle(call, id, payload) { } } - case OUTPUT_STREAM_CREATE | SOCKET: { - // TODO: implement - break; - } - case INPUT_STREAM_CREATE | SOCKET: { - // TODO: implement - break; - } - - case SOCKET_UDP_DISPOSE: + case SOCKET_UDP_DISPOSE: return socketUdpDispose(id); // Stdio diff --git a/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js b/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js index 386c7db68..d9172f4ce 100644 --- a/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js +++ b/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js @@ -17,15 +17,32 @@ import { assert } from "../../common/assert.js"; // const { InputStream, OutputStream } = streams; const symbolDispose = Symbol.dispose || Symbol.for("dispose"); -const symbolSocketState = Symbol.SocketInternalState || Symbol.for("SocketInternalState"); -const symbolOperations = Symbol.SocketOperationsState || Symbol.for("SocketOperationsState"); +const symbolSocketState = + Symbol.SocketInternalState || Symbol.for("SocketInternalState"); +const symbolOperations = + Symbol.SocketOperationsState || Symbol.for("SocketOperationsState"); -// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc -const { TCP, TCPConnectWrap, constants: TCPConstants } = process.binding("tcp_wrap"); -const { ShutdownWrap } = process.binding("stream_wrap"); - -import { INPUT_STREAM_CREATE, OUTPUT_STREAM_CREATE, SOCKET } from "../../io/calls.js"; -import { inputStreamCreate, ioCall, outputStreamCreate, pollableCreate } from "../../io/worker-io.js"; +import { + INPUT_STREAM_CREATE, + OUTPUT_STREAM_CREATE, + SOCKET, + SOCKET_TCP_BIND, + SOCKET_TCP_CONNECT, + SOCKET_TCP_CREATE_HANDLE, + SOCKET_TCP_CREATE_INPUT_STREAM, + SOCKET_TCP_CREATE_OUTPUT_STREAM, + SOCKET_TCP_DISPOSE, + SOCKET_TCP_GET_LOCAL_ADDRESS, + SOCKET_TCP_GET_REMOTE_ADDRESS, + SOCKET_TCP_LISTEN, + SOCKET_TCP_SET_KEEP_ALIVE, +} from "../../io/calls.js"; +import { + inputStreamCreate, + ioCall, + outputStreamCreate, + pollableCreate, +} from "../../io/worker-io.js"; import { deserializeIpAddress, findUnsuedLocalAddress, @@ -59,15 +76,16 @@ const globalBoundAddresses = new Map(); // TODO: implement would-block exceptions // TODO: implement concurrency-conflict exceptions -export class TcpSocketImpl { - #allowed; - id = 1; - /** @type {TCP.TCPConstants.SOCKET} */ #socket = null; +export class TcpSocket { /** @type {Network} */ network = null; - #connections = 0; + id = 1; + #allowTcp = true; + #connections = 0; #pollId = null; + #inputStreamId = this.id++; + #outputStreamId = this.id++; // track in-progress operations // counter must be 0 for the operation to be considered complete @@ -111,77 +129,29 @@ export class TcpSocketImpl { localIpSocketAddress: null, }; - // this is set by the TcpSocket child class - #tcpSocketChildClassType = null; + allowed = () => { + return this.#allowTcp; + }; /** * @param {IpAddressFamily} addressFamily - * @param {TcpSocket} childClassType * @param {number} id + * @returns {void} */ - constructor(addressFamily, childClassType, id) { - this.id = id; - - this.#socketOptions.family = addressFamily.toLocaleLowerCase(); - this.#tcpSocketChildClassType = childClassType; - - this.#socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER); - } - - #handleConnection(err, newClientSocket) { - if (err) { - assert(true, "unknown", err); - } - - this.#connections++; - - this[symbolSocketState].acceptedClient = new NodeSocket({ - handle: newClientSocket, - }); - this[symbolSocketState].acceptedClient.server = this.#socket; - this[symbolSocketState].acceptedClient._server = this.#socket; - - // TODO: handle data received from the client - this[symbolSocketState].acceptedClient._handle.onread = (nread, buffer) => { - if (nread > 0) { - const data = buffer.toString("utf8", 0, nread); - console.log("accepted socket on read:", data); - } - }; - } - - #handleDisconnect(err) { - if (err) { - assert(true, "unknown", err); - } - - this.#connections--; + static _create(addressFamily, id) { + const socket = new TcpSocket(); + socket.#pollId = ioCall(SOCKET_TCP_CREATE_HANDLE); + socket.id = id; + socket.#socketOptions.family = addressFamily.toLocaleLowerCase(); + return socket; } - #onClientConnectComplete(err) { - if (err) { - // TODO: figure out what theis error mean and why it is thrown - assert(err === -89, "-89"); // on macos - - assert(err === -99, "ephemeral-ports-exhausted"); - assert(err === -104, "connection-reset"); - assert(err === -110, "timeout"); - assert(err === -111, "connection-refused"); - assert(err === -113, "remote-unreachable"); - assert(err === -125, "operation-cancelled"); - - throw new Error(err); - } - - this[symbolSocketState].connectionState = SocketConnectionState.Connected; - } - - // TODO: is this needed? - #handleAfterShutdown() {} - #autoBind(network, ipFamily) { const unsusedLocalAddress = findUnsuedLocalAddress(ipFamily); - this.#socketOptions.localAddress = serializeIpAddress(unsusedLocalAddress, this.#socketOptions.family); + this.#socketOptions.localAddress = serializeIpAddress( + unsusedLocalAddress, + this.#socketOptions.family + ); this.#socketOptions.localPort = unsusedLocalAddress.val.port; this.startBind(network, unsusedLocalAddress); this.finishBind(); @@ -194,7 +164,7 @@ export class TcpSocketImpl { if (localPort === 0) { boundAddress = this.localAddress(); } - globalBoundAddresses.set(serializeIpAddress(boundAddress, true), this.#socket); + globalBoundAddresses.set(serializeIpAddress(boundAddress, true), this.id); } /** @@ -207,22 +177,31 @@ export class TcpSocketImpl { * @throws {invalid-state} The socket is already bound. (EINVAL) */ startBind(network, localAddress) { - if (!this.allowed()) - throw 'access-denied'; + console.log("startBind"); + + if (!this.allowed()) throw "access-denied"; try { - assert(this[symbolSocketState].isBound, "invalid-state", "The socket is already bound"); + assert( + this[symbolSocketState].isBound, + "invalid-state", + "The socket is already bound" + ); const address = serializeIpAddress(localAddress); const ipFamily = `ipv${isIP(address)}`; assert( - this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), + this.#socketOptions.family.toLocaleLowerCase() !== + ipFamily.toLocaleLowerCase(), "invalid-argument", "The `local-address` has the wrong address family" ); assert(isUnicastIpAddress(localAddress) === false, "invalid-argument"); - assert(isIPv4MappedAddress(localAddress) && this.ipv6Only(), "invalid-argument"); + assert( + isIPv4MappedAddress(localAddress) && this.ipv6Only(), + "invalid-argument" + ); const { port } = localAddress.val; this.#socketOptions.localIpSocketAddress = localAddress; @@ -249,20 +228,23 @@ export class TcpSocketImpl { try { assert(this[symbolOperations].bind === 0, "not-in-progress"); - const { localAddress, localIpSocketAddress, localPort, family } = this.#socketOptions; + const { localAddress, localIpSocketAddress, localPort, family } = + this.#socketOptions; assert(isIP(localAddress) === 0, "address-not-bindable"); - assert(globalBoundAddresses.has(serializeIpAddress(localIpSocketAddress, true)), "address-in-use"); - - let err = null; - let bind = "bind"; // ipv4 - if (family.toLocaleLowerCase() === "ipv6") { - bind = "bind6"; - } + assert( + globalBoundAddresses.has( + serializeIpAddress(localIpSocketAddress, true) + ), + "address-in-use" + ); - err = this.#socket[bind](localAddress, localPort); + const err = ioCall(SOCKET_TCP_BIND, this.id, { + localAddress, + localPort, + family, + }); if (err) { - this.#socket.close(); assert(err === -22, "address-in-use"); assert(err === -49, "address-not-bindable"); assert(err === -99, "address-not-bindable"); // EADDRNOTAVAIL @@ -295,20 +277,43 @@ export class TcpSocketImpl { * @throws {invalid-state} The socket is already in the Listener state. (EOPNOTSUPP, EINVAL on Windows) */ startConnect(network, remoteAddress) { - if (!this.allowed()) - throw 'access-denied'; + console.log("startConnect"); + + if (!this.allowed()) throw "access-denied"; const host = serializeIpAddress(remoteAddress); const ipFamily = `ipv${isIP(host)}`; try { - assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); - assert(this[symbolSocketState].connectionState === SocketConnectionState.Connecting, "invalid-state"); - assert(this[symbolSocketState].connectionState === SocketConnectionState.Listening, "invalid-state"); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Connected, + "invalid-state" + ); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Connecting, + "invalid-state" + ); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Listening, + "invalid-state" + ); - assert(host === "0.0.0.0" || host === "0:0:0:0:0:0:0:0", "invalid-argument"); - assert(this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), "invalid-argument"); + assert( + host === "0.0.0.0" || host === "0:0:0:0:0:0:0:0", + "invalid-argument" + ); + assert( + this.#socketOptions.family.toLocaleLowerCase() !== + ipFamily.toLocaleLowerCase(), + "invalid-argument" + ); assert(isUnicastIpAddress(remoteAddress) === false, "invalid-argument"); assert(isMulticastIpAddress(remoteAddress), "invalid-argument"); - assert(isIPv4MappedAddress(remoteAddress) && this.ipv6Only(), "invalid-argument"); + assert( + isIPv4MappedAddress(remoteAddress) && this.ipv6Only(), + "invalid-argument" + ); assert(remoteAddress.val.port === 0, "invalid-argument"); if (this[symbolSocketState].isBound === false) { @@ -317,7 +322,10 @@ export class TcpSocketImpl { assert(network !== this.network, "invalid-argument"); assert(ipFamily.toLocaleLowerCase() === "ipv0", "invalid-argument"); - assert(remoteAddress.val.port === 0 && platform() === "win32", "invalid-argument"); + assert( + remoteAddress.val.port === 0 && platform() === "win32", + "invalid-argument" + ); } catch (err) { this[symbolSocketState].lastErrorState = err; throw err; @@ -353,43 +361,40 @@ export class TcpSocketImpl { this[symbolSocketState].lastErrorState = null; - const { localAddress, localPort, remoteAddress, remotePort, family } = this.#socketOptions; - const connectReq = new TCPConnectWrap(); + const { localAddress, localPort, remoteAddress, remotePort, family } = + this.#socketOptions; - let err = null; - let connect = "connect"; // ipv4 - if (family.toLocaleLowerCase() === "ipv6") { - connect = "connect6"; - } + this[symbolSocketState].connectionState = SocketConnectionState.Connecting; - err = this.#socket[connect](connectReq, remoteAddress, remotePort); + const err = ioCall(SOCKET_TCP_CONNECT, this.id, { + remoteAddress, + remotePort, + localAddress, + localPort, + family, + }); if (err) { - console.error(`[tcp] connect error on socket: ${err}`); + // TODO: figure out what these error mean and why it is thrown + assert(err === -89, "-89"); // on macos + + assert(err === -99, "ephemeral-ports-exhausted"); + assert(err === -101, "remote-unreachable"); + assert(err === -104, "connection-reset"); + assert(err === -110, "timeout"); + assert(err === -111, "connection-refused"); + assert(err === -113, "remote-unreachable"); + assert(err === -125, "operation-cancelled"); this[symbolSocketState].connectionState = SocketConnectionState.Error; + throw new Error(err); } - connectReq.oncomplete = this.#onClientConnectComplete.bind(this); - connectReq.address = remoteAddress; - connectReq.port = remotePort; - connectReq.localAddress = localAddress; - connectReq.localPort = localPort; - - this.#socket.onread = (_buffer) => { - // TODO: handle data received from the server - }; + const inputStream = this.#inputStreamId = ioCall(SOCKET_TCP_CREATE_INPUT_STREAM, null, {}); - this.#socket.readStart(); + const outputStream = this.#outputStreamId = ioCall(SOCKET_TCP_CREATE_OUTPUT_STREAM, null, {}); - const inputStream = inputStreamCreate(SOCKET, ioCall(INPUT_STREAM_CREATE | SOCKET, null, {})); - const outputStream = outputStreamCreate(SOCKET, ioCall(OUTPUT_STREAM_CREATE | SOCKET, null, {})); - - this[symbolOperations].connect--; - this[symbolSocketState].connectionState = SocketConnectionState.Connecting; - - // TODO: this is a temporary workaround, move this to the connection callback - // when the connection is actually established this[symbolSocketState].connectionState = SocketConnectionState.Connected; + this[symbolOperations].connect--; return [inputStream, outputStream]; } @@ -401,13 +406,22 @@ export class TcpSocketImpl { * @throws {invalid-state} The socket is already in the Listener state. */ startListen() { - if (!this.allowed()) - throw 'access-denied'; + console.log("startListen"); + + if (!this.allowed()) throw "access-denied"; try { assert(this[symbolSocketState].lastErrorState !== null, "invalid-state"); assert(this[symbolSocketState].isBound === false, "invalid-state"); - assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); - assert(this[symbolSocketState].connectionState === SocketConnectionState.Listening, "invalid-state"); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Connected, + "invalid-state" + ); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Listening, + "invalid-state" + ); } catch (err) { this[symbolSocketState].lastErrorState = err; throw err; @@ -433,10 +447,11 @@ export class TcpSocketImpl { this[symbolSocketState].lastErrorState = null; - const err = this.#socket.listen(this[symbolSocketState].backlogSize); + const err = ioCall(SOCKET_TCP_LISTEN, this.id, { + backlogSize: this[symbolSocketState].backlogSize, + }); if (err) { console.error(`[tcp] listen error on socket: ${err}`); - this.#socket.close(); // TODO: handle errors throw new Error(err); @@ -454,12 +469,19 @@ export class TcpSocketImpl { * @throws {new-socket-limit} The new socket resource could not be created because of a system limit. (EMFILE, ENFILE) */ accept() { - if (!this.allowed()) - throw 'access-denied'; + console.log("accept", { + s: this[symbolSocketState], + }); + + if (!this.allowed()) throw "access-denied"; this[symbolOperations].accept++; try { - assert(this[symbolSocketState].connectionState !== SocketConnectionState.Listening, "invalid-state"); + assert( + this[symbolSocketState].connectionState !== + SocketConnectionState.Listening, + "invalid-state" + ); } catch (err) { this[symbolSocketState].lastErrorState = err; throw err; @@ -470,51 +492,46 @@ export class TcpSocketImpl { if (this[symbolSocketState].isBound === false) { this.#autoBind(this.network, this.addressFamily()); } - const inputStream = inputStreamCreate(SOCKET, ioCall(INPUT_STREAM_CREATE | SOCKET, null, {})); - const outputStream = outputStreamCreate(SOCKET, ioCall(OUTPUT_STREAM_CREATE | SOCKET, null, {})); - - // Because we have to return a valid TcpSocket resrouce type, - // we need to instantiate the correct child class - // TODO: figure out a more elegant way to do this - const socket = new this.#tcpSocketChildClassType(this.addressFamily()); - - // The returned socket is bound and in the Connection state. - // The following properties are inherited from the listener socket: - // - `address-family` - // - `ipv6-only` - // - `keep-alive-enabled` - // - `keep-alive-idle-time` - // - `keep-alive-interval` - // - `keep-alive-count` - // - `hop-limit` - // - `receive-buffer-size` - // - `send-buffer-size` - // - socket[symbolSocketState].ipv6Only = this[symbolSocketState].ipv6Only; - socket[symbolSocketState].keepAlive = this[symbolSocketState].keepAlive; - socket[symbolSocketState].keepAliveIdleTime = this[symbolSocketState].keepAliveIdleTime; - socket[symbolSocketState].keepAliveInterval = this[symbolSocketState].keepAliveInterval; - socket[symbolSocketState].keepAliveCount = this[symbolSocketState].keepAliveCount; - socket[symbolSocketState].hopLimit = this[symbolSocketState].hopLimit; - socket[symbolSocketState].receiveBufferSize = this[symbolSocketState].receiveBufferSize; - socket[symbolSocketState].sendBufferSize = this[symbolSocketState].sendBufferSize; + const inputStream = inputStreamCreate( + SOCKET, + ioCall(INPUT_STREAM_CREATE | SOCKET, this.#inputStreamId, {}) + ); + const outputStream = outputStreamCreate( + SOCKET, + ioCall(OUTPUT_STREAM_CREATE | SOCKET, this.#inputStreamId, {}) + ); + + const socket = tcpSocketImplCreate(this.addressFamily(), this.id + 1); + this.#cloneSocketState(socket); this[symbolOperations].accept--; return [socket, inputStream, outputStream]; } + #cloneSocketState(socket) { + socket.setIpv6Only(this.ipv6Only()); + socket.setKeepAliveEnabled(this.keepAliveEnabled()); + socket.setKeepAliveIdleTime(this.keepAliveIdleTime()); + socket.setKeepAliveInterval(this.keepAliveInterval()); + socket.setKeepAliveCount(this.keepAliveCount()); + socket.setHopLimit(this.hopLimit()); + socket.setReceiveBufferSize(this.receiveBufferSize()); + socket.setSendBufferSize(this.sendBufferSize()); + } + /** * @returns {IpSocketAddress} * @throws {invalid-state} The socket is not bound to any local address. */ localAddress() { + console.log("localAddress"); assert(this[symbolSocketState].isBound === false, "invalid-state"); - const out = {}; - this.#socket.getsockname(out); - - const { address, port, family } = out; + const { address, port, family } = ioCall( + SOCKET_TCP_GET_LOCAL_ADDRESS, + this.id + ); this.#socketOptions.localAddress = address; this.#socketOptions.localPort = port; this.#socketOptions.family = family.toLocaleLowerCase(); @@ -533,12 +550,18 @@ export class TcpSocketImpl { * @throws {invalid-state} The socket is not connected to a remote address. (ENOTCONN) */ remoteAddress() { - assert(this[symbolSocketState].connectionState !== SocketConnectionState.Connected, "invalid-state"); - - const out = {}; - this.#socket.getpeername(out); - - const { address, port, family } = out; + console.log("remoteAddress"); + + assert( + this[symbolSocketState].connectionState !== + SocketConnectionState.Connected, + "invalid-state" + ); + + const { address, port, family } = ioCall( + SOCKET_TCP_GET_REMOTE_ADDRESS, + this.id + ); this.#socketOptions.remoteAddress = address; this.#socketOptions.remotePort = port; this.#socketOptions.family = family.toLocaleLowerCase(); @@ -553,7 +576,10 @@ export class TcpSocketImpl { } isListening() { - return this[symbolSocketState].connectionState === SocketConnectionState.Listening; + return ( + this[symbolSocketState].connectionState === + SocketConnectionState.Listening + ); } /** @@ -568,7 +594,10 @@ export class TcpSocketImpl { * @throws {not-supported} (get/set) `this` socket is an IPv4 socket. */ ipv6Only() { - assert(this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported"); + assert( + this.#socketOptions.family.toLocaleLowerCase() === "ipv4", + "not-supported" + ); return this[symbolSocketState].ipv6Only; } @@ -581,7 +610,11 @@ export class TcpSocketImpl { * @throws {not-supported} (set) Host does not support dual-stack sockets. (Implementations are not required to.) */ setIpv6Only(value) { - assert(this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported"); + console.log("setIpv6Only"); + assert( + this.#socketOptions.family.toLocaleLowerCase() === "ipv4", + "not-supported" + ); assert(this[symbolSocketState].isBound, "invalid-state"); this[symbolSocketState].ipv6Only = value; @@ -595,8 +628,14 @@ export class TcpSocketImpl { * @throws {invalid-state} (set) The socket is already in the Connection state. */ setListenBacklogSize(value) { + console.log("setListenBacklogSize"); + assert(value === 0n, "invalid-argument", "The provided value was 0."); - assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + assert( + this[symbolSocketState].connectionState === + SocketConnectionState.Connected, + "invalid-state" + ); this[symbolSocketState].backlogSize = Number(value); } @@ -613,10 +652,13 @@ export class TcpSocketImpl { * @returns {void} */ setKeepAliveEnabled(value) { - this.#socket.setKeepAlive(value); + ioCall(SOCKET_TCP_SET_KEEP_ALIVE, this.id, { + keepAlive: value, + }); + this[symbolSocketState].keepAlive = value; - if (value) { + if (value === true) { this.setKeepAliveIdleTime(this.keepAliveIdleTime()); this.setKeepAliveInterval(this.keepAliveInterval()); this.setKeepAliveCount(this.keepAliveCount()); @@ -638,6 +680,7 @@ export class TcpSocketImpl { * @throws {invalid-argument} (set) The idle time must be 1 or higher. */ setKeepAliveIdleTime(value) { + value = Number(value); assert(value < 1, "invalid-argument", "The idle time must be 1 or higher."); this[symbolSocketState].keepAliveIdleTime = value; @@ -658,6 +701,7 @@ export class TcpSocketImpl { * @throws {invalid-argument} (set) The interval must be 1 or higher. */ setKeepAliveInterval(value) { + value = Number(value); assert(value < 1, "invalid-argument", "The interval must be 1 or higher."); this[symbolSocketState].keepAliveInterval = value; @@ -678,6 +722,7 @@ export class TcpSocketImpl { * @throws {invalid-argument} (set) The count must be 1 or higher. */ setKeepAliveCount(value) { + value = Number(value); assert(value < 1, "invalid-argument", "The count must be 1 or higher."); // TODO: set this on the client socket as well @@ -701,6 +746,7 @@ export class TcpSocketImpl { * @description Not available on Node.js (see https://github.com/WebAssembly/wasi-sockets/blob/main/Posix-compatibility.md#socket-options) */ setHopLimit(value) { + value = Number(value); assert(value < 1, "invalid-argument", "The TTL value must be 1 or higher."); this[symbolSocketState].hopLimit = value; @@ -710,7 +756,7 @@ export class TcpSocketImpl { * @returns {bigint} */ receiveBufferSize() { - return this[symbolSocketState].receiveBufferSize; + return BigInt(this[symbolSocketState].receiveBufferSize); } /** @@ -721,9 +767,11 @@ export class TcpSocketImpl { * @throws {invalid-state} (set) The socket is already in the Connection state. */ setReceiveBufferSize(value) { + value = Number(value); + // TODO: review these assertions based on WIT specs // assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); - assert(value === 0n, "invalid-argument", "The provided value was 0."); + assert(value === 0, "invalid-argument", "The provided value was 0."); // TODO: set this on the client socket as well this[symbolSocketState].receiveBufferSize = value; @@ -733,7 +781,7 @@ export class TcpSocketImpl { * @returns {bigint} */ sendBufferSize() { - return this[symbolSocketState].sendBufferSize; + return BigInt(this[symbolSocketState].sendBufferSize); } /** @@ -744,9 +792,11 @@ export class TcpSocketImpl { * @throws {invalid-state} (set) The socket is already in the Listener state. */ setSendBufferSize(value) { + value = Number(value); + // TODO: review these assertions based on WIT specs // assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); - assert(value === 0n, "invalid-argument", "The provided value was 0."); + assert(value === 0, "invalid-argument", "The provided value was 0."); // TODO: set this on the client socket as well this[symbolSocketState].sendBufferSize = value; @@ -767,7 +817,12 @@ export class TcpSocketImpl { * @throws {invalid-state} The socket is not in the Connection state. (ENOTCONN) */ shutdown(shutdownType) { - assert(this[symbolSocketState].connectionState !== SocketConnectionState.Connected, "invalid-state"); + console.log("shutdown"); + assert( + this[symbolSocketState].connectionState !== + SocketConnectionState.Connected, + "invalid-state" + ); // TODO: figure out how to handle shutdownTypes if (shutdownType === ShutdownType.Receive) { @@ -779,26 +834,29 @@ export class TcpSocketImpl { this[symbolSocketState].canSend = false; } - const req = new ShutdownWrap(); - req.oncomplete = this.#handleAfterShutdown.bind(this); - req.handle = this._handle; - req.callback = () => {}; - const err = this._handle.shutdown(req); + const err = ioCall(SOCKET_TCP_SHUTDOWN, this.id, { + shutdownType, + }); assert(err === 1, "invalid-state"); } [symbolDispose]() { - this.#socket.close(); + ioCall(SOCKET_TCP_DISPOSE, this.id); // we only need to remove the bound address from the global map // if the socket was already bound if (this[symbolSocketState].isBound) { - globalBoundAddresses.delete(serializeIpAddress(this.#socketOptions.localIpSocketAddress, true)); + globalBoundAddresses.delete( + serializeIpAddress(this.#socketOptions.localIpSocketAddress, true) + ); } } handle() { - return this.#socket; + // return this.#socket; } } + +export const tcpSocketImplCreate = TcpSocket._create; +delete TcpSocket._create; diff --git a/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js b/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js index 43ec2c39b..70a92bd3e 100644 --- a/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js +++ b/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js @@ -16,7 +16,7 @@ import { } from "../../io/calls.js"; import { ioCall, pollableCreate } from "../../io/worker-io.js"; import { deserializeIpAddress } from "./socket-common.js"; -import { TcpSocketImpl } from "./tcp-socket-impl.js"; +import { TcpSocket, tcpSocketImplCreate } from "./tcp-socket-impl.js"; import { IncomingDatagramStream, OutgoingDatagramStream, UdpSocket, udpSocketImplCreate } from "./udp-socket-impl.js"; const symbolDispose = Symbol.dispose || Symbol.for("dispose"); @@ -150,19 +150,6 @@ export class WasiSockets { IncomingDatagramStream, }; - class TcpSocket extends TcpSocketImpl { - /** - * @param {IpAddressFamily} addressFamily - * */ - constructor(addressFamily) { - super(addressFamily, TcpSocket, net.socketCnt++); - net.tcpSockets.set(this.id, this); - } - allowed () { - return net.#allowTcp; - } - } - this.tcp = { TcpSocket, }; @@ -237,12 +224,17 @@ export class WasiSockets { "The new socket resource could not be created because of a system limit" ); - // try { - return new TcpSocket(addressFamily); - // } catch (err) { - // // assert(true, errorCode.unknown, err); - // throw err; - // } + try { + const id = net.socketCnt++; + const tcpSocket = tcpSocketImplCreate(addressFamily, id); + net.tcpSockets.set(id, tcpSocket); + return tcpSocket; + } catch (err) { + console.log("tcp socket create error", { + err, + }); + assert(true, errorCode.notSupported, err); + } }, };