Skip to content

Commit

Permalink
chore: test_udp_sample_application passing (90%)
Browse files Browse the repository at this point in the history
  • Loading branch information
manekinekko committed Dec 12, 2023
1 parent c15d224 commit 779ecaf
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 57 deletions.
105 changes: 81 additions & 24 deletions packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ import {
} from "./calls.js";
import { createUdpSocket } from "./worker-socket-udp.js";

const symbolSocketUdpIpUnspecified = Symbol.symbolSocketUdpIpUnspecified ?? Symbol.for("symbolSocketUdpIpUnspecified");

let streamCnt = 0,
pollCnt = 0;

Expand All @@ -76,8 +78,8 @@ export const unfinishedStreams = new Map();
/** @type {Map<number, NodeJS.Socket>} */
export const openedSockets = new Map();

/** @type {Map<number, Array<{ data: Buffer, rinfo: { address: string, family: string, port: number, size: number } }>>} */
const queuedReceivedSocketDatagrams = [];
/** @type {Map<number, Map<string, { data: Buffer, rinfo: { address: string, family: string, port: number, size: number } }>>} */
const queuedReceivedSocketDatagrams = new Map();

/** @type {Map<number, { value: any, error: bool }>} */
export const unfinishedFutures = new Map();
Expand Down Expand Up @@ -128,20 +130,40 @@ export function getStreamOrThrow(streamId) {

export function getSocketOrThrow(socketId) {
const socket = openedSockets.get(socketId);
if (!socket) throw "closed";
if (!socket) throw "invalid-state";
return socket;
}

// TODO: should we filter by socketId?
export function dequeueReceivedSocketDatagram(maxResults, _socketId) {
const dgrams = queuedReceivedSocketDatagrams.slice(0, Number(maxResults));
export function getSocketByPort(port) {
return Array.from(openedSockets.values()).find((socket) => socket.address().port === port);
}

export function getBoundSockets(socketId) {
return Array.from(openedSockets.entries())
.filter(([id, _socket]) => id !== socketId) // exclude source socket
.map(([_id, socket]) => socket.address());
}

export function dequeueReceivedSocketDatagram(socketInfo, maxResults) {
const dgrams = queuedReceivedSocketDatagrams.get(`PORT:${socketInfo.port}`).splice(0, Number(maxResults));
return dgrams;
}
export function enqueueReceivedSocketDatagram(socketId, dgram) {
queuedReceivedSocketDatagrams.unshift({
...dgram,
socketId,
});
export function enqueueReceivedSocketDatagram(socketInfo, { data, rinfo }) {
const key = `PORT:${socketInfo.port}`;
const chunk = {
data,
rinfo, // sender/remote socket info (source)
socketInfo, // receiver socket info (targeted socket)
};

// create new queue if not exists
if (!queuedReceivedSocketDatagrams.has(key)) {
queuedReceivedSocketDatagrams.set(key, []);
}

// append to queue
const queue = queuedReceivedSocketDatagrams.get(key);
queue.push(chunk);
}

function subscribeInstant(instant) {
Expand Down Expand Up @@ -245,8 +267,18 @@ function handle(call, id, payload) {
}

case SOCKET_UDP_BIND: {
const socket = getSocketOrThrow(id);
const { localAddress, localPort } = payload;
const socket = getSocketOrThrow(id);

// Note: even if the client has bound to IPV4_UNSPECIFIED/IPV6_UNSPECIFIED (0.0.0.0 // ::),
// rinfo.address is resolved to IPV4_LOOPBACK/IPV6_LOOPBACK.
// We need to cache the original bound IP type and fix rinfo.address when receiving datagrams (see below)
// See https://github.com/WebAssembly/wasi-sockets/issues/86
socket[symbolSocketUdpIpUnspecified] = {
isUnspecified: localAddress === "0.0.0.0" || localAddress === "0:0:0:0:0:0:0:0",
localAddress,
};

return new Promise((resolve) => {
socket.bind(
{
Expand All @@ -260,7 +292,21 @@ function handle(call, id, payload) {
);

socket.on("message", (data, rinfo) => {
enqueueReceivedSocketDatagram(id, { data, rinfo });
const remoteSocket = getSocketByPort(rinfo.port);
let { address, port } = socket.address();

if (remoteSocket[symbolSocketUdpIpUnspecified].isUnspecified) {
// cache original bound address
rinfo._address = remoteSocket[symbolSocketUdpIpUnspecified].localAddress;
}

const receiverSocket = {
address,
port,
id,
};

enqueueReceivedSocketDatagram(receiverSocket, { data, rinfo });
});

// catch all errors
Expand All @@ -271,33 +317,44 @@ function handle(call, id, payload) {
}

case SOCKET_UDP_SEND: {
let { remoteHost, remotePort, data, socketId } = payload;
const socket = getSocketOrThrow(socketId);
let { remoteHost, remotePort, data } = payload;
const socket = getSocketOrThrow(id);

return new Promise((resolve) => {
const _callback = (err, byteLength) => {
const _callback = (err, _byteLength) => {
if (err) return resolve(err.errno);
resolve(byteLength);
resolve(0); // success
};

// TODO: figure out how to handle the case when remoteHost is None
// Note: when remoteHost/remotePort is None, we broadcast to all bound sockets
// except the source socket
if (remotePort === undefined || remoteHost === undefined) {
const { address, port, family } = socket.address();
const dgram = { data, rinfo: { address, family, port, size: data.byteLength } };
enqueueReceivedSocketDatagram(socketId, dgram);
resolve(0); // resolve immediately
getBoundSockets(id).forEach((adr) => {
socket.send(data, adr.port, adr.address, _callback);
});
} else {
socket.send(data, remotePort, remoteHost, _callback);
}

socket.once("error", (err) => {
resolve(err.errno);
});
});
}

case SOCKET_UDP_RECEIVE: {
const { maxResults, socketId } = payload;
const dgrams = dequeueReceivedSocketDatagram(maxResults, socketId);
const { maxResults } = payload;
const socket = getSocketOrThrow(id);
const { address, port } = socket.address();

// set target socket info
// we use this to filter out datagrams that are were sent to this socket
const targetSocket = {
address,
port,
};

const dgrams = dequeueReceivedSocketDatagram(targetSocket, maxResults);
return Promise.resolve(dgrams);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class TcpSocketImpl {
localPort: 0,
remoteAddress: "",
remotePort: 0,
localIpSocketAddress: null,
};

// this is set by the TcpSocket child class
Expand Down
81 changes: 48 additions & 33 deletions packages/preview2-shim/lib/nodejs/sockets/udp-socket-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,31 @@ export class IncomingDatagramStream {
return [];
}

const datagrams = ioCall(SOCKET_UDP_RECEIVE, this.#pollId, {
maxResults,
socketId: this.#socketId,
});
const datagrams = ioCall(
SOCKET_UDP_RECEIVE,
// socket that's receiving the datagrams
this.#socketId,
{
maxResults,
}
);

return datagrams.map(({ data, rinfo }) => {
let address = rinfo.address;
if (rinfo._address) {
// set the original address that the socket was bound to
address = rinfo._address;
}
const remoteAddress = {
tag: rinfo.family.toLocaleLowerCase(),
val: {
address: deserializeIpAddress(address, rinfo.family),
port: rinfo.port,
},
};
return {
data,
remoteAddress: {
tag: rinfo.family.toLocaleLowerCase(),
val: {
address: deserializeIpAddress(rinfo.address, rinfo.family),
port: rinfo.port,
},
},
remoteAddress,
};
});
}
Expand Down Expand Up @@ -123,6 +133,9 @@ export class OutgoingDatagramStream {
*/
checkSend() {
// TODO: implement the actual check
// socket.getSendBufferSize() - socket.getSendQueueSize() <= 0 --> throw
// attach an event listener to the socket to listen for the drain event
// untill the next sent completes
return 1024n;
}

Expand Down Expand Up @@ -155,15 +168,21 @@ export class OutgoingDatagramStream {
assert(this.checkSend() < data.length, "datagram-too-large");
// TODO: add the other assertions

const ret = ioCall(SOCKET_UDP_SEND, this.#pollId, {
socketId: this.#socketId,
data,
remotePort,
remoteHost: host,
});
if (ret > 0) {
const ret = ioCall(
SOCKET_UDP_SEND,
this.#socketId, // socket that's sending the datagrams
{
data,
remotePort,
remoteHost: host,
}
);
if (ret === 0) {
datagramsSent++;
}
else {
assert(ret === -65, "remote-unreachable");
}
}

return datagramsSent;
Expand Down Expand Up @@ -304,7 +323,7 @@ export class UdpSocket {
assert(isIP(localAddress) === 0, "address-not-bindable");
assert(globalBoundAddresses.has(serializeIpAddress(localIpSocketAddress, true)), "address-in-use");

const err = ioCall(SOCKET_UDP_BIND, this.#pollId, {
const err = ioCall(SOCKET_UDP_BIND, this.id, {
localAddress,
localPort,
});
Expand Down Expand Up @@ -402,7 +421,7 @@ export class UdpSocket {
// this.bind(this.network, this.#socketOptions.localIpSocketAddress);
}

const err = ioCall(SOCKET_UDP_CONNECT, this.#pollId, {
const err = ioCall(SOCKET_UDP_CONNECT, this.id, {
remoteAddress,
remotePort,
});
Expand All @@ -426,7 +445,7 @@ export class UdpSocket {
}

#disconnect() {
const ret = ioCall(SOCKET_UDP_DISCONNECT, this.#pollId);
const ret = ioCall(SOCKET_UDP_DISCONNECT, this.id);

if (ret === 0) {
this[symbolSocketState].connectionState = SocketConnectionState.Closed;
Expand Down Expand Up @@ -480,7 +499,7 @@ export class UdpSocket {
localAddress() {
assert(this[symbolSocketState].isBound === false, "invalid-state");

const out = ioCall(SOCKET_UDP_GET_LOCAL_ADDRESS, this.#pollId);
const out = ioCall(SOCKET_UDP_GET_LOCAL_ADDRESS, this.id);

const { address, port, family } = out;
this.#socketOptions.localAddress = address;
Expand Down Expand Up @@ -508,7 +527,7 @@ export class UdpSocket {
"The socket is not streaming to a specific remote address"
);

const out = ioCall(SOCKET_UDP_GET_REMOTE_ADDRESS, this.#pollId);
const out = ioCall(SOCKET_UDP_GET_REMOTE_ADDRESS, this.id);

assert(out.address === undefined, "invalid-state", "The socket is not streaming to a specific remote address");

Expand Down Expand Up @@ -581,7 +600,7 @@ export class UdpSocket {
setUnicastHopLimit(value) {
assert(value < 1, "invalid-argument", "The TTL value must be 1 or higher");

ioCall(SOCKET_UDP_SET_UNICAST_HOP_LIMIT, this.#pollId, { value });
ioCall(SOCKET_UDP_SET_UNICAST_HOP_LIMIT, this.id, { value });
this[symbolSocketState].unicastHopLimit = value;
}

Expand All @@ -593,16 +612,14 @@ export class UdpSocket {
// TODO: should we throw if the socket is not bound?
// assert(this[symbolSocketState].isBound === false, "invalid-state");

const ret = ioCall(SOCKET_UDP_GET_RECEIVE_BUFFER_SIZE, this.#pollId);
const ret = ioCall(SOCKET_UDP_GET_RECEIVE_BUFFER_SIZE, this.id);

// if (ret === -9) {
// // TODO: handle the case where bad file descriptor (EBADF) is returned
// // This happens when the socket is not bound
// return this[symbolSocketState].receiveBufferSize;
// }

console.log("receiveBufferSize", ret);

return ret;
}

Expand All @@ -616,7 +633,7 @@ export class UdpSocket {
assert(value === 0n, "invalid-argument", "The provided value was 0");

// value = cappedUint32(value);
ioCall(SOCKET_UDP_SET_RECEIVE_BUFFER_SIZE, this.#pollId, { value });
ioCall(SOCKET_UDP_SET_RECEIVE_BUFFER_SIZE, this.id, { value });
}

/**
Expand All @@ -627,16 +644,14 @@ export class UdpSocket {
// TODO: should we throw if the socket is not bound?
// assert(this[symbolSocketState].isBound === false, "invalid-state");

const ret = ioCall(SOCKET_UDP_GET_SEND_BUFFER_SIZE, this.#pollId);
const ret = ioCall(SOCKET_UDP_GET_SEND_BUFFER_SIZE, this.id);

// if (ret === -9) {
// // TODO: handle the case where bad file descriptor (EBADF) is returned
// // This happens when the socket is not bound
// return this[symbolSocketState].sendBufferSize;
// }

console.log("sendBufferSize", ret);

return ret;
}

Expand All @@ -650,7 +665,7 @@ export class UdpSocket {
assert(value === 0n, "invalid-argument", "The provided value was 0");

// value = cappedUint32(value);
ioCall(SOCKET_UDP_SET_SEND_BUFFER_SIZE, this.#pollId, { value });
ioCall(SOCKET_UDP_SET_SEND_BUFFER_SIZE, this.id, { value });

// this.#socket.bufferSize(Number(cappedValue), BufferSizeFlags.SO_SNDBUF, exceptionInfo);
}
Expand All @@ -665,7 +680,7 @@ export class UdpSocket {
}

[symbolDispose]() {
ioCall(SOCKET_UDP_DISPOSE, this.#pollId);
ioCall(SOCKET_UDP_DISPOSE, this.id);
}
}

Expand Down

0 comments on commit 779ecaf

Please sign in to comment.