Skip to content

Commit

Permalink
router: optimize computeProcID (#4520)
Browse files Browse the repository at this point in the history
Fixes #4519
jiceatscion authored May 14, 2024
1 parent f82aabe commit 4c9c342
Showing 4 changed files with 71 additions and 29 deletions.
1 change: 1 addition & 0 deletions router/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ go_library(
srcs = [
"connector.go",
"dataplane.go",
"fnv1aCheap.go",
"metrics.go",
"svc.go",
],
42 changes: 25 additions & 17 deletions router/dataplane.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"hash"
"hash/fnv"
"math/big"
"net"
"net/netip"
@@ -598,16 +597,21 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
procQs []chan packet) {

log.Debug("Run receiver for", "interface", ifID)
randomValue := make([]byte, 16)
if _, err := rand.Read(randomValue); err != nil {

// Each receiver (therefore each input interface) has a unique random seed for the procID hash
// function.
hashSeed := fnv1aOffset32
randomBytes := make([]byte, 4)
if _, err := rand.Read(randomBytes); err != nil {
panic("Error while generating random value")
}
for _, c := range randomBytes {
hashSeed = hashFNV1a(hashSeed, c)
}

msgs := underlayconn.NewReadMessages(cfg.BatchSize)
numReusable := 0 // unused buffers from previous loop
metrics := d.forwardingMetrics[ifID] // If receiver exists, fw metrics exist too.
flowIDBuffer := make([]byte, 3)
hasher := fnv.New32a()

enqueueForProcessing := func(pkt ipv4.Message) {
srcAddr := pkt.Addr.(*net.UDPAddr)
@@ -616,8 +620,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
metrics[sc].InputPacketsTotal.Inc()
metrics[sc].InputBytesTotal.Add(float64(size))

procID, err := computeProcID(pkt.Buffers[0],
cfg.NumProcessors, randomValue, flowIDBuffer, hasher)
procID, err := computeProcID(pkt.Buffers[0], cfg.NumProcessors, hashSeed)
if err != nil {
log.Debug("Error while computing procID", "err", err)
d.returnPacketToPool(pkt.Buffers[0])
@@ -658,9 +661,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
}
}

func computeProcID(data []byte, numProcRoutines int, randomValue []byte,
flowIDBuffer []byte, hasher hash.Hash32) (uint32, error) {

func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, error) {
if len(data) < slayers.CmnHdrLen {
return 0, errShortPacket
}
@@ -670,14 +671,21 @@ func computeProcID(data []byte, numProcRoutines int, randomValue []byte,
if len(data) < slayers.CmnHdrLen+addrHdrLen {
return 0, errShortPacket
}
copy(flowIDBuffer[0:3], data[1:4])
flowIDBuffer[0] &= 0xF // the left 4 bits don't belong to the flowID

hasher.Reset()
hasher.Write(randomValue)
hasher.Write(flowIDBuffer[:])
hasher.Write(data[slayers.CmnHdrLen : slayers.CmnHdrLen+addrHdrLen])
return hasher.Sum32() % uint32(numProcRoutines), nil
s := hashSeed

// inject the flowID
s = hashFNV1a(s, data[1]&0xF) // The left 4 bits aren't part of the flowID.
for _, c := range data[2:4] {
s = hashFNV1a(s, c)
}

// Inject the src/dst addresses
for _, c := range data[slayers.CmnHdrLen : slayers.CmnHdrLen+addrHdrLen] {
s = hashFNV1a(s, c)
}

return s % uint32(numProcRoutines), nil
}

func (d *DataPlane) returnPacketToPool(pkt []byte) {
28 changes: 16 additions & 12 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
@@ -213,18 +213,25 @@ func TestForwarder(t *testing.T) {
}

func TestComputeProcId(t *testing.T) {
randomValue := []byte{1, 2, 3, 4}
randomValueBytes := []byte{1, 2, 3, 4}
numProcs := 10000

// this function returns the procID by using the slayers.SCION serialization
// implementation
// ComputeProcID expects the per-receiver random number to be pre-hashed into the seed that we
// pass.
hashSeed := fnv1aOffset32
for _, c := range randomValueBytes {
hashSeed = hashFNV1a(hashSeed, c)
}

// this function returns the procID as we expect it by using the slayers.SCION serialization
// implementation.
referenceHash := func(s *slayers.SCION) uint32 {
flowBuf := make([]byte, 4)
binary.BigEndian.PutUint32(flowBuf, s.FlowID)
flowBuf[0] &= 0xF
tmpBuffer := make([]byte, 100)
hasher := fnv.New32a()
hasher.Write(randomValue)
hasher.Write(randomValueBytes)
hasher.Write(flowBuf[1:4])
if err := s.SerializeAddrHdr(tmpBuffer); err != nil {
panic(err)
@@ -233,19 +240,17 @@ func TestComputeProcId(t *testing.T) {
return hasher.Sum32() % uint32(numProcs)
}

// this helper returns the procID by using the extraction
// from dataplane.computeProcID()
// this helper returns the procID as the router actually makes it by using the extraction
// from dataplane.computeProcID() along with hashFNV1a() for the seed.
computeProcIDHelper := func(payload []byte, s *slayers.SCION) (uint32, error) {
flowIdBuffer := make([]byte, 3)
hasher := fnv.New32a()
buffer := gopacket.NewSerializeBuffer()
err := gopacket.SerializeLayers(buffer,
gopacket.SerializeOptions{FixLengths: true},
s, gopacket.Payload(payload))
require.NoError(t, err)
raw := buffer.Bytes()

return computeProcID(raw, numProcs, randomValue, flowIdBuffer, hasher)
return computeProcID(raw, numProcs, hashSeed)
}
type ret struct {
payload []byte
@@ -405,9 +410,8 @@ func TestComputeProcIdErrorCases(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
randomValue := []byte{1, 2, 3, 4}
flowIdBuffer := make([]byte, 3)
_, actualErr := computeProcID(tc.data, 10000, randomValue, flowIdBuffer, fnv.New32a())
randomValue := uint32(1234) // not a proper hash seed, but hash result is irrelevant.
_, actualErr := computeProcID(tc.data, 10000, randomValue)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError.Error(), actualErr.Error())
} else {
29 changes: 29 additions & 0 deletions router/fnv1aCheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 SCION Association
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

// fnv1aOffset32 is an initial offset that can be used as initial state when calling
// hashFNV1a.
const fnv1aOffset32 uint32 = 2166136261

// hashFNV1a returns a hash value for the given initial state combined with the given byte.
// To get a hash for a sequence of bytes, invoke for each byte, passing the returned value
// of one call as the state for the next. Example. s1 = hashFNV1a(fnv1aOffset, byte1)
// s2 = hashFNV1a(s1, byte2) etc. It is valid and recommended to use a value obtained
// from calla to hashFNV1a() as the initial state rather than fnv1aOffset32 itself.
func hashFNV1a(state uint32, c byte) uint32 {
const prime32 = 16777619
return (state ^ uint32(c)) * prime32
}

0 comments on commit 4c9c342

Please sign in to comment.