Skip to content

Commit

Permalink
Cleans up port netcode (#3251)
Browse files Browse the repository at this point in the history
thegrb93 authored Jan 28, 2025
1 parent 61b976c commit 7d156f3
Showing 1 changed file with 193 additions and 168 deletions.
361 changes: 193 additions & 168 deletions lua/wire/wireshared.lua
Original file line number Diff line number Diff line change
@@ -314,6 +314,102 @@ elseif SERVER then
end
end

do
local max_items_per_flush = 1024
local queue_limit = 65536
local ack_timeout = 30

local function PlyQueue()
return {
__flushing = false,
__ack_timeout = 0,
add = function(self, item)
local n=#self+1
if n>queue_limit then return end
self[n]=item
end
}
end

local net_Send = SERVER and net.Send or net.SendToServer
WireLib.NetQueue = {
__index = {
add = SERVER and function(self, item, ply)
if ply==nil then
for _, ply in player.Iterator() do self.plyqueues[ply]:add(item) end
else
self.plyqueues[ply]:add(item)
end
self:notifyFlush()
end or function(self, item)
self.plyqueues[NULL]:add(item)
self:notifyFlush()
end,
cleanup = function(self, ply)
self.plyqueues[ply] = nil
end,
notifyFlush = function(self)
if self.flushing then return end
self.flushing = true
timer.Simple(0, function() self:flush() end)
end,
flush = function(self)
for ply, queue in pairs(self.plyqueues) do self:flushQueue(ply, queue) end
self.flushing = false
end,
flushQueue = function(self, ply, queue)
if queue[1]==nil then return end
local t = CurTime()
if queue.__flushing and t<queue.__ack_timeout then return end
queue.__flushing = true
queue.__ack_timeout = t+ack_timeout

net.Start(self.name)
local written = 0
while written < #queue and written < max_items_per_flush and net.BytesWritten() < 32768 do
net.WriteUInt(1, 1)
written = written + 1
queue[written]()
end
net.WriteUInt(0, 1)
net_Send(ply)
for i=1,#queue do queue[i]=queue[i+written] end
end,
receive = function(self, ply)
if net.ReadUInt(1)==0 then -- An empty message indicates a receive Ack
local plyqueue = self.plyqueues[ply]
plyqueue.__flushing = false
self:flushQueue(ply, plyqueue)
else
if self.receivecb then
for i=1, max_items_per_flush do
if net.BytesLeft()<=0 then break end
self.receivecb()
if net.ReadUInt(1)==0 then break end
end
end
net.Start(self.name) net.WriteUInt(0, 1) net_Send(ply) -- Send an empty message to Ack
end
end,
},
__call = function(t, name, receivecb)
if SERVER then util.AddNetworkString(name) end

local queue = setmetatable({
name = name,
receivecb = receivecb,
flushing = false,
plyqueues = setmetatable({},{__index = function(t,k) local v=PlyQueue() t[k]=v return v end})
}, t)

net.Receive(name, function(len, ply) queue:receive(ply or NULL) end)

return queue
end
}
setmetatable(WireLib.NetQueue, WireLib.NetQueue)
end

function WireLib.ErrorNoHalt(message)
-- ErrorNoHalt clips messages to 512 characters, so chain calls if necessary
for i=1,#message, 511 do
@@ -440,21 +536,20 @@ function WireLib.HasPorts(ent)
return false
end

local WirePortQueue = WireLib.NetQueue("wire_ports")
local CMD_DELETE,CMD_PORT,CMD_LINK = 0,1,2
local PORT_TYPE_INPUT,PORT_TYPE_OUTPUT = 0,1
if SERVER then
local INPUT,OUTPUT = 1,-1
local DELETE,PORT,LINK = 1,2,3

local ents_with_inputs = {}
local ents_with_outputs = {}
--local IOlookup = { [INPUT] = ents_with_inputs, [OUTPUT] = ents_with_outputs }

util.AddNetworkString("wire_ports")
timer.Create("Debugger.PoolTypeStrings",1,1,function()
if WireLib.Debugger and WireLib.Debugger.formatPort then
for typename,_ in pairs(WireLib.Debugger.formatPort) do util.AddNetworkString(typename) end -- Reduce bandwidth
end
end)
local queue = {}

function WireLib.GetPorts(ent)
local eid = ent:EntIndex()
@@ -472,211 +567,141 @@ if SERVER then
end
end

function WireLib._RemoveWire(eid, DontSend) -- To remove the inputs without to remove the entity. Very important for IsWire checks!
local hasinputs, hasoutputs = ents_with_inputs[eid], ents_with_outputs[eid]
if hasinputs or hasoutputs then
ents_with_inputs[eid] = nil
ents_with_outputs[eid] = nil
if not DontSend then
net.Start("wire_ports")
net.WriteInt(-3, 8) -- set eid
net.WriteUInt(eid, MAX_EDICT_BITS) -- entity id
if hasinputs then net.WriteInt(-1, 8) end -- delete inputs
if hasoutputs then net.WriteInt(-2, 8) end -- delete outputs
net.WriteInt(0, 8) -- break
net.Broadcast()
end
end
local function SendDeletePort(queue, eid, porttype)
queue:add(function()
net.WriteUInt(CMD_DELETE, 2)
net.WriteUInt(eid, MAX_EDICT_BITS)
net.WriteUInt(porttype, 1)
end)
end

hook.Add("EntityRemoved", "wire_ports", function(ent)
if not ent:IsPlayer() then
WireLib._RemoveWire(ent:EntIndex())
end
end)

function WireLib._SetInputs(ent, lqueue)
local queue = lqueue or queue
local eid = ent:EntIndex()

if not ents_with_inputs[eid] then ents_with_inputs[eid] = {} end
local function SendPortInfo(queue, eid, porttype, ports)
queue:add(function()
net.WriteUInt(CMD_PORT, 2)
net.WriteUInt(eid, MAX_EDICT_BITS)
net.WriteUInt(porttype, 1)

net.WriteUInt(table.Count(ports), 8)
for Name, CurPort in pairs_sortvalues(ports, WireLib.PortComparator) do
net.WriteString(Name)
net.WriteString(CurPort.Type)
net.WriteString(CurPort.Desc or "")
if porttype==PORT_TYPE_INPUT then
net.WriteBool(CurPort.SrcId and true or false)
end
end
end)
end

queue[#queue+1] = { eid, DELETE, INPUT }
local function SendLinkInfo(queue, eid, num, state)
queue:add(function()
net.WriteUInt(CMD_LINK, 2)
net.WriteUInt(eid, MAX_EDICT_BITS)
net.WriteUInt(1, 8)
net.WriteUInt(num, 8)
net.WriteBool(state)
end)
end

for Name, CurPort in pairs_sortvalues(ent.Inputs, WireLib.PortComparator) do
local entry = { Name, CurPort.Type, CurPort.Desc or "" }
ents_with_inputs[eid][#ents_with_inputs[eid]+1] = entry
queue[#queue+1] = { eid, PORT, INPUT, entry, CurPort.Num }
function WireLib._RemoveWire(eid, DontSend) -- To remove the inputs without to remove the entity. Very important for IsWire checks!
if ents_with_inputs[eid] then
ents_with_inputs[eid] = nil
if not DontSend then SendDeletePort(WirePortQueue, eid, PORT_TYPE_INPUT) end
end
for _, CurPort in pairs_sortvalues(ent.Inputs, WireLib.PortComparator) do
WireLib._SetLink(CurPort, lqueue)
if ents_with_outputs[eid] then
ents_with_outputs[eid] = nil
if not DontSend then SendDeletePort(WirePortQueue, eid, PORT_TYPE_OUTPUT) end
end
end

function WireLib._SetOutputs(ent, lqueue)
local queue = lqueue or queue
function WireLib._SetInputs(ent)
local eid = ent:EntIndex()

if not ents_with_outputs[eid] then ents_with_outputs[eid] = {} end

queue[#queue+1] = { eid, DELETE, OUTPUT }
local ent_input_array = {}
ents_with_inputs[eid] = ent_input_array

for Name, CurPort in pairs_sortvalues(ent.Outputs, WireLib.PortComparator) do
local entry = { Name, CurPort.Type, CurPort.Desc or "" }
ents_with_outputs[eid][#ents_with_outputs[eid]+1] = entry
queue[#queue+1] = { eid, PORT, OUTPUT, entry, CurPort.Num }
for Name, CurPort in pairs_sortvalues(ent.Inputs, WireLib.PortComparator) do
ent_input_array[#ent_input_array+1] = { Name, CurPort.Type, CurPort.Desc or "", CurPort.Num }
end
SendPortInfo(WirePortQueue, eid, PORT_TYPE_INPUT, ent.Inputs)
end

function WireLib._SetLink(input, lqueue)
local ent = input.Entity
local num = input.Num
local state = input.SrcId and true or false

local queue = lqueue or queue
function WireLib._SetOutputs(ent)
local eid = ent:EntIndex()

queue[#queue+1] = {eid, LINK, num, state}
end

local eid = 0
local numports, firstportnum, portstrings = {}, {}, {}
local function writeCurrentStrings()
-- Write the current (input or output) string information
for IO=OUTPUT,INPUT,2 do -- so, k= -1 and k= 1
if numports[IO] then
net.WriteInt(firstportnum[IO], 8) -- Control code for inputs/outputs is also the offset (the first port number we're writing over)
net.WriteUInt(numports[IO], 8) -- Send number of ports
net.WriteBit(IO==OUTPUT)
for i=1,numports[IO]*3 do net.WriteString(portstrings[IO][i] or "") end
numports[IO] = nil
end
end
end
local function writemsg(msg)
-- First write a signed int for the command code
-- Then sometimes write extra data specific to the command (type varies)

if msg[1] ~= eid then
eid = msg[1]
writeCurrentStrings() -- We're switching to talking about a different entity, lets send port information
net.WriteInt(-3,8)
net.WriteUInt(eid,MAX_EDICT_BITS)
end

local msgtype = msg[2]
local ent_output_array = {}
ents_with_outputs[eid] = ent_output_array

if msgtype == DELETE then
numports[msg[3]] = nil
net.WriteInt(msg[3] == INPUT and -1 or -2, 8)
elseif msgtype == PORT then
local _,_,IO,entry,num = unpack(msg)

if not numports[IO] then
firstportnum[IO] = num
numports[IO] = 0
portstrings[IO] = {}
end
local i = numports[IO]*3
portstrings[IO][i+1] = entry[1]
portstrings[IO][i+2] = entry[2]
portstrings[IO][i+3] = entry[3]
numports[IO] = numports[IO]+1
elseif msgtype == LINK then
local _,_,num,state = unpack(msg)
net.WriteInt(-4, 8)
net.WriteUInt(num, 8)
net.WriteBit(state)
for Name, CurPort in pairs_sortvalues(ent.Outputs, WireLib.PortComparator) do
ent_output_array[#ent_output_array+1] = { Name, CurPort.Type, CurPort.Desc or "", CurPort.Num }
end
SendPortInfo(WirePortQueue, eid, PORT_TYPE_OUTPUT, ent.Outputs)
end

local function FlushQueue(lqueue, ply)
-- Zero these two for the writemsg function
eid = 0
numports = {}

net.Start("wire_ports")
for i=1,#lqueue do
writemsg(lqueue[i])
end
writeCurrentStrings()
net.WriteInt(0,8)
if ply then net.Send(ply) else net.Broadcast() end
function WireLib._SetLink(input)
SendLinkInfo(WirePortQueue, input.Entity:EntIndex(), input.Num, input.SrcId and true or false)
end

hook.Add("Think", "wire_ports", function()
if not next(queue) then return end
FlushQueue(queue)
queue = {}
end)

hook.Add("PlayerInitialSpawn", "wire_ports", function(ply)
local lqueue = {}
local queue = WirePortQueue.plyqueues[ply]
for eid, _ in pairs(ents_with_inputs) do
WireLib._SetInputs(Entity(eid), lqueue)
SendPortInfo(queue, eid, PORT_TYPE_INPUT, Entity(eid).Inputs)
end
for eid, _ in pairs(ents_with_outputs) do
WireLib._SetOutputs(Entity(eid), lqueue)
SendPortInfo(queue, eid, PORT_TYPE_OUTPUT, Entity(eid).Outputs)
end
WirePortQueue:flushQueue(ply, queue)
end)

hook.Add("EntityRemoved", "wire_ports", function(ent)
if ent:IsPlayer() then
WirePortQueue:cleanup(ent)
else
WireLib._RemoveWire(ent:EntIndex())
end
FlushQueue(lqueue, ply)
end)

elseif CLIENT then
local ents_with_inputs = {}
local ents_with_outputs = {}

net.Receive("wire_ports", function(netlen)
local eid = 0
local connections = {} -- In case any cmd -4's come in before link strings
while true do
local cmd = net.ReadInt(8)
if cmd == 0 then
break
elseif cmd == -1 then
function WirePortQueue.receivecb()
local cmd, eid = net.ReadUInt(2), net.ReadUInt(MAX_EDICT_BITS)
if cmd == CMD_DELETE then
if net.ReadUInt(1)==PORT_TYPE_INPUT then
-- print("Delete",eid,"input")
ents_with_inputs[eid] = nil
elseif cmd == -2 then
else
-- print("Delete",eid,"output")
ents_with_outputs[eid] = nil
elseif cmd == -3 then
eid = net.ReadUInt(MAX_EDICT_BITS)
elseif cmd == -4 then
connections[#connections+1] = {eid, net.ReadUInt(8), net.ReadBit() ~= 0} -- Delay this process till after the loop
elseif cmd > 0 then
local entry

local amount = net.ReadUInt(8)
if net.ReadBit() ~= 0 then
-- outputs
entry = ents_with_outputs[eid]
if not entry then
entry = {}
ents_with_outputs[eid] = entry
end
else
-- inputs
entry = ents_with_inputs[eid]
if not entry then
entry = {}
ents_with_inputs[eid] = entry
end
end
elseif cmd == CMD_PORT then
if net.ReadUInt(1)==PORT_TYPE_INPUT then
local entry = {}
for i=1, net.ReadUInt(8) do
entry[i] = {net.ReadString(), net.ReadString(), net.ReadString(), net.ReadBool()}
-- print("Port",eid,entry[i][1],entry[i][2],entry[i][3],entry[i][4])
end

local endindex = cmd+amount-1
for i = cmd,endindex do
ents_with_inputs[eid]=entry
else
local entry = {}
for i=1, net.ReadUInt(8) do
entry[i] = {net.ReadString(), net.ReadString(), net.ReadString()}
-- print("Port",eid,entry[i][1],entry[i][2],entry[i][3])
end
ents_with_outputs[eid]=entry
end
end
for i=1, #connections do
local eid, num, state = unpack(connections[i])
local entry = ents_with_inputs[eid]
if not entry then
entry = {}
ents_with_inputs[eid] = entry
elseif entry[num] then
entry[num][4] = state
elseif cmd == CMD_LINK then
for i=1, net.ReadUInt(8) do
local num, state = net.ReadUInt(8), net.ReadBool()
-- print("Link",eid,num, state)
local entry = ents_with_inputs[eid]
if entry and entry[num] then
entry[num][4] = state
end
end
end
end)
end

function WireLib.GetPorts(ent)
local eid = ent:EntIndex()

0 comments on commit 7d156f3

Please sign in to comment.