From 4c9c342047d9bbaa1f8bb722cf55a29de581a669 Mon Sep 17 00:00:00 2001 From: jiceatscion <139873336+jiceatscion@users.noreply.github.com> Date: Tue, 14 May 2024 10:31:02 +0200 Subject: [PATCH] router: optimize computeProcID (#4520) Fixes #4519 --- router/BUILD.bazel | 1 + router/dataplane.go | 42 ++++++++++++++++++------------- router/dataplane_internal_test.go | 28 ++++++++++++--------- router/fnv1aCheap.go | 29 +++++++++++++++++++++ 4 files changed, 71 insertions(+), 29 deletions(-) create mode 100644 router/fnv1aCheap.go diff --git a/router/BUILD.bazel b/router/BUILD.bazel index bcdad66129..f60cbb7b19 100644 --- a/router/BUILD.bazel +++ b/router/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "connector.go", "dataplane.go", + "fnv1aCheap.go", "metrics.go", "svc.go", ], diff --git a/router/dataplane.go b/router/dataplane.go index 082e40cc58..ff387ce187 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "hash" - "hash/fnv" "math/big" "net" "net/netip" @@ -598,16 +597,21 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig, procQs []chan packet) { log.Debug("Run receiver for", "interface", ifID) - randomValue := make([]byte, 16) - if _, err := rand.Read(randomValue); err != nil { + + // Each receiver (therefore each input interface) has a unique random seed for the procID hash + // function. + hashSeed := fnv1aOffset32 + randomBytes := make([]byte, 4) + if _, err := rand.Read(randomBytes); err != nil { panic("Error while generating random value") } + for _, c := range randomBytes { + hashSeed = hashFNV1a(hashSeed, c) + } msgs := underlayconn.NewReadMessages(cfg.BatchSize) numReusable := 0 // unused buffers from previous loop metrics := d.forwardingMetrics[ifID] // If receiver exists, fw metrics exist too. - flowIDBuffer := make([]byte, 3) - hasher := fnv.New32a() enqueueForProcessing := func(pkt ipv4.Message) { srcAddr := pkt.Addr.(*net.UDPAddr) @@ -616,8 +620,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig, metrics[sc].InputPacketsTotal.Inc() metrics[sc].InputBytesTotal.Add(float64(size)) - procID, err := computeProcID(pkt.Buffers[0], - cfg.NumProcessors, randomValue, flowIDBuffer, hasher) + procID, err := computeProcID(pkt.Buffers[0], cfg.NumProcessors, hashSeed) if err != nil { log.Debug("Error while computing procID", "err", err) d.returnPacketToPool(pkt.Buffers[0]) @@ -658,9 +661,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig, } } -func computeProcID(data []byte, numProcRoutines int, randomValue []byte, - flowIDBuffer []byte, hasher hash.Hash32) (uint32, error) { - +func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, error) { if len(data) < slayers.CmnHdrLen { return 0, errShortPacket } @@ -670,14 +671,21 @@ func computeProcID(data []byte, numProcRoutines int, randomValue []byte, if len(data) < slayers.CmnHdrLen+addrHdrLen { return 0, errShortPacket } - copy(flowIDBuffer[0:3], data[1:4]) - flowIDBuffer[0] &= 0xF // the left 4 bits don't belong to the flowID - hasher.Reset() - hasher.Write(randomValue) - hasher.Write(flowIDBuffer[:]) - hasher.Write(data[slayers.CmnHdrLen : slayers.CmnHdrLen+addrHdrLen]) - return hasher.Sum32() % uint32(numProcRoutines), nil + s := hashSeed + + // inject the flowID + s = hashFNV1a(s, data[1]&0xF) // The left 4 bits aren't part of the flowID. + for _, c := range data[2:4] { + s = hashFNV1a(s, c) + } + + // Inject the src/dst addresses + for _, c := range data[slayers.CmnHdrLen : slayers.CmnHdrLen+addrHdrLen] { + s = hashFNV1a(s, c) + } + + return s % uint32(numProcRoutines), nil } func (d *DataPlane) returnPacketToPool(pkt []byte) { diff --git a/router/dataplane_internal_test.go b/router/dataplane_internal_test.go index 84e5e0962c..9fa22fb4a2 100644 --- a/router/dataplane_internal_test.go +++ b/router/dataplane_internal_test.go @@ -213,18 +213,25 @@ func TestForwarder(t *testing.T) { } func TestComputeProcId(t *testing.T) { - randomValue := []byte{1, 2, 3, 4} + randomValueBytes := []byte{1, 2, 3, 4} numProcs := 10000 - // this function returns the procID by using the slayers.SCION serialization - // implementation + // ComputeProcID expects the per-receiver random number to be pre-hashed into the seed that we + // pass. + hashSeed := fnv1aOffset32 + for _, c := range randomValueBytes { + hashSeed = hashFNV1a(hashSeed, c) + } + + // this function returns the procID as we expect it by using the slayers.SCION serialization + // implementation. referenceHash := func(s *slayers.SCION) uint32 { flowBuf := make([]byte, 4) binary.BigEndian.PutUint32(flowBuf, s.FlowID) flowBuf[0] &= 0xF tmpBuffer := make([]byte, 100) hasher := fnv.New32a() - hasher.Write(randomValue) + hasher.Write(randomValueBytes) hasher.Write(flowBuf[1:4]) if err := s.SerializeAddrHdr(tmpBuffer); err != nil { panic(err) @@ -233,11 +240,9 @@ func TestComputeProcId(t *testing.T) { return hasher.Sum32() % uint32(numProcs) } - // this helper returns the procID by using the extraction - // from dataplane.computeProcID() + // this helper returns the procID as the router actually makes it by using the extraction + // from dataplane.computeProcID() along with hashFNV1a() for the seed. computeProcIDHelper := func(payload []byte, s *slayers.SCION) (uint32, error) { - flowIdBuffer := make([]byte, 3) - hasher := fnv.New32a() buffer := gopacket.NewSerializeBuffer() err := gopacket.SerializeLayers(buffer, gopacket.SerializeOptions{FixLengths: true}, @@ -245,7 +250,7 @@ func TestComputeProcId(t *testing.T) { require.NoError(t, err) raw := buffer.Bytes() - return computeProcID(raw, numProcs, randomValue, flowIdBuffer, hasher) + return computeProcID(raw, numProcs, hashSeed) } type ret struct { payload []byte @@ -405,9 +410,8 @@ func TestComputeProcIdErrorCases(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - randomValue := []byte{1, 2, 3, 4} - flowIdBuffer := make([]byte, 3) - _, actualErr := computeProcID(tc.data, 10000, randomValue, flowIdBuffer, fnv.New32a()) + randomValue := uint32(1234) // not a proper hash seed, but hash result is irrelevant. + _, actualErr := computeProcID(tc.data, 10000, randomValue) if tc.expectedError != nil { assert.Equal(t, tc.expectedError.Error(), actualErr.Error()) } else { diff --git a/router/fnv1aCheap.go b/router/fnv1aCheap.go new file mode 100644 index 0000000000..280d56706e --- /dev/null +++ b/router/fnv1aCheap.go @@ -0,0 +1,29 @@ +// Copyright 2024 SCION Association +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +// fnv1aOffset32 is an initial offset that can be used as initial state when calling +// hashFNV1a. +const fnv1aOffset32 uint32 = 2166136261 + +// hashFNV1a returns a hash value for the given initial state combined with the given byte. +// To get a hash for a sequence of bytes, invoke for each byte, passing the returned value +// of one call as the state for the next. Example. s1 = hashFNV1a(fnv1aOffset, byte1) +// s2 = hashFNV1a(s1, byte2) etc. It is valid and recommended to use a value obtained +// from calla to hashFNV1a() as the initial state rather than fnv1aOffset32 itself. +func hashFNV1a(state uint32, c byte) uint32 { + const prime32 = 16777619 + return (state ^ uint32(c)) * prime32 +}