Skip to content

Commit

Permalink
sockets refactoring (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
guybedford authored Jan 4, 2024
1 parent 46f589b commit 05b016b
Show file tree
Hide file tree
Showing 20 changed files with 747 additions and 1,216 deletions.
7 changes: 0 additions & 7 deletions packages/preview2-shim/lib/common/assert.js

This file was deleted.

17 changes: 9 additions & 8 deletions packages/preview2-shim/lib/io/worker-socket-tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ const {
const { ShutdownWrap } = process.binding("stream_wrap");

/** @type {Map<number, NodeJS.Socket>} */
export const openedSockets = new Map();
export const openTcpSockets = new Map();

let socketCnt = 0;
let tcpSocketCnt = 0;

export function getSocketOrThrow(socketId) {
const socket = openedSockets.get(socketId);
const socket = openTcpSockets.get(socketId);
if (!socket) throw "invalid-socket";
return socket;
}
Expand All @@ -25,16 +25,16 @@ export function getSocketOrThrow(socketId) {
*/
export function createTcpSocket() {
const socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER);
openedSockets.set(++socketCnt, socket);
return Promise.resolve(socketCnt);
openTcpSockets.set(++tcpSocketCnt, socket);
return tcpSocketCnt;
}

export function socketTcpBind(id, payload) {
const { localAddress, localPort, family, isIpV6Only } = payload;
const socket = getSocketOrThrow(id);

let bind = "bind"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
if (family === "ipv6") {
bind = "bind6"; // ipv6
}

Expand Down Expand Up @@ -63,10 +63,9 @@ export function socketTcpConnect(id, payload) {
connectReq.localAddress = localAddress;
connectReq.localPort = localPort;
let connect = "connect"; // ipv4
if (family.toLocaleLowerCase() === "ipv6") {
if (family === "ipv6") {
connect = "connect6";
}

socket.onread = (_buffer) => {
// TODO: handle data received from the server
};
Expand All @@ -87,13 +86,15 @@ export function socketTcpGetLocalAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getsockname(out);
out.family = out.family.toLowerCase();
return out;
}

export function socketTcpGetRemoteAddress(id) {
const socket = getSocketOrThrow(id);
const out = {};
socket.getpeername(out);
out.family = out.family.toLowerCase();
return out;
}

Expand Down
60 changes: 24 additions & 36 deletions packages/preview2-shim/lib/io/worker-socket-udp.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@ const symbolSocketUdpIpUnspecified =
Symbol.for("symbolSocketUdpIpUnspecified");

/** @type {Map<number, NodeJS.Socket>} */
export const openedSockets = new Map();
export const openUdpSockets = new Map();

/** @type {Map<number, Map<string, { data: Buffer, rinfo: { address: string, family: string, port: number, size: number } }>>} */
const queuedReceivedSocketDatagrams = new Map();

let socketCnt = 0;
let udpSocketCnt = 0;

export function getSocketOrThrow(socketId) {
const socket = openedSockets.get(socketId);
export function getUdpSocketOrThrow(socketId) {
const socket = openUdpSockets.get(socketId);
if (!socket) throw "invalid-state";
return socket;
}

export function getSocketByPort(port) {
return Array.from(openedSockets.values()).find(
export function getUdpSocketByPort(port) {
return Array.from(openUdpSockets.values()).find(
(socket) => socket.address().port === port
);
}

export function getBoundSockets(socketId) {
return Array.from(openedSockets.entries())
export function getBoundUdpSockets(socketId) {
return Array.from(openUdpSockets.entries())
.filter(([id, _socket]) => id !== socketId) // exclude source socket
.map(([_id, socket]) => socket.address());
}
Expand Down Expand Up @@ -65,27 +65,15 @@ export function enqueueReceivedSocketDatagram(socketInfo, { data, rinfo }) {
* @returns {NodeJS.Socket}
*/
export function createUdpSocket(addressFamily, reuseAddr) {
return new Promise((resolve, reject) => {
const type = addressFamily === "ipv6" ? "udp6" : "udp4";
try {
const socket = createSocket({
type,
reuseAddr,
});
openedSockets.set(++socketCnt, socket);
resolve({
id: socketCnt,
socket,
});
} catch (e) {
reject(e);
}
});
const type = addressFamily === "ipv6" ? "udp6" : "udp4";
const socket = createSocket({ type, reuseAddr });
openUdpSockets.set(++udpSocketCnt, socket);
return udpSocketCnt;
}

export function socketUdpBind(id, payload) {
const { localAddress, localPort } = payload;
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);

// Note: even if the client has bound to IPV4_UNSPECIFIED/IPV6_UNSPECIFIED (0.0.0.0 // ::),
// rinfo.address is resolved to IPV4_LOOPBACK/IPV6_LOOPBACK.
Expand All @@ -104,13 +92,13 @@ export function socketUdpBind(id, payload) {
port: localPort,
},
() => {
openedSockets.set(id, socket);
openUdpSockets.set(id, socket);
resolve(0);
}
);

socket.on("message", (data, rinfo) => {
const remoteSocket = getSocketByPort(rinfo.port);
const remoteSocket = getUdpSocketByPort(rinfo.port);
let { address, port } = socket.address();

if (remoteSocket[symbolSocketUdpIpUnspecified].isUnspecified) {
Expand All @@ -136,7 +124,7 @@ export function socketUdpBind(id, payload) {
}

export function socketUdpCheckSend(id) {
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);
try {
return socket.getSendBufferSize() - socket.getSendQueueSize();
} catch (err) {
Expand All @@ -146,7 +134,7 @@ export function socketUdpCheckSend(id) {

export function socketUdpSend(id, payload) {
let { remoteHost, remotePort, data } = payload;
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);

return new Promise((resolve) => {
const _callback = (err, _byteLength) => {
Expand All @@ -157,7 +145,7 @@ export function socketUdpSend(id, payload) {
// Note: when remoteHost/remotePort is None, we broadcast to all bound sockets
// except the source socket
if (remotePort === undefined || remoteHost === undefined) {
getBoundSockets(id).forEach((adr) => {
getBoundUdpSockets(id).forEach((adr) => {
socket.send(data, adr.port, adr.address, _callback);
});
} else {
Expand All @@ -172,7 +160,7 @@ export function socketUdpSend(id, payload) {

export function SocketUdpReceive(id, payload) {
const { maxResults } = payload;
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);
const { address, port } = socket.address();

// set target socket info
Expand All @@ -187,11 +175,11 @@ export function SocketUdpReceive(id, payload) {
}

export function socketUdpConnect(id, payload) {
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);
const { remoteAddress, remotePort } = payload;
return new Promise((resolve) => {
socket.connect(remotePort, remoteAddress, () => {
openedSockets.set(id, socket);
openUdpSockets.set(id, socket);
resolve(0);
});
socket.once("error", (err) => {
Expand All @@ -201,18 +189,18 @@ export function socketUdpConnect(id, payload) {
}

export function socketUdpDisconnect(id) {
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);
return new Promise((resolve) => {
socket.disconnect();
resolve(0);
});
}

export function socketUdpDispose(id) {
const socket = getSocketOrThrow(id);
const socket = getUdpSocketOrThrow(id);
return new Promise((resolve) => {
socket.close(() => {
openedSockets.delete(id);
openUdpSockets.delete(id);
resolve(0);
});
});
Expand Down
65 changes: 65 additions & 0 deletions packages/preview2-shim/lib/io/worker-sockets.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { lookup } from "node:dns/promises";
import {
ALL,
BADFAMILY,
CANCELLED,
CONNREFUSED,
NODATA,
NOMEM,
NONAME,
NOTFOUND,
REFUSED,
SERVFAIL,
TIMEOUT,
V4MAPPED,
} from "node:dns";
import { ipv4ToTuple, ipv6ToTuple } from "../nodejs/sockets/socket-common.js";

const dnsLookupOptions = {
verbatim: true,
all: true,
hints: V4MAPPED | ALL,
};

export function socketResolveAddress(hostname) {
return lookup(hostname, dnsLookupOptions).then(
(addresses) => {
return addresses.map(({ address, family }) => {
[
{
tag: "ipv" + family,
val: (family === 4 ? ipv4ToTuple : ipv6ToTuple)(address),
},
];
});
},
(err) => {
switch (err.code) {
// TODO: verify these more carefully
case NODATA:
case BADFAMILY:
case NONAME:
case NOTFOUND:
throw "name-unresolvable";
case TIMEOUT:
case REFUSED:
case CONNREFUSED:
case SERVFAIL:
case NOMEM:
case CANCELLED:
throw "temporary-resolver-failure";
default:
throw "permanent-resolver-failure";
}
}
);
}

export function convertSocketError(err) {
switch (err?.code) {
case "EBADF":
return "invalid-state";
}
process._rawDebug(err);
return "unknown";
}
Loading

0 comments on commit 05b016b

Please sign in to comment.