From 4203c0c6f9a0e08a06e640dbc24ec3dd289804c0 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 7 Mar 2025 23:05:50 -0800 Subject: [PATCH] js: de-class-ify node:net.Socket --- docs/api/tcp.md | 6 +- packages/bun-types/bun.d.ts | 2 +- src/bun.js/api/bun/socket.zig | 32 +- src/bun.js/bindings/BunProcess.cpp | 2 +- src/deps/uws.zig | 12 +- src/js/node/net.ts | 1834 ++++++++++++++-------------- 6 files changed, 925 insertions(+), 963 deletions(-) diff --git a/docs/api/tcp.md b/docs/api/tcp.md index 7b0083f8ad90da..da9a5fa3180c87 100644 --- a/docs/api/tcp.md +++ b/docs/api/tcp.md @@ -11,7 +11,7 @@ Bun.listen({ socket: { data(socket, data) {}, // message received from client open(socket) {}, // socket opened - close(socket) {}, // socket closed + close(socket, error) {}, // socket closed drain(socket) {}, // socket ready for more data error(socket, error) {}, // error handler }, @@ -30,7 +30,7 @@ Bun.listen({ open(socket) {}, data(socket, data) {}, drain(socket) {}, - close(socket) {}, + close(socket, error) {}, error(socket, error) {}, }, }); @@ -122,7 +122,7 @@ const socket = await Bun.connect({ socket: { data(socket, data) {}, open(socket) {}, - close(socket) {}, + close(socket, error) {}, drain(socket) {}, error(socket, error) {}, diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts index 3fed4ec2bc191b..2d154763bbd11a 100644 --- a/packages/bun-types/bun.d.ts +++ b/packages/bun-types/bun.d.ts @@ -6239,7 +6239,7 @@ declare module "bun" { * @param socket */ open?(socket: Socket): void | Promise; - close?(socket: Socket): void | Promise; + close?(socket: Socket, error?: Error): void | Promise; error?(socket: Socket, error: Error): void | Promise; data?(socket: Socket, data: BinaryTypeList[DataBinaryType]): void | Promise; drain?(socket: Socket): void | Promise; diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index 8669db0dcbc1b1..23660b5f6df24a 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -1220,37 +1220,9 @@ pub const Listener = struct { if (ssl.?.server_name) |s| { server_name = bun.default_allocator.dupe(u8, s[0..bun.len(s)]) catch bun.outOfMemory(); } - uws.NewSocketHandler(true).configure( - socket_context, - true, - *TLSSocket, - struct { - pub const onOpen = NewSocket(true).onOpen; - pub const onClose = NewSocket(true).onClose; - pub const onData = NewSocket(true).onData; - pub const onWritable = NewSocket(true).onWritable; - pub const onTimeout = NewSocket(true).onTimeout; - pub const onConnectError = NewSocket(true).onConnectError; - pub const onEnd = NewSocket(true).onEnd; - pub const onHandshake = NewSocket(true).onHandshake; - }, - ); + uws.NewSocketHandler(true).configure(socket_context, true, *TLSSocket, NewSocket(true)); } else { - uws.NewSocketHandler(false).configure( - socket_context, - true, - *TCPSocket, - struct { - pub const onOpen = NewSocket(false).onOpen; - pub const onClose = NewSocket(false).onClose; - pub const onData = NewSocket(false).onData; - pub const onWritable = NewSocket(false).onWritable; - pub const onTimeout = NewSocket(false).onTimeout; - pub const onConnectError = NewSocket(false).onConnectError; - pub const onEnd = NewSocket(false).onEnd; - pub const onHandshake = NewSocket(false).onHandshake; - }, - ); + uws.NewSocketHandler(false).configure(socket_context, true, *TCPSocket, NewSocket(false)); } default_data.ensureStillAlive(); diff --git a/src/bun.js/bindings/BunProcess.cpp b/src/bun.js/bindings/BunProcess.cpp index 2042aa35aa11dd..a7a3a0ace8169d 100644 --- a/src/bun.js/bindings/BunProcess.cpp +++ b/src/bun.js/bindings/BunProcess.cpp @@ -1958,7 +1958,7 @@ static JSValue constructReportObjectComplete(VM& vm, Zig::GlobalObject* globalOb stackArray->push(globalObject, JSC::jsString(vm, line.toString().trim(isASCIIWhitespace))); }); - javascriptStack->putDirect(vm, JSC::Identifier::fromString(vm, "stack"_s), stackArray, 0); + javascriptStack->putDirect(vm, vm.propertyNames->stack, stackArray, 0); } JSC::JSObject* errorProperties = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), 1); diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 93fa2e3290d01a..17d96ba8f4a42d 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -1627,12 +1627,14 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { // debug("us_socket_shutdown({d})", .{@intFromPtr(this.socket)}); switch (this.socket) { .connected => |socket| { + debug("us_socket_shutdown({d})", .{@intFromPtr(socket)}); return us_socket_shutdown( comptime ssl_int, socket, ); }, .connecting => |socket| { + debug("us_connecting_socket_shutdown({d})", .{@intFromPtr(socket)}); return us_connecting_socket_shutdown( comptime ssl_int, socket, @@ -1651,14 +1653,14 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { pub fn shutdownRead(this: ThisSocket) void { switch (this.socket) { .connected => |socket| { - // debug("us_socket_shutdown_read({d})", .{@intFromPtr(socket)}); + debug("us_socket_shutdown_read({d})", .{@intFromPtr(socket)}); return us_socket_shutdown_read( comptime ssl_int, socket, ); }, .connecting => |socket| { - // debug("us_connecting_socket_shutdown_read({d})", .{@intFromPtr(socket)}); + debug("us_connecting_socket_shutdown_read({d})", .{@intFromPtr(socket)}); return us_connecting_socket_shutdown_read( comptime ssl_int, socket, @@ -1677,12 +1679,14 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { pub fn isShutdown(this: ThisSocket) bool { switch (this.socket) { .connected => |socket| { + debug("us_socket_is_shut_down({d})", .{@intFromPtr(socket)}); return us_socket_is_shut_down( comptime ssl_int, socket, ) > 0; }, .connecting => |socket| { + debug("us_connecting_socket_is_shut_down({d})", .{@intFromPtr(socket)}); return us_connecting_socket_is_shut_down( comptime ssl_int, socket, @@ -1709,12 +1713,14 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { pub fn getError(this: ThisSocket) i32 { switch (this.socket) { .connected => |socket| { + debug("us_socket_get_error({d})", .{@intFromPtr(socket)}); return us_socket_get_error( comptime ssl_int, socket, ); }, .connecting => |socket| { + debug("us_connecting_socket_get_error({d})", .{@intFromPtr(socket)}); return us_connecting_socket_get_error( comptime ssl_int, socket, @@ -1740,6 +1746,7 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { pub fn localPort(this: ThisSocket) i32 { switch (this.socket) { .connected => |socket| { + debug("us_socket_local_port({d})", .{@intFromPtr(socket)}); return us_socket_local_port( comptime ssl_int, socket, @@ -1751,6 +1758,7 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { pub fn remoteAddress(this: ThisSocket, buf: [*]u8, length: *i32) void { switch (this.socket) { .connected => |socket| { + debug("us_socket_remote_address({d})", .{@intFromPtr(socket)}); return us_socket_remote_address( comptime ssl_int, socket, diff --git a/src/js/node/net.ts b/src/js/node/net.ts index 63b0fb5691a9b6..3128c3af9c2a80 100644 --- a/src/js/node/net.ts +++ b/src/js/node/net.ts @@ -30,7 +30,7 @@ const { normalizedArgsSymbol, } = require("internal/net"); const { ExceptionWithHostPort } = require("internal/shared"); -import type { SocketListener } from "bun"; +import type { SocketListener, SocketHandler } from "bun"; import type { ServerOpts, Server as ServerType } from "node:net"; const { getTimerDuration } = require("internal/timers"); const { validateFunction, validateNumber, validateAbortSignal } = require("internal/validators"); @@ -89,6 +89,15 @@ const kSetNoDelay = Symbol("kSetNoDelay"); const kSetKeepAlive = Symbol("kSetKeepAlive"); const kSetKeepAliveInitialDelay = Symbol("kSetKeepAliveInitialDelay"); const kConnectOptions = Symbol("connect-options"); +const kAttach = Symbol("kAttach"); +const kCloseRawConnection = Symbol("kCloseRawConnection"); +const kpendingRead = Symbol("kpendingRead"); +const kupgraded = Symbol("kupgraded"); +const ksocket = Symbol("ksocket"); +const khandlers = Symbol("khandlers"); +const kclosed = Symbol("closed"); +const kended = Symbol("ended"); +const kwriteCallback = Symbol("writeCallback"); function endNT(socket, callback, err) { socket.$end(); @@ -109,7 +118,6 @@ function finishSocket(hasError) { detachSocket(this); this.emit("close", hasError); } - function destroyNT(self, err) { self.destroy(err); } @@ -146,7 +154,6 @@ function writeAfterFIN(chunk, encoding, cb) { return false; } - function onConnectEnd() { if (!this._hadError && this.secureConnecting) { const options = this[kConnectOptions]; @@ -161,1034 +168,1009 @@ function onConnectEnd() { this.destroy(error); } } -var SocketClass; -const Socket = (function (InternalSocket) { - SocketClass = InternalSocket; - Object.defineProperty(SocketClass.prototype, Symbol.toStringTag, { - value: "Socket", - enumerable: false, - }); - - function Socket(options) { - return new InternalSocket(options); - } - Socket.prototype = InternalSocket.prototype; - return Object.defineProperty(Socket, Symbol.hasInstance, { - value(instance) { - return instance instanceof InternalSocket; - }, - }); -})( - class Socket extends Duplex { - static #Handlers = { - close: Socket.#Close, - data(socket, buffer) { - const { data: self } = socket; - if (!self) return; - - self.bytesRead += buffer.length; - if (!self.push(buffer)) { - socket.pause(); - } - }, - drain: Socket.#Drain, - end: Socket.#End, - error(socket, error, ignoreHadError) { - const self = socket.data; - if (!self) return; - if (self._hadError && !ignoreHadError) return; - self._hadError = true; - const callback = self.#writeCallback; - if (callback) { - self.#writeCallback = null; - callback(error); - } - self.emit("error", error); - }, - open(socket) { - const self = socket.data; - if (!self) return; - socket.timeout(Math.ceil(self.timeout / 1000)); - - if (self.#unrefOnConnected) socket.unref(); - self._handle = socket; - self.connecting = false; - const options = self[bunTLSConnectOptions]; +const SocketHandlers: SocketHandler = { + close(socket, err) { + const self = socket.data; + if (!self || self[kclosed]) return; + self[kclosed] = true; + //socket cannot be used after close + detachSocket(self); + SocketEmitEndNT(self, err); + self.data = null; + }, + data(socket, buffer) { + const { data: self } = socket; + if (!self) return; - if (options) { - const { session } = options; - if (session) { - self.setSession(session); - } - } + self.bytesRead += buffer.length; + if (!self.push(buffer)) { + socket.pause(); + } + }, + drain(socket) { + const self = socket.data; + if (!self) return; + const callback = self[kwriteCallback]; + self.connecting = false; + if (callback) { + const writeChunk = self._pendingData; + if (socket.$write(writeChunk || "", self._pendingEncoding || "utf8")) { + self._pendingData = self[kwriteCallback] = null; + callback(null); + } else { + self._pendingData = null; + } - if (self[kSetNoDelay]) { - socket.setNoDelay(true); - } + self[kBytesWritten] = socket.bytesWritten; + } + }, + end(socket) { + const self = socket.data; + if (!self) return; - if (self[kSetKeepAlive]) { - socket.setKeepAlive(true, self[kSetKeepAliveInitialDelay]); - } + // we just reuse the same code but we can push null or enqueue right away + SocketEmitEndNT(self); + }, + error(socket, error, ignoreHadError) { + const self = socket.data; + if (!self) return; + if (self._hadError && !ignoreHadError) return; + self._hadError = true; + + const callback = self[kwriteCallback]; + if (callback) { + self[kwriteCallback] = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + if (!self) return; + socket.timeout(Math.ceil(self.timeout / 1000)); + + self._handle = socket; + self.connecting = false; + const options = self[bunTLSConnectOptions]; + + if (options) { + const { session } = options; + if (session) { + self.setSession(session); + } + } - if (!self.#upgraded) { - self[kBytesWritten] = socket.bytesWritten; - // this is not actually emitted on nodejs when socket used on the connection - // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake - self.emit("connect", self); - self.emit("ready"); - } + if (self[kSetNoDelay]) { + socket.setNoDelay(true); + } - Socket.#Drain(socket); - }, - handshake(socket, success, verifyError) { - const { data: self } = socket; - if (!self) return; - if (!success && verifyError?.code === "ECONNRESET") { - // will be handled in onConnectEnd - return; - } + if (self[kSetKeepAlive]) { + socket.setKeepAlive(true, self[kSetKeepAliveInitialDelay]); + } - self._securePending = false; - self.secureConnecting = false; - self._secureEstablished = !!success; + if (!self[kupgraded]) { + self[kBytesWritten] = socket.bytesWritten; + // this is not actually emitted on nodejs when socket used on the connection + // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake + self.emit("connect", self); + self.emit("ready"); + } - self.emit("secure", self); - self.alpnProtocol = socket.alpnProtocol; - const { checkServerIdentity } = self[bunTLSConnectOptions]; - if (!verifyError && typeof checkServerIdentity === "function" && self.servername) { - const cert = self.getPeerCertificate(true); - verifyError = checkServerIdentity(self.servername, cert); - } - if (self._requestCert || self._rejectUnauthorized) { - if (verifyError) { - self.authorized = false; - self.authorizationError = verifyError.code || verifyError.message; - if (self._rejectUnauthorized) { - self.destroy(verifyError); - return; - } - } else { - self.authorized = true; - } - } else { - self.authorized = true; - } - self.emit("secureConnect", verifyError); - self.removeListener("end", onConnectEnd); - }, - timeout(socket) { - const self = socket.data; - if (!self) return; + SocketHandlers.drain(socket); + }, + handshake(socket, success, verifyError) { + const { data: self } = socket; + if (!self) return; + if (!success && verifyError?.code === "ECONNRESET") { + // will be handled in onConnectEnd + return; + } - self.emit("timeout", self); - }, - binaryType: "buffer", - }; - static #End(socket) { - const self = socket.data; - if (!self) return; + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; - // we just reuse the same code but we can push null or enqueue right away - Socket.#EmitEndNT(self); + self.emit("secure", self); + self.alpnProtocol = socket.alpnProtocol; + const { checkServerIdentity } = self[bunTLSConnectOptions]; + if (!verifyError && typeof checkServerIdentity === "function" && self.servername) { + const cert = self.getPeerCertificate(true); + verifyError = checkServerIdentity(self.servername, cert); } - static #EmitEndNT(self, err) { - if (!self.#ended) { - if (!self.allowHalfOpen) { - self.write = writeAfterFIN; + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + if (self._rejectUnauthorized) { + self.destroy(verifyError); + return; } - self.#ended = true; - self.push(null); + } else { + self.authorized = true; } - // TODO: check how the best way to handle this - // if (err) { - // self.destroy(err); - // } - } - static #Close(socket, err) { - const self = socket.data; - if (!self || self.#closed) return; - self.#closed = true; - //socket cannot be used after close - detachSocket(self); - Socket.#EmitEndNT(self, err); - self.data = null; + } else { + self.authorized = true; } - static #Drain(socket) { - const self = socket.data; - if (!self) return; - const callback = self.#writeCallback; - self.connecting = false; - if (callback) { - const writeChunk = self._pendingData; - if (socket.$write(writeChunk || "", self._pendingEncoding || "utf8")) { - self._pendingData = self.#writeCallback = null; - callback(null); - } else { - self._pendingData = null; - } + self.emit("secureConnect", verifyError); + self.removeListener("end", onConnectEnd); + }, + timeout(socket) { + const self = socket.data; + if (!self) return; - self[kBytesWritten] = socket.bytesWritten; - } - } - static [bunSocketServerHandlers] = { - data: Socket.#Handlers.data, - close(socket, err) { - const data = this.data; - if (!data) return; - - data.server[bunSocketServerConnections]--; - { - if (!data.#closed) { - data.#closed = true; - //socket cannot be used after close - detachSocket(data); - Socket.#EmitEndNT(data, err); - data.data = null; - } - } + self.emit("timeout", self); + }, + binaryType: "buffer", +}; - data.server._emitCloseIfDrained(); - }, - end(socket) { - Socket.#Handlers.end(socket); - }, - open(socket) { - const self = this.data; - socket[kServerSocket] = self._handle; - const options = self[bunSocketServerOptions]; - const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options; - const _socket = new InternalSocketClass({}); - _socket.isServer = true; - _socket.server = self; - _socket._requestCert = requestCert; - _socket._rejectUnauthorized = rejectUnauthorized; - - _socket.#attach(this.localPort, socket); - if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { - const data = { - localAddress: _socket.localAddress, - localPort: _socket.localPort || this.localPort, - localFamily: _socket.localFamily, - remoteAddress: _socket.remoteAddress, - remotePort: _socket.remotePort, - remoteFamily: _socket.remoteFamily || "IPv4", - }; - - socket.end(); - - self.emit("drop", data); - return; - } +const SocketEmitEndNT = (self, err?) => { + if (!self[kended]) { + if (!self.allowHalfOpen) { + self.write = writeAfterFIN; + } + self[kended] = true; + self.push(null); + } + // TODO: check how the best way to handle this + // if (err) { + // self.destroy(err); + // } +}; - const bunTLS = _socket[bunTlsSymbol]; - const isTLS = typeof bunTLS === "function"; +const ServerHandlers: SocketHandler = { + data(socket, buffer) { + const { data: self } = socket; + if (!self) return; - self[bunSocketServerConnections]++; + self.bytesRead += buffer.length; + if (!self.push(buffer)) { + socket.pause(); + } + }, + close(socket, err) { + const data = this.data; + if (!data) return; + + data.server[bunSocketServerConnections]--; + { + if (!data[kclosed]) { + data[kclosed] = true; + //socket cannot be used after close + detachSocket(data); + SocketEmitEndNT(data, err); + data.data = null; + } + } - if (typeof connectionListener === "function") { - this.pauseOnConnect = pauseOnConnect; - if (!isTLS) { - connectionListener.$call(self, _socket); - } - } - self.emit("connection", _socket); - // the duplex implementation start paused, so we resume when pauseOnConnect is falsy - if (!pauseOnConnect && !isTLS) { - _socket.resume(); - } - }, + data.server._emitCloseIfDrained(); + }, + end(socket) { + SocketHandlers.end(socket); + }, + open(socket) { + const self = this.data; + socket[kServerSocket] = self._handle; + const options = self[bunSocketServerOptions]; + const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options; + const _socket = new InternalSocketClass({}); + _socket.isServer = true; + _socket.server = self; + _socket._requestCert = requestCert; + _socket._rejectUnauthorized = rejectUnauthorized; + + _socket[kAttach](this.localPort, socket); + if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { + const data = { + localAddress: _socket.localAddress, + localPort: _socket.localPort || this.localPort, + localFamily: _socket.localFamily, + remoteAddress: _socket.remoteAddress, + remotePort: _socket.remotePort, + remoteFamily: _socket.remoteFamily || "IPv4", + }; - handshake(socket, success, verifyError) { - const { data: self } = socket; - if (!success && verifyError?.code === "ECONNRESET") { - const err = new ConnResetException("socket hang up"); - self.emit("_tlsError", err); - self.server.emit("tlsClientError", err, self); - self._hadError = true; - // error before handshake on the server side will only be emitted using tlsClientError - self.destroy(); - return; - } - self._securePending = false; - self.secureConnecting = false; - self._secureEstablished = !!success; - self.servername = socket.getServername(); - const server = self.server; - self.alpnProtocol = socket.alpnProtocol; - if (self._requestCert || self._rejectUnauthorized) { - if (verifyError) { - self.authorized = false; - self.authorizationError = verifyError.code || verifyError.message; - server.emit("tlsClientError", verifyError, self); - if (self._rejectUnauthorized) { - // if we reject we still need to emit secure - self.emit("secure", self); - self.destroy(verifyError); - return; - } - } else { - self.authorized = true; - } - } else { - self.authorized = true; - } - const connectionListener = server[bunSocketServerOptions]?.connectionListener; - if (typeof connectionListener === "function") { - connectionListener.$call(server, self); - } - server.emit("secureConnection", self); - // after secureConnection event we emmit secure and secureConnect - self.emit("secure", self); - self.emit("secureConnect", verifyError); - if (!server.pauseOnConnect) { - self.resume(); - } - }, - error(socket, error) { - const data = this.data; - if (!data) return; - - if (data._hadError) return; - data._hadError = true; - const bunTLS = this[bunTlsSymbol]; - - if (typeof bunTLS === "function") { - // Destroy socket if error happened before handshake's finish - if (!data._secureEstablished) { - data.destroy(error); - } else if ( - data.isServer && - data._rejectUnauthorized && - /peer did not return a certificate/.test(error?.message) - ) { - // Ignore server's authorization errors - data.destroy(); - } else { - // Emit error - data._emitTLSError(error); - this.emit("_tlsError", error); - this.server.emit("tlsClientError", error, data); - Socket.#Handlers.error(socket, error, true); - return; - } - } - Socket.#Handlers.error(socket, error, true); - data.server.emit("clientError", error, data); - }, - timeout: Socket.#Handlers.timeout, - drain: Socket.#Handlers.drain, - binaryType: "buffer", - }; + socket.end(); - bytesRead = 0; - [kBytesWritten] = undefined; - #closed = false; - #ended = false; - connecting = false; - localAddress = "127.0.0.1"; - remotePort; - [bunTLSConnectOptions] = null; - timeout = 0; - #writeCallback; - _pendingData; - _pendingEncoding; // for compatibility - #pendingRead; - _hadError = false; - isServer = false; - _handle = null; - _parent; - _parentWrap; - #socket; - server; - pauseOnConnect = false; - #upgraded; - #unrefOnConnected = false; - - #handlers = Socket.#Handlers; - [kSetNoDelay]; - [kSetKeepAlive]; - [kSetKeepAliveInitialDelay]; - constructor(options) { - const { - socket, - signal, - write, - read, - allowHalfOpen = false, - onread = null, - noDelay = false, - keepAlive = false, - keepAliveInitialDelay = 0, - ...opts - } = options || {}; - - if (options?.objectMode) - throw $ERR_INVALID_ARG_VALUE("options.objectMode", options.objectMode, "is not supported"); - if (options?.readableObjectMode) - throw $ERR_INVALID_ARG_VALUE("options.readableObjectMode", options.readableObjectMode, "is not supported"); - if (options?.writableObjectMode) - throw $ERR_INVALID_ARG_VALUE("options.writableObjectMode", options.writableObjectMode, "is not supported"); - - super({ - ...opts, - allowHalfOpen, - readable: true, - writable: true, - //For node.js compat do not emit close on destroy. - emitClose: false, - autoDestroy: true, - // Handle strings directly. - decodeStrings: false, - }); - this._parent = this; - this._parentWrap = this; - this.#pendingRead = undefined; - this.#upgraded = null; + self.emit("drop", data); + return; + } - this[kSetNoDelay] = Boolean(noDelay); - this[kSetKeepAlive] = Boolean(keepAlive); - this[kSetKeepAliveInitialDelay] = ~~(keepAliveInitialDelay / 1000); + const bunTLS = _socket[bunTlsSymbol]; + const isTLS = typeof bunTLS === "function"; - // Shut down the socket when we're finished with it. - this.on("end", onSocketEnd); + self[bunSocketServerConnections]++; - if (socket instanceof Socket) { - this.#socket = socket; - } - if (onread) { - if (typeof onread !== "object") { - throw new TypeError("onread must be an object"); - } - if (typeof onread.callback !== "function") { - throw new TypeError("onread.callback must be a function"); - } - // when the onread option is specified we use a different handlers object - this.#handlers = { - ...Socket.#Handlers, - data({ data: self }, buffer) { - if (!self) return; - try { - onread.callback(buffer.length, buffer); - } catch (e) { - self.emit("error", e); - } - }, - }; - } - if (signal) { - if (signal.aborted) { - process.nextTick(destroyNT, this, signal.reason); - } else { - signal.addEventListener("abort", destroyWhenAborted.bind(this)); - } + if (typeof connectionListener === "function") { + this.pauseOnConnect = pauseOnConnect; + if (!isTLS) { + connectionListener.$call(self, _socket); } } - - address() { - return { - address: this.localAddress, - family: this.localFamily, - port: this.localPort, - }; + self.emit("connection", _socket); + // the duplex implementation start paused, so we resume when pauseOnConnect is falsy + if (!pauseOnConnect && !isTLS) { + _socket.resume(); } + }, - get bufferSize() { - return this.writableLength; + handshake(socket, success, verifyError) { + const { data: self } = socket; + if (!success && verifyError?.code === "ECONNRESET") { + const err = new ConnResetException("socket hang up"); + self.emit("_tlsError", err); + self.server.emit("tlsClientError", err, self); + self._hadError = true; + // error before handshake on the server side will only be emitted using tlsClientError + self.destroy(); + return; } - - get _bytesDispatched() { - return this[kBytesWritten] || 0; + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; + self.servername = socket.getServername(); + const server = self.server; + self.alpnProtocol = socket.alpnProtocol; + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + server.emit("tlsClientError", verifyError, self); + if (self._rejectUnauthorized) { + // if we reject we still need to emit secure + self.emit("secure", self); + self.destroy(verifyError); + return; + } + } else { + self.authorized = true; + } + } else { + self.authorized = true; + } + const connectionListener = server[bunSocketServerOptions]?.connectionListener; + if (typeof connectionListener === "function") { + connectionListener.$call(server, self); } + server.emit("secureConnection", self); + // after secureConnection event we emmit secure and secureConnect + self.emit("secure", self); + self.emit("secureConnect", verifyError); + if (!server.pauseOnConnect) { + self.resume(); + } + }, + error(socket, error) { + const data = this.data; + if (!data) return; - get bytesWritten() { - let bytes = this[kBytesWritten] || 0; - const data = this._pendingData; - const writableBuffer = this.writableBuffer; - if (!writableBuffer) return undefined; + if (data._hadError) return; + data._hadError = true; + const bunTLS = this[bunTlsSymbol]; - for (const el of writableBuffer) { - bytes += el.chunk instanceof Buffer ? el.chunk.length : Buffer.byteLength(el.chunk, el.encoding); + if (typeof bunTLS === "function") { + // Destroy socket if error happened before handshake's finish + if (!data._secureEstablished) { + data.destroy(error); + } else if ( + data.isServer && + data._rejectUnauthorized && + /peer did not return a certificate/.test(error?.message) + ) { + // Ignore server's authorization errors + data.destroy(); + } else { + // Emit error + data._emitTLSError(error); + this.emit("_tlsError", error); + this.server.emit("tlsClientError", error, data); + SocketHandlers.error(socket, error, true); + return; } + } + SocketHandlers.error(socket, error, true); + data.server.emit("clientError", error, data); + }, + timeout: SocketHandlers.timeout, + drain: SocketHandlers.drain, + binaryType: "buffer", +}; - if ($isArray(data)) { - // Was a writev, iterate over chunks to get total length - for (let i = 0; i < data.length; i++) { - const chunk = data[i]; - - if (data.allBuffers || chunk instanceof Buffer) bytes += chunk.length; - else bytes += Buffer.byteLength(chunk.chunk, chunk.encoding); +function Socket(options?) { + const { + socket, + signal, + write, + read, + allowHalfOpen = false, + onread = null, + noDelay = false, + keepAlive = false, + keepAliveInitialDelay = 0, + ...opts + } = options || {}; + + if (options?.objectMode) throw $ERR_INVALID_ARG_VALUE("options.objectMode", options.objectMode, "is not supported"); + if (options?.readableObjectMode) + throw $ERR_INVALID_ARG_VALUE("options.readableObjectMode", options.readableObjectMode, "is not supported"); + if (options?.writableObjectMode) + throw $ERR_INVALID_ARG_VALUE("options.writableObjectMode", options.writableObjectMode, "is not supported"); + + Duplex.$call(this, { + ...opts, + allowHalfOpen, + readable: true, + writable: true, + //For node.js compat do not emit close on destroy. + emitClose: false, + autoDestroy: true, + // Handle strings directly. + decodeStrings: false, + }); + this._parent = this; + this._parentWrap = this; + this[kpendingRead] = undefined; + this[kupgraded] = null; + + this[kSetNoDelay] = Boolean(noDelay); + this[kSetKeepAlive] = Boolean(keepAlive); + this[kSetKeepAliveInitialDelay] = ~~(keepAliveInitialDelay / 1000); + + this[khandlers] = SocketHandlers; + this.bytesRead = 0; + this[kBytesWritten] = undefined; + this[kclosed] = false; + this[kended] = false; + this.connecting = false; + this.localAddress = "127.0.0.1"; + this.remotePort = undefined; + this[bunTLSConnectOptions] = null; + this.timeout = 0; + this[kwriteCallback] = undefined; + this._pendingData = undefined; + this._pendingEncoding = undefined; // for compatibility + this[kpendingRead] = undefined; + this._hadError = false; + this.isServer = false; + this._handle = null; + this._parent = undefined; + this._parentWrap = undefined; + this[ksocket] = undefined; + this.server = undefined; + this.pauseOnConnect = false; + this[kupgraded] = undefined; + + // Shut down the socket when we're finished with it. + this.on("end", onSocketEnd); + + if (socket instanceof Socket) { + this[ksocket] = socket; + } + if (onread) { + if (typeof onread !== "object") { + throw new TypeError("onread must be an object"); + } + if (typeof onread.callback !== "function") { + throw new TypeError("onread.callback must be a function"); + } + // when the onread option is specified we use a different handlers object + this[khandlers] = { + ...SocketHandlers, + data({ data: self }, buffer) { + if (!self) return; + try { + onread.callback(buffer.length, buffer); + } catch (e) { + self.emit("error", e); } - } else if (data) { - bytes += data.byteLength; - } - return bytes; + }, + }; + } + if (signal) { + if (signal.aborted) { + process.nextTick(destroyNT, this, signal.reason); + } else { + signal.addEventListener("abort", destroyWhenAborted.bind(this)); } + } +} +$toClass(Socket, "Socket", Duplex); - #attach(port, socket) { - this.remotePort = port; - socket.data = this; - socket.timeout(Math.ceil(this.timeout / 1000)); - if (this.#unrefOnConnected) socket.unref(); - this._handle = socket; - this.connecting = false; +Socket.prototype.address = function address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; +}; - if (this[kSetNoDelay]) { - socket.setNoDelay(true); - } +Object.defineProperty(Socket.prototype, "bufferSize", { + get: function () { + return this.writableLength; + }, +}); - if (this[kSetKeepAlive]) { - socket.setKeepAlive(true, self[kSetKeepAliveInitialDelay]); - } +Object.defineProperty(Socket.prototype, "_bytesDispatched", { + get: function () { + return this[kBytesWritten] || 0; + }, +}); - if (!this.#upgraded) { - this[kBytesWritten] = socket.bytesWritten; - // this is not actually emitted on nodejs when socket used on the connection - // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake - this.emit("connect", this); - this.emit("ready"); - } - Socket.#Drain(socket); +Object.defineProperty(Socket.prototype, "bytesWritten", { + get: function () { + let bytes = this[kBytesWritten] || 0; + const data = this._pendingData; + const writableBuffer = this.writableBuffer; + if (!writableBuffer) return undefined; + for (const el of writableBuffer) { + bytes += el.chunk instanceof Buffer ? el.chunk.length : Buffer.byteLength(el.chunk, el.encoding); } - - #closeRawConnection() { - const connection = this.#upgraded; - connection.connecting = false; - connection._handle = null; - connection.unref(); - connection.destroy(); + if ($isArray(data)) { + // Was a writev, iterate over chunks to get total length + for (let i = 0; i < data.length; i++) { + const chunk = data[i]; + if (data.allBuffers || chunk instanceof Buffer) bytes += chunk.length; + else bytes += Buffer.byteLength(chunk.chunk, chunk.encoding); + } + } else if (data) { + bytes += data.byteLength; } + return bytes; + }, +}); - public connect(...args) { - const [options, connectListener] = - $isArray(args[0]) && args[0][normalizedArgsSymbol] - ? // args have already been normalized. - // Normalized array is passed as the first and only argument. - ($assert(args[0].length == 2 && typeof args[0][0] === "object"), args[0]) - : normalizeArgs(args); +Socket.prototype[kAttach] = function (port, socket) { + this.remotePort = port; + socket.data = this; + socket.timeout(Math.ceil(this.timeout / 1000)); + this._handle = socket; + this.connecting = false; - let connection = this.#socket; + if (this[kSetNoDelay]) { + socket.setNoDelay(true); + } - let upgradeDuplex = false; + if (this[kSetKeepAlive]) { + socket.setKeepAlive(true, self[kSetKeepAliveInitialDelay]); + } - let { - fd, - port, - host, - path, - socket, - localAddress, - localPort, - // TODOs - family, - hints, - lookup, - noDelay, - keepAlive, - keepAliveInitialDelay, - requestCert, - rejectUnauthorized, - pauseOnConnect, - servername, - checkServerIdentity, - session, - } = options; - - if (localAddress && !isIP(localAddress)) { - throw $ERR_INVALID_IP_ADDRESS(localAddress); - } - if (localPort) { - validateNumber(localPort, "options.localPort"); - } + if (!this[kupgraded]) { + this[kBytesWritten] = socket.bytesWritten; + // this is not actually emitted on nodejs when socket used on the connection + // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake + this.emit("connect", this); + this.emit("ready"); + } + SocketHandlers.drain(socket); +}; - this.servername = servername; +Socket.prototype[kCloseRawConnection] = function () { + const connection = this[kupgraded]; + connection.connecting = false; + connection._handle = null; + connection.unref(); + connection.destroy(); +}; - if (socket) { - connection = socket; - } - if (fd) { - bunConnect({ - data: this, - fd: fd, - socket: this.#handlers, - allowHalfOpen: this.allowHalfOpen, - }).catch(error => { - if (!this.destroyed) { - this.emit("error", error); - this.emit("close"); - } - }); +Socket.prototype.connect = function connect(...args) { + const [options, connectListener] = + $isArray(args[0]) && args[0][normalizedArgsSymbol] + ? // args have already been normalized. + // Normalized array is passed as the first and only argument. + ($assert(args[0].length == 2 && typeof args[0][0] === "object"), args[0]) + : normalizeArgs(args); + let connection = this[ksocket]; + let upgradeDuplex = false; + let { + fd, + port, + host, + path, + socket, + localAddress, + localPort, + // TODOs + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + requestCert, + rejectUnauthorized, + pauseOnConnect, + servername, + checkServerIdentity, + session, + } = options; + if (localAddress && !isIP(localAddress)) { + throw $ERR_INVALID_IP_ADDRESS(localAddress); + } + if (localPort) { + validateNumber(localPort, "options.localPort"); + } + this.servername = servername; + if (socket) { + connection = socket; + } + if (fd) { + bunConnect({ + data: this, + fd: fd, + socket: this[khandlers], + allowHalfOpen: this.allowHalfOpen, + }).catch(error => { + if (!this.destroyed) { + this.emit("error", error); + this.emit("close"); } - - this.pauseOnConnect = pauseOnConnect; - if (!pauseOnConnect) { - process.nextTick(() => { - this.resume(); - }); - this.connecting = true; + }); + } + this.pauseOnConnect = pauseOnConnect; + if (!pauseOnConnect) { + process.nextTick(() => { + this.resume(); + }); + this.connecting = true; + } + if (fd) { + return this; + } + if ( + // TLSSocket already created a socket and is forwarding it here. This is a private API. + !(socket && $isObject(socket) && socket instanceof Duplex) && + // public api for net.Socket.connect + port === undefined && + path == null + ) { + throw $ERR_MISSING_ARGS(["options", "port", "path"]); + } + this.remotePort = port; + const bunTLS = this[bunTlsSymbol]; + var tls = undefined; + if (typeof bunTLS === "function") { + tls = bunTLS.$call(this, port, host, true); + // Client always request Cert + this._requestCert = true; + if (tls) { + if (typeof rejectUnauthorized !== "undefined") { + this._rejectUnauthorized = rejectUnauthorized; + tls.rejectUnauthorized = rejectUnauthorized; + } else { + this._rejectUnauthorized = tls.rejectUnauthorized; } - - if (fd) { - return this; + tls.requestCert = true; + tls.session = session || tls.session; + this.servername = tls.servername; + tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity; + this[bunTLSConnectOptions] = tls; + if (!connection && tls.socket) { + connection = tls.socket; } - + } + if (connection) { if ( - // TLSSocket already created a socket and is forwarding it here. This is a private API. - !(socket && $isObject(socket) && socket instanceof Duplex) && - // public api for net.Socket.connect - port === undefined && - path == null + typeof connection !== "object" || + !(connection instanceof Socket) || + typeof connection[bunTlsSymbol] === "function" ) { - throw $ERR_MISSING_ARGS(["options", "port", "path"]); + if (connection instanceof Duplex) { + upgradeDuplex = true; + } else { + throw new TypeError("socket must be an instance of net.Socket or Duplex"); + } } - - this.remotePort = port; - - const bunTLS = this[bunTlsSymbol]; - var tls = undefined; - - if (typeof bunTLS === "function") { - tls = bunTLS.$call(this, port, host, true); - // Client always request Cert - this._requestCert = true; - - if (tls) { - if (typeof rejectUnauthorized !== "undefined") { - this._rejectUnauthorized = rejectUnauthorized; - tls.rejectUnauthorized = rejectUnauthorized; + } + this.authorized = false; + this.secureConnecting = true; + this._secureEstablished = false; + this._securePending = true; + if (connectListener) this.on("secureConnect", connectListener); + this[kConnectOptions] = options; + this.prependListener("end", onConnectEnd); + } else if (connectListener) this.on("connect", connectListener); + // start using existing connection + try { + // reset the underlying writable object when establishing a new connection + // this is a function on `Duplex`, originally defined on `Writable` + // https://github.com/nodejs/node/blob/c5cfdd48497fe9bd8dbd55fd1fca84b321f48ec1/lib/net.js#L311 + // https://github.com/nodejs/node/blob/c5cfdd48497fe9bd8dbd55fd1fca84b321f48ec1/lib/net.js#L1126 + this._undestroy(); + if (connection) { + const socket = connection._handle; + if (!upgradeDuplex && socket) { + // if is named pipe socket we can upgrade it using the same wrapper than we use for duplex + upgradeDuplex = isNamedPipeSocket(socket); + } + if (upgradeDuplex) { + this.connecting = true; + this[kupgraded] = connection; + const [result, events] = upgradeDuplexToTLS(connection, { + data: this, + tls, + socket: this[khandlers], + }); + connection.on("data", events[0]); + connection.on("end", events[1]); + connection.on("drain", events[2]); + connection.on("close", events[3]); + this._handle = result; + } else { + if (socket) { + this.connecting = true; + this[kupgraded] = connection; + const result = socket.upgradeTLS({ + data: this, + tls, + socket: this[khandlers], + }); + if (result) { + const [raw, tls] = result; + // replace socket + connection._handle = raw; + this.once("end", this[kCloseRawConnection]); + raw.connecting = false; + this._handle = tls; } else { - this._rejectUnauthorized = tls.rejectUnauthorized; - } - tls.requestCert = true; - tls.session = session || tls.session; - this.servername = tls.servername; - tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity; - this[bunTLSConnectOptions] = tls; - if (!connection && tls.socket) { - connection = tls.socket; + this._handle = null; + throw new Error("Invalid socket"); } - } - if (connection) { - if ( - typeof connection !== "object" || - !(connection instanceof Socket) || - typeof connection[bunTlsSymbol] === "function" - ) { - if (connection instanceof Duplex) { - upgradeDuplex = true; - } else { - throw new TypeError("socket must be an instance of net.Socket or Duplex"); + } else { + // wait to be connected + connection.once("connect", () => { + const socket = connection._handle; + if (!upgradeDuplex && socket) { + // if is named pipe socket we can upgrade it using the same wrapper than we use for duplex + upgradeDuplex = isNamedPipeSocket(socket); } - } - } - this.authorized = false; - this.secureConnecting = true; - this._secureEstablished = false; - this._securePending = true; - - if (connectListener) this.on("secureConnect", connectListener); - this[kConnectOptions] = options; - - this.prependListener("end", onConnectEnd); - } else if (connectListener) this.on("connect", connectListener); - - // start using existing connection - try { - // reset the underlying writable object when establishing a new connection - // this is a function on `Duplex`, originally defined on `Writable` - // https://github.com/nodejs/node/blob/c5cfdd48497fe9bd8dbd55fd1fca84b321f48ec1/lib/net.js#L311 - // https://github.com/nodejs/node/blob/c5cfdd48497fe9bd8dbd55fd1fca84b321f48ec1/lib/net.js#L1126 - this._undestroy(); - - if (connection) { - const socket = connection._handle; - if (!upgradeDuplex && socket) { - // if is named pipe socket we can upgrade it using the same wrapper than we use for duplex - upgradeDuplex = isNamedPipeSocket(socket); - } - if (upgradeDuplex) { - this.connecting = true; - this.#upgraded = connection; - const [result, events] = upgradeDuplexToTLS(connection, { - data: this, - tls, - socket: this.#handlers, - }); - - connection.on("data", events[0]); - connection.on("end", events[1]); - connection.on("drain", events[2]); - connection.on("close", events[3]); - - this._handle = result; - } else { - if (socket) { + if (upgradeDuplex) { + this.connecting = true; + this[kupgraded] = connection; + const [result, events] = upgradeDuplexToTLS(connection, { + data: this, + tls, + socket: this[khandlers], + }); + connection.on("data", events[0]); + connection.on("end", events[1]); + connection.on("drain", events[2]); + connection.on("close", events[3]); + this._handle = result; + } else { this.connecting = true; - this.#upgraded = connection; + this[kupgraded] = connection; const result = socket.upgradeTLS({ data: this, tls, - socket: this.#handlers, + socket: this[khandlers], }); if (result) { const [raw, tls] = result; // replace socket connection._handle = raw; - this.once("end", this.#closeRawConnection); + this.once("end", this[kCloseRawConnection]); raw.connecting = false; this._handle = tls; } else { this._handle = null; throw new Error("Invalid socket"); } - } else { - // wait to be connected - connection.once("connect", () => { - const socket = connection._handle; - if (!upgradeDuplex && socket) { - // if is named pipe socket we can upgrade it using the same wrapper than we use for duplex - upgradeDuplex = isNamedPipeSocket(socket); - } - if (upgradeDuplex) { - this.connecting = true; - this.#upgraded = connection; - - const [result, events] = upgradeDuplexToTLS(connection, { - data: this, - tls, - socket: this.#handlers, - }); - - connection.on("data", events[0]); - connection.on("end", events[1]); - connection.on("drain", events[2]); - connection.on("close", events[3]); - - this._handle = result; - } else { - this.connecting = true; - this.#upgraded = connection; - const result = socket.upgradeTLS({ - data: this, - tls, - socket: this.#handlers, - }); - - if (result) { - const [raw, tls] = result; - // replace socket - connection._handle = raw; - this.once("end", this.#closeRawConnection); - raw.connecting = false; - this._handle = tls; - } else { - this._handle = null; - throw new Error("Invalid socket"); - } - } - }); - } - } - } else if (path) { - // start using unix socket - bunConnect({ - data: this, - unix: path, - socket: this.#handlers, - tls, - allowHalfOpen: this.allowHalfOpen, - }).catch(error => { - if (!this.destroyed) { - this.emit("error", error); - this.emit("close"); - } - }); - } else { - // default start - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: this.#handlers, - tls, - allowHalfOpen: this.allowHalfOpen, - }).catch(error => { - if (!this.destroyed) { - this.emit("error", error); - this.emit("close"); } }); } - } catch (error) { - process.nextTick(emitErrorAndCloseNextTick, this, error); } - return this; + } else if (path) { + // start using unix socket + bunConnect({ + data: this, + unix: path, + socket: this[khandlers], + tls, + allowHalfOpen: this.allowHalfOpen, + }).catch(error => { + if (!this.destroyed) { + this.emit("error", error); + this.emit("close"); + } + }); + } else { + // default start + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: this[khandlers], + tls, + allowHalfOpen: this.allowHalfOpen, + }).catch(error => { + if (!this.destroyed) { + this.emit("error", error); + this.emit("close"); + } + }); } + } catch (error) { + process.nextTick(emitErrorAndCloseNextTick, this, error); + } + return this; +}; - end(...args) { - if (!this._readableState.endEmitted) { - this.secureConnecting = false; - } - return super.end(...args); - } +Socket.prototype.end = function end(...args) { + if (!this._readableState.endEmitted) { + this.secureConnecting = false; + } + return Duplex.prototype.end.$apply(this, args); +}; - _destroy(err, callback) { - this.connecting = false; - const { ending } = this._writableState; +Socket.prototype._destroy = function _destroy(err, callback) { + this.connecting = false; + const { ending } = this._writableState; - // lets make sure that the writable side is closed - if (!ending) { - // at this state destroyed will be true but we need to close the writable side - this._writableState.destroyed = false; - this.end(); + // lets make sure that the writable side is closed + if (!ending) { + // at this state destroyed will be true but we need to close the writable side + this._writableState.destroyed = false; + this.end(); - // we now restore the destroyed flag - this._writableState.destroyed = true; - } + // we now restore the destroyed flag + this._writableState.destroyed = true; + } - detachSocket(self); - callback(err); - process.nextTick(emitCloseNT, this, !!err); - } + detachSocket(self); + callback(err); + process.nextTick(emitCloseNT, this, !!err); +}; - _final(callback) { - if (this.connecting) { - return this.once("connect", () => this._final(callback)); - } - const socket = this._handle; +Socket.prototype._final = function _final(callback) { + if (this.connecting) { + return this.once("connect", () => this._final(callback)); + } + const socket = this._handle; - // already closed call destroy - if (!socket) return callback(); + // already closed call destroy + if (!socket) return callback(); - // emit FIN allowHalfOpen only allow the readable side to close first - process.nextTick(endNT, socket, callback); - } + // emit FIN allowHalfOpen only allow the readable side to close first + process.nextTick(endNT, socket, callback); +}; - get localFamily() { - return "IPv4"; - } +Object.defineProperty(Socket.prototype, "localFamily", { + get: function () { + return "IPv4"; + }, +}); - get localPort() { - return this._handle?.localPort; - } - get _connecting() { - return this.connecting; - } +Object.defineProperty(Socket.prototype, "localPort", { + get: function () { + return this._handle?.localPort; + }, +}); - get pending() { - return !this._handle || this.connecting; - } +Object.defineProperty(Socket.prototype, "_connecting", { + get: function () { + return this.connecting; + }, +}); - resume() { - if (!this.connecting) { - this._handle?.resume(); - } - return super.resume(); - } - pause() { - if (!this.destroyed) { - this._handle?.pause(); - } - return super.pause(); - } - read(size) { - if (!this.connecting) { - this._handle?.resume(); - } - return super.read(size); - } +Object.defineProperty(Socket.prototype, "pending", { + get: function () { + return !this._handle || this.connecting; + }, +}); - _read(size) { - const socket = this._handle; - if (this.connecting || !socket) { - this.once("connect", () => this._read(size)); - } else { - socket?.resume(); - } - } +Socket.prototype.resume = function resume() { + if (!this.connecting) { + this._handle?.resume(); + } + return Duplex.prototype.resume.$call(this); +}; - _reset() { - this.resetAndClosing = true; - return this.destroy(); - } +Socket.prototype.pause = function pause() { + if (!this.destroyed) { + this._handle?.pause(); + } + return Duplex.prototype.pause.$call(this); +}; - get readyState() { - if (this.connecting) return "opening"; - if (this.readable) { - return this.writable ? "open" : "readOnly"; - } else { - return this.writable ? "writeOnly" : "closed"; - } - } +Socket.prototype.read = function read(size) { + if (!this.connecting) { + this._handle?.resume(); + } + return Duplex.prototype.read.$call(this, size); +}; - ref() { - const socket = this._handle; - if (!socket) { - this.#unrefOnConnected = false; - return this; - } - socket.ref(); - return this; - } +Socket.prototype._read = function _read(size) { + const socket = this._handle; + if (this.connecting || !socket) { + this.once("connect", () => this._read(size)); + } else { + socket?.resume(); + } +}; - get remoteAddress() { - return this._handle?.remoteAddress; - } +Socket.prototype._reset = function _reset() { + this.resetAndClosing = true; + return this.destroy(); +}; - get remoteFamily() { - return "IPv4"; - } +Object.defineProperty(Socket.prototype, "readyState", { + get: function () { + if (this.connecting) return "opening"; + if (this.readable && this.writable) return "open"; + if (this.readable && !this.writable) return "readOnly"; + if (!this.readable && this.writable) return "writeOnly"; + return "closed"; + }, +}); - resetAndDestroy() { - if (this._handle) { - if (this.connecting) { - this.once("connect", () => this._reset()); - } else { - this._reset(); - } - } else { - this.destroy($ERR_SOCKET_CLOSED()); - } - return this; +Socket.prototype.ref = function ref() { + const socket = this._handle; + if (!socket) { + this.once("connect", this.ref); + return this; + } + socket.ref(); + return this; +}; + +Object.defineProperty(Socket.prototype, "remoteAddress", { + get: function () { + return this._handle?.remoteAddress; + }, +}); + +Object.defineProperty(Socket.prototype, "remoteFamily", { + get: function () { + return "IPv4"; + }, +}); + +Socket.prototype.resetAndDestroy = function resetAndDestroy() { + if (this._handle) { + if (this.connecting) { + this.once("connect", () => this._reset()); + } else { + this._reset(); } + } else { + this.destroy($ERR_SOCKET_CLOSED()); + } + return this; +}; - setKeepAlive(enable = false, initialDelayMsecs = 0) { - enable = Boolean(enable); - const initialDelay = ~~(initialDelayMsecs / 1000); +Socket.prototype.setKeepAlive = function setKeepAlive(enable = false, initialDelayMsecs = 0) { + enable = Boolean(enable); + const initialDelay = ~~(initialDelayMsecs / 1000); - if (!this._handle) { - this[kSetKeepAlive] = enable; - this[kSetKeepAliveInitialDelay] = initialDelay; - return this; - } + if (!this._handle) { + this[kSetKeepAlive] = enable; + this[kSetKeepAliveInitialDelay] = initialDelay; + return this; + } + if (!this._handle.setKeepAlive) { + return this; + } + if (enable !== this[kSetKeepAlive] || (enable && this[kSetKeepAliveInitialDelay] !== initialDelay)) { + this[kSetKeepAlive] = enable; + this[kSetKeepAliveInitialDelay] = initialDelay; + this._handle.setKeepAlive(enable, initialDelay); + } + return this; +}; - if (!this._handle.setKeepAlive) { - return this; - } +Socket.prototype.setNoDelay = function setNoDelay(enable = true) { + // Backwards compatibility: assume true when `enable` is omitted + enable = Boolean(enable === undefined ? true : enable); - if (enable !== this[kSetKeepAlive] || (enable && this[kSetKeepAliveInitialDelay] !== initialDelay)) { - this[kSetKeepAlive] = enable; - this[kSetKeepAliveInitialDelay] = initialDelay; - this._handle.setKeepAlive(enable, initialDelay); - } + if (!this._handle) { + this[kSetNoDelay] = enable; + return this; + } + if (this._handle.setNoDelay && enable !== this[kSetNoDelay]) { + this[kSetNoDelay] = enable; + this._handle.setNoDelay(enable); + } + return this; +}; - return this; - } +Socket.prototype.setTimeout = function setTimeout(timeout, callback) { + timeout = getTimerDuration(timeout, "msecs"); + // internally or timeouts are in seconds + // we use Math.ceil because 0 would disable the timeout and less than 1 second but greater than 1ms would be 1 second (the minimum) + this._handle?.timeout(Math.ceil(timeout / 1000)); + this.timeout = timeout; + if (callback !== undefined) { + validateFunction(callback, "callback"); + this.once("timeout", callback); + } + return this; +}; - setNoDelay(enable = true) { - // Backwards compatibility: assume true when `enable` is omitted - enable = Boolean(enable === undefined ? true : enable); +Socket.prototype._unrefTimer = function _unrefTimer() { + // for compatibility +}; - if (!this._handle) { - this[kSetNoDelay] = enable; - return this; - } +Socket.prototype.unref = function unref() { + const socket = this._handle; + if (!socket) { + this.once("connect", this.ref); + return this; + } + socket.ref(); + return this; +}; - if (this._handle.setNoDelay && enable !== this[kSetNoDelay]) { - this[kSetNoDelay] = enable; - this._handle.setNoDelay(enable); - } - return this; - } +// https://github.com/nodejs/node/blob/2eff28fb7a93d3f672f80b582f664a7c701569fb/lib/net.js#L785 +Socket.prototype.destroySoon = function destroySoon() { + if (this.writable) this.end(); + if (this.writableFinished) this.destroy(); + else this.once("finish", this.destroy); +}; - setTimeout(timeout, callback) { - timeout = getTimerDuration(timeout, "msecs"); - // internally or timeouts are in seconds - // we use Math.ceil because 0 would disable the timeout and less than 1 second but greater than 1ms would be 1 second (the minimum) - this._handle?.timeout(Math.ceil(timeout / 1000)); - this.timeout = timeout; - if (callback !== undefined) { - validateFunction(callback, "callback"); - this.once("timeout", callback); - } - return this; +//TODO: migrate to native +Socket.prototype._writev = function _writev(data, callback) { + const allBuffers = data.allBuffers; + const chunks = data; + if (allBuffers) { + if (data.length === 1) { + return this._write(data[0], "buffer", callback); } - // for compatibility - _unrefTimer() {} - unref() { - const socket = this._handle; - if (!socket) { - this.#unrefOnConnected = true; - return this; - } - socket.unref(); - return this; + for (let i = 0; i < data.length; i++) { + data[i] = data[i].chunk; } - - // https://github.com/nodejs/node/blob/2eff28fb7a93d3f672f80b582f664a7c701569fb/lib/net.js#L785 - destroySoon() { - if (this.writable) this.end(); - - if (this.writableFinished) this.destroy(); - else this.once("finish", this.destroy); + } else { + if (data.length === 1) { + const { chunk, encoding } = data[0]; + return this._write(chunk, encoding, callback); } - - //TODO: migrate to native - _writev(data, callback) { - const allBuffers = data.allBuffers; - const chunks = data; - if (allBuffers) { - if (data.length === 1) { - return this._write(data[0], "buffer", callback); - } - for (let i = 0; i < data.length; i++) { - data[i] = data[i].chunk; - } + for (let i = 0; i < data.length; i++) { + const { chunk, encoding } = data[i]; + if (typeof chunk === "string") { + data[i] = Buffer.from(chunk, encoding); } else { - if (data.length === 1) { - const { chunk, encoding } = data[0]; - return this._write(chunk, encoding, callback); - } - for (let i = 0; i < data.length; i++) { - const { chunk, encoding } = data[i]; - if (typeof chunk === "string") { - data[i] = Buffer.from(chunk, encoding); - } else { - data[i] = chunk; - } - } + data[i] = chunk; } - const chunk = Buffer.concat(chunks || []); - return this._write(chunk, "buffer", callback); } + } + const chunk = Buffer.concat(chunks || []); + return this._write(chunk, "buffer", callback); +}; - _write(chunk, encoding, callback) { - // If we are still connecting, then buffer this for later. - // The Writable logic will buffer up any more writes while - // waiting for this one to be done. - if (this.connecting) { - this.#writeCallback = callback; - this._pendingData = chunk; - this._pendingEncoding = encoding; - function onClose() { - callback($ERR_SOCKET_CLOSED_BEFORE_CONNECTION()); - } - this.once("connect", function connect() { - this.off("close", onClose); - }); - this.once("close", onClose); - return; - } - this._pendingData = null; - this._pendingEncoding = ""; - this.#writeCallback = null; - const socket = this._handle; - if (!socket) { - callback($ERR_SOCKET_CLOSED()); - return false; - } - const success = socket.$write(chunk, encoding); - this[kBytesWritten] = socket.bytesWritten; - if (success) { - callback(); - } else if (this.#writeCallback) { - callback(new Error("overlapping _write()")); - } else { - this.#writeCallback = callback; - } +Socket.prototype._write = function _write(chunk, encoding, callback) { + // If we are still connecting, then buffer this for later. + // The Writable logic will buffer up any more writes while + // waiting for this one to be done. + if (this.connecting) { + this[kwriteCallback] = callback; + this._pendingData = chunk; + this._pendingEncoding = encoding; + function onClose() { + callback($ERR_SOCKET_CLOSED_BEFORE_CONNECTION()); } - }, -); + this.once("connect", function connect() { + this.off("close", onClose); + }); + this.once("close", onClose); + return; + } + this._pendingData = null; + this._pendingEncoding = ""; + this[kwriteCallback] = null; + const socket = this._handle; + if (!socket) { + callback($ERR_SOCKET_CLOSED()); + return false; + } + const success = socket.$write(chunk, encoding); + this[kBytesWritten] = socket.bytesWritten; + if (success) { + callback(); + } else if (this[kwriteCallback]) { + callback(new Error("overlapping _write()")); + } else { + this[kwriteCallback] = callback; + } +}; function createConnection(port, host, connectListener) { if (typeof port === "object") { @@ -1203,11 +1185,11 @@ const connect = createConnection; type MaybeListener = SocketListener | null; -function Server(): void; -function Server(options?: null | undefined): void; -function Server(connectionListener: () => {}): void; -function Server(options: ServerOpts, connectionListener?: () => {}): void; -function Server(options?, connectionListener?): void { +function Server(); +function Server(options?: null | undefined); +function Server(connectionListener: () => {}); +function Server(options: ServerOpts, connectionListener?: () => {}); +function Server(options?, connectionListener?) { if (!(this instanceof Server)) { return new Server(options, connectionListener); } @@ -1470,7 +1452,7 @@ Server.prototype.listen = function listen(port, hostname, onListen) { tls.rejectUnauthorized = false; } } else { - options.InternalSocketClass = SocketClass; + options.InternalSocketClass = Socket; } listenInCluster( @@ -1518,7 +1500,7 @@ Server.prototype[kRealListen] = function realListen( reusePort: reusePort || this[bunSocketServerOptions]?.reusePort || false, ipv6Only: ipv6Only || this[bunSocketServerOptions]?.ipv6Only || false, exclusive: exclusive || this[bunSocketServerOptions]?.exclusive || false, - socket: SocketClass[bunSocketServerHandlers], + socket: ServerHandlers, }); } else { this._handle = Bun.listen({ @@ -1529,7 +1511,7 @@ Server.prototype[kRealListen] = function realListen( reusePort: reusePort || this[bunSocketServerOptions]?.reusePort || false, ipv6Only: ipv6Only || this[bunSocketServerOptions]?.ipv6Only || false, exclusive: exclusive || this[bunSocketServerOptions]?.exclusive || false, - socket: SocketClass[bunSocketServerHandlers], + socket: ServerHandlers, }); } @@ -1726,7 +1708,7 @@ export default { isIPv4, isIPv6, Socket, - [Symbol.for("::bunternal::")]: SocketClass, + [Symbol.for("::bunternal::")]: Socket, _normalizeArgs: normalizeArgs, getDefaultAutoSelectFamily: $zig("node_net_binding.zig", "getDefaultAutoSelectFamily"),