diff --git a/.gitignore b/.gitignore index fb86551894..e994a71edd 100644 --- a/.gitignore +++ b/.gitignore @@ -91,6 +91,11 @@ _build node_modules # gobra +####### *.vpr tmp/ logger.log + +# emacs backup files +#################### +*~ diff --git a/demo/file_transfer/file_transfer.py b/demo/file_transfer/file_transfer.py index 6207159e35..f4ede50682 100644 --- a/demo/file_transfer/file_transfer.py +++ b/demo/file_transfer/file_transfer.py @@ -66,11 +66,15 @@ def _get_br_traffic(self, endpoint): conn.request('GET', '/metrics') resp = conn.getresponse() metrics = resp.read().decode('utf-8') + total = 0 + regexp = re.compile(r"""^router_input_bytes_total{.*interface="internal".*\s(.*)$""") for line in metrics.splitlines(): - m = re.search(r"""^router_input_bytes_total{interface="internal".*\s(.*)$""", line) - if m is not None: - return float(m.group(1)) / 1024 / 1024 - return None + try: + m = regexp.search(line) + total += float(m.group(1)) / 1024 / 1024 + except (TypeError, AttributeError, ValueError): + pass + return total def setup_prepare(self): print("setting up the infrastructure") diff --git a/go.mod b/go.mod index 0bfb551a1e..759cd9b13d 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/pelletier/go-toml v1.9.5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/procfs v0.12.0 github.com/quic-go/quic-go v0.38.1 github.com/sergi/go-diff v1.3.1 github.com/smartystreets/goconvey v1.7.2 @@ -44,7 +45,7 @@ require ( go4.org/netipx v0.0.0-20230125063823-8449b0a6169f golang.org/x/crypto v0.6.0 golang.org/x/net v0.10.0 - golang.org/x/sync v0.2.0 + golang.org/x/sync v0.3.0 golang.org/x/tools v0.9.1 google.golang.org/grpc v1.57.2 google.golang.org/grpc/examples v0.0.0-20230222033013-5353eaa44095 @@ -97,7 +98,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect - github.com/prometheus/procfs v0.8.0 // indirect github.com/quic-go/qtls-go1-20 v0.3.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -112,7 +112,7 @@ require ( go.uber.org/multierr v1.8.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 2860c3acf3..8fcf193faf 100644 --- a/go.sum +++ b/go.sum @@ -360,8 +360,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= -github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/quic-go/qtls-go1-20 v0.3.3 h1:17/glZSLI9P9fDAeyCHBFSWSqJcwx1byhLwP5eUIDCM= github.com/quic-go/qtls-go1-20 
v0.3.3/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= github.com/quic-go/quic-go v0.38.1 h1:M36YWA5dEhEeT+slOu/SwMEucbYd0YFidxG3KlGPZaE= @@ -564,8 +564,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= -golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -617,8 +617,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/go_deps.bzl b/go_deps.bzl index a936afe9fb..1d7c033a63 100644 --- a/go_deps.bzl +++ b/go_deps.bzl @@ -1012,8 +1012,8 @@ def go_deps(): go_repository( name = "com_github_prometheus_procfs", importpath = "github.com/prometheus/procfs", - sum = "h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=", - version = "v0.8.0", + sum = "h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=", + version = "v0.12.0", ) go_repository( name = "com_github_quic_go_qpack", @@ -1482,14 +1482,14 @@ def go_deps(): go_repository( name = "org_golang_x_sync", importpath = "golang.org/x/sync", - sum = "h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=", - version = "v0.2.0", + sum = "h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=", + version = "v0.3.0", ) go_repository( name = "org_golang_x_sys", importpath = "golang.org/x/sys", - sum = "h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=", - version = "v0.8.0", + sum = "h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=", + version = "v0.12.0", ) go_repository( name = "org_golang_x_term", diff --git a/pkg/private/processmetrics/BUILD.bazel b/pkg/private/processmetrics/BUILD.bazel new file mode 100644 index 0000000000..180a3e4809 --- /dev/null +++ b/pkg/private/processmetrics/BUILD.bazel @@ -0,0 +1,60 @@ +load("//tools/lint:go.bzl", "go_library") + 
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "processmetrics_linux.go",
+        "processmetrics_other.go",
+    ],
+    importpath = "github.com/scionproto/scion/pkg/private/processmetrics",
+    visibility = ["//visibility:public"],
+    deps = select({
+        "@io_bazel_rules_go//go/platform:aix": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:android": [
+            "//pkg/private/serrors:go_default_library",
+            "@com_github_prometheus_client_golang//prometheus:go_default_library",
+            "@com_github_prometheus_procfs//:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:darwin": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:dragonfly": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:freebsd": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:illumos": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:ios": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:js": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:linux": [
+            "//pkg/private/serrors:go_default_library",
+            "@com_github_prometheus_client_golang//prometheus:go_default_library",
+            "@com_github_prometheus_procfs//:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:netbsd": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:openbsd": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:plan9": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:solaris": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "@io_bazel_rules_go//go/platform:windows": [
+            "//pkg/private/serrors:go_default_library",
+        ],
+        "//conditions:default": [],
+    }),
+)
diff --git a/pkg/private/processmetrics/processmetrics_linux.go b/pkg/private/processmetrics/processmetrics_linux.go
new file mode 100644
index 0000000000..821bd34dbe
--- /dev/null
+++ b/pkg/private/processmetrics/processmetrics_linux.go
@@ -0,0 +1,214 @@
+// Copyright 2023 SCION Association
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package processmetrics provides a custom collector to export process-level
+// metrics beyond what prometheus.ProcessCollector offers.
+// This implementation is restricted to Linux. The generic implementation
+// does nothing.
+// This code works only if the delayacct kernel feature is turned on;
+// this is done by "sysctl kernel.task_delayacct=1".
+//
+// In order to make a fair run-to-run comparison of these metrics, we must
+// relate them to the actual CPU time that was *available* to the router.
+// That is, the time that the process was either running, blocked, or sleeping,
+// but not "runnable" (which in unix-ese implies *not* running).
+// A custom collector in pkg/private/processmetrics exposes the running and runnable
+// metrics directly from the scheduler.
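[Reviewer note: the normalization described above amounts to simple counter arithmetic. A minimal sketch with made-up numbers — only the metric names come from this package; all values are hypothetical:

```go
// Sketch of the "available CPU" normalization described in the package doc.
package main

import "fmt"

func main() {
	const window = 60.0 // seconds between two scrapes
	// Hypothetical counter deltas over that window:
	deltaRunnable := 12.0    // increase of process_runnable_seconds_total
	deltaPackets := 480000.0 // increase of router_processed_pkts_total
	// availableCPU = wallClockTime - runnableTime, per the reasoning above.
	availableCPU := window - deltaRunnable
	fmt.Printf("packets per available CPU second: %.0f\n", deltaPackets/availableCPU)
}
```

The PromQL examples that follow express the same computation over live data.]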
+// Possibly crude example of a query that accounts for available CPU:
+//
+//	rate(router_processed_pkts_total[1m])
+//	/ on (instance, job) group_left ()
+//	(1 - rate(process_runnable_seconds_total[1m]))
+//
+// This shows processed_packets per available CPU second, as opposed to
+// real time.
+// Possibly crude example of a query that only looks at CPU use efficiency.
+// This shows processed_packets per consumed CPU second:
+//
+//	rate(router_processed_pkts_total[1m])
+//	/ on (instance, job) group_left ()
+//	(rate(process_running_seconds_total[1m]))
+
+//go:build linux
+
+package processmetrics
+
+import (
+	"os"
+	"path/filepath"
+	"strconv"
+	"syscall"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/procfs"
+
+	"github.com/scionproto/scion/pkg/private/serrors"
+)
+
+var (
+	// These two metrics allow us to infer the amount of CPU time that was available,
+	// used or not, to the process:
+	// wallClockTime = runningTime + runnableTime + sleepingTime.
+	// availableCPU = runningTime + sleepingTime
+	// Therefore availableCPU = wallClockTime - runnableTime.
+	// runningTime should be the same as uTime+sTime reported in a variety of other ways,
+	// but when doing calculations it is better to use data from the same source. So, collect
+	// them both.
+	runningTime = prometheus.NewDesc(
+		"process_running_seconds_total",
+		"Time the process spent running since it started (all threads summed).",
+		nil, nil,
+	)
+	runnableTime = prometheus.NewDesc(
+		"process_runnable_seconds_total",
+		"Time the process spent runnable (unscheduled) since it started (all threads summed).",
+		nil, nil,
+	)
+	// This metric is introspective. It's trying to gauge if we're successful in collecting the
+	// other two at a reasonable cost.
+	tasklistUpdates = prometheus.NewDesc(
+		"process_metrics_tasklist_updates_total",
+		"The number of times the processmetrics collector recreated its list of tasks.",
+		nil, nil,
+	)
+)
+
+// procStatCollector is a custom collector for some process-wide statistics
+// that are not available in default collectors.
+type procStatCollector struct {
+	myPid           int
+	myProcs         procfs.Procs
+	myTasks         *os.File
+	lastTaskCount   uint64
+	taskListUpdates int64
+	totalRunning    uint64
+	totalRunnable   uint64
+}
+
+// updateStat fetches the raw per-thread scheduling metrics from /proc.
+// That is: from /proc/<pid>/task/*/schedstat.
+// This raw data is cached for Collect to pick up and reshape
+// when prometheus scrapes.
+func (c *procStatCollector) updateStat() error {
+
+	// procfs.AllThreads is expensive (lots of garbage in its wake) and often unnecessary.
+	// To reduce the cost, we skip doing it when we know that the thread lineup is
+	// unchanged. Since Go never terminates the threads it creates, if the lineup has
+	// changed, the count has changed. We can only get that from the syscall API.
+	// As soon as we get the bare Fd, I/Os with that file become blocking. So, the
+	// collector thread could theoretically block on the I/O (not sure that stating /proc
+	// results in any I/O wait, though).
+
+	var taskStat syscall.Stat_t
+	err := syscall.Fstat(int(c.myTasks.Fd()), &taskStat)
+	if err != nil {
+		return err
+	}
+	newCount := taskStat.Nlink - 2
+	if newCount != c.lastTaskCount {
+		c.taskListUpdates++
+		c.myProcs, err = procfs.AllThreads(c.myPid)
+		if err != nil {
+			return err
+		}
+		c.lastTaskCount = newCount
+	}
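[Reviewer note on the Nlink probe above: a directory's link count is 2 ("." and "..") plus one per subdirectory, and every thread owns a subdirectory of /proc/<pid>/task, so Nlink-2 counts threads without a readdir. A standalone sketch of just that probe — Linux only; output formatting is illustrative:

```go
//go:build linux

package main

import (
	"fmt"
	"os"
	"strconv"
	"syscall"
)

func main() {
	// Fstat our own task directory, exactly as updateStat does above.
	dir, err := os.Open("/proc/" + strconv.Itoa(os.Getpid()) + "/task")
	if err != nil {
		panic(err)
	}
	defer dir.Close()

	var st syscall.Stat_t
	if err := syscall.Fstat(int(dir.Fd()), &st); err != nil {
		panic(err)
	}
	// Nlink counts ".", "..", and one subdirectory per thread.
	fmt.Println("thread count:", st.Nlink-2)
}
```]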
+
+	// Sum the times of all threads.
+	totalRunning := uint64(0)
+	totalRunnable := uint64(0)
+	for _, p := range c.myProcs {
+		// The procfs API gives us no choice. For each thread, it builds an object with a
+		// set of stats, which we throw on the garbage pile after picking what we need.
+		schedStat, oneErr := p.Schedstat()
+		if oneErr != nil {
+			err = oneErr
+			// The only reason would be that this thread has disappeared, which doesn't
+			// invalidate the values from the others. So, continuing makes more sense.
+			continue
+		}
+		totalRunning += schedStat.RunningNanoseconds
+		totalRunnable += schedStat.WaitingNanoseconds
+	}
+
+	c.totalRunning = totalRunning
+	c.totalRunnable = totalRunnable
+	return err
+}
+
+// Describe tells prometheus all the metrics that this collector
+// collects.
+func (c *procStatCollector) Describe(ch chan<- *prometheus.Desc) {
+	prometheus.DescribeByCollect(c, ch)
+}
+
+// Collect picks the necessary raw metrics from the totals cached by
+// updateStat and derives the metrics to be returned. This is invoked whenever
+// prometheus scrapes. The derivation consists mostly of unit conversions.
+// Because raw metrics are very few and not expensive to get, Collect
+// currently calls updateStat() every time to get the latest.
+func (c *procStatCollector) Collect(ch chan<- prometheus.Metric) {
+	_ = c.updateStat()
+
+	ch <- prometheus.MustNewConstMetric(
+		runningTime,
+		prometheus.CounterValue,
+		float64(c.totalRunning)/1000000000, // Report duration in seconds (SI)
+	)
+	ch <- prometheus.MustNewConstMetric(
+		runnableTime,
+		prometheus.CounterValue,
+		float64(c.totalRunnable)/1000000000, // Report duration in seconds (SI)
+	)
+	ch <- prometheus.MustNewConstMetric(
+		tasklistUpdates,
+		prometheus.CounterValue,
+		float64(c.taskListUpdates),
+	)
+}
+
+// Init creates a new collector for process statistics.
+// The collector exposes those statistics to prometheus and responds
+// to scraping requests. Call this only once per process or get an error.
+// It is safe to ignore errors from this but prometheus may lack some
+// metrics.
+func Init() error {
+	me := os.Getpid()
+	taskPath := filepath.Join(procfs.DefaultMountPoint, strconv.Itoa(me), "task")
+	taskDir, err := os.Open(taskPath)
+	if err != nil {
+		return serrors.WrapStr("Opening /proc/pid/task/ failed", err,
+			"pid", me)
+	}
+
+	c := &procStatCollector{
+		myPid:   me,
+		myTasks: taskDir,
+	}
+
+	err = c.updateStat()
+	if err != nil {
+		// Ditch the broken collector. It won't do anything useful.
+		return serrors.WrapStr("First update failed", err)
+	}
+
+	// It works. Register it so prometheus milks it.
+	err = prometheus.Register(c)
+	if err != nil {
+		return serrors.WrapStr("Registration failed", err)
+	}
+
+	return nil
+}
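[Usage note for reviewers: the router calls Init from initMetrics (see router/dataplane.go below). For trying the collector in isolation, a hypothetical minimal program could look like this; the HTTP wiring and address are assumptions, only processmetrics.Init and its ignorable error come from this diff:

```go
// Hypothetical standalone wiring of the collector (not part of this diff).
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/scionproto/scion/pkg/private/processmetrics"
)

func main() {
	// Init registers the collector with the default prometheus registry.
	// As documented above, the error is safe to log and ignore.
	if err := processmetrics.Init(); err != nil {
		log.Printf("processmetrics disabled: %v", err)
	}
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
}
```]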
diff --git a/pkg/private/processmetrics/processmetrics_other.go b/pkg/private/processmetrics/processmetrics_other.go
new file mode 100644
index 0000000000..fee4922a45
--- /dev/null
+++ b/pkg/private/processmetrics/processmetrics_other.go
@@ -0,0 +1,29 @@
+// Copyright 2023 SCION Association
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package processmetrics provides a custom collector to export process-level
+// metrics beyond what prometheus.ProcessCollector offers.
+// This is the generic implementation. It does nothing.
+
+//go:build !linux
+
+package processmetrics
+
+import (
+	"github.com/scionproto/scion/pkg/private/serrors"
+)
+
+func Init() error {
+	return serrors.New("Not supported for this platform")
+}
diff --git a/router/BUILD.bazel b/router/BUILD.bazel
index d7ebcbc60e..ed41f1d88a 100644
--- a/router/BUILD.bazel
+++ b/router/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
         "//pkg/experimental/epic:go_default_library",
         "//pkg/log:go_default_library",
         "//pkg/private/common:go_default_library",
+        "//pkg/private/processmetrics:go_default_library",
         "//pkg/private/serrors:go_default_library",
         "//pkg/private/util:go_default_library",
         "//pkg/scrypto:go_default_library",
diff --git a/router/dataplane.go b/router/dataplane.go
index 362c298c34..2a1e06e3d6 100644
--- a/router/dataplane.go
+++ b/router/dataplane.go
@@ -27,7 +27,6 @@ import (
 	"math/big"
 	"net"
 	"net/netip"
-	"strconv"
 	"sync"
 	"time"
@@ -40,6 +39,7 @@ import (
 	"github.com/scionproto/scion/pkg/drkey"
 	libepic "github.com/scionproto/scion/pkg/experimental/epic"
 	"github.com/scionproto/scion/pkg/log"
+	"github.com/scionproto/scion/pkg/private/processmetrics"
 	"github.com/scionproto/scion/pkg/private/serrors"
 	"github.com/scionproto/scion/pkg/private/util"
 	"github.com/scionproto/scion/pkg/scrypto"
@@ -110,7 +110,7 @@ type DataPlane struct {
 	mtx     sync.Mutex
 	running bool
 	Metrics *Metrics
-	forwardingMetrics map[uint16]forwardingMetrics
+	forwardingMetrics map[uint16]interfaceMetrics
 
 	ExperimentalSCMPAuthentication bool
@@ -397,7 +397,7 @@ func (d *DataPlane) AddSvc(svc addr.SVC, a *net.UDPAddr) error {
 	}
 	d.svc.AddSvc(svc, a)
 	if d.Metrics != nil {
-		labels := serviceMetricLabels(d.localIA, svc)
+		labels := serviceLabels(d.localIA, svc)
 		d.Metrics.ServiceInstanceChanges.With(labels).Add(1)
 		d.Metrics.ServiceInstanceCount.With(labels).Add(1)
 	}
@@ -416,7 +416,7 @@ func (d *DataPlane) DelSvc(svc addr.SVC, a *net.UDPAddr) error {
 	}
 	d.svc.DelSvc(svc, a)
 	if d.Metrics != nil {
-		labels := serviceMetricLabels(d.localIA, svc)
+		labels := serviceLabels(d.localIA, svc)
 		d.Metrics.ServiceInstanceChanges.With(labels).Add(1)
 		d.Metrics.ServiceInstanceCount.With(labels).Add(-1)
 	}
@@ -589,9 +589,13 @@ type packet struct {
 	// The address to where we are forwarding the packet.
 	// Will be set by the processing routine
 	dstAddr *net.UDPAddr
-	// The ingress on which this packet arrived. Will be
-	// set by the receiver
-	ingress uint16
+	// The ingress on which this packet arrived. This is
+	// set by the receiver.
+	ingress uint16
+	// The type of traffic. This is used for metrics at the forwarding stage, but is most
+	// economically determined at the processing stage. So transport it here.
+	trafficType trafficType
+	// The goods
 	rawPacket []byte
 }
@@ -610,23 +614,24 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
 	}
 	msgs := underlayconn.NewReadMessages(cfg.BatchSize)
-	numReusable := 0 // unused buffers from previous loop
-	metrics := d.forwardingMetrics[ifID]
+	numReusable := 0                     // unused buffers from previous loop
+	metrics := d.forwardingMetrics[ifID] // If receiver exists, fw metrics exist too.
 	flowIDBuffer := make([]byte, 3)
 	hasher := fnv.New32a()
 
 	enqueueForProcessing := func(pkt ipv4.Message) {
-		metrics.InputPacketsTotal.Inc()
-		metrics.InputBytesTotal.Add(float64(pkt.N))
-		srcAddr := pkt.Addr.(*net.UDPAddr)
+		size := pkt.N
+		sc := classOfSize(size)
+		metrics[sc].InputPacketsTotal.Inc()
+		metrics[sc].InputBytesTotal.Add(float64(size))
 		procID, err := computeProcID(pkt.Buffers[0], cfg.NumProcessors, randomValue,
 			flowIDBuffer, hasher)
 		if err != nil {
 			log.Debug("Error while computing procID", "err", err)
 			d.returnPacketToPool(pkt.Buffers[0])
-			metrics.DroppedPacketsInvalid.Inc()
+			metrics[sc].DroppedPacketsInvalid.Inc()
 			return
 		}
 		outPkt := packet{
@@ -638,7 +643,7 @@
 		case procQs[procID] <- outPkt:
 		default:
 			d.returnPacketToPool(pkt.Buffers[0])
-			metrics.DroppedPacketsBusyProcessor.Inc()
+			metrics[sc].DroppedPacketsBusyProcessor.Inc()
 		}
 	}
@@ -699,9 +704,12 @@ func (d *DataPlane) runProcessor(id int, q <-chan packet,
 		if !ok {
 			continue
 		}
-		metrics := d.forwardingMetrics[p.ingress]
 		result, err := processor.processPkt(p.rawPacket, p.srcAddr, p.ingress)
+
+		sc := classOfSize(len(p.rawPacket))
+		metrics := d.forwardingMetrics[p.ingress][sc]
 		metrics.ProcessedPackets.Inc()
+		egress := result.EgressID
 		switch {
 		case err == nil:
@@ -732,7 +740,7 @@
 		}
 		p.rawPacket = result.OutPkt
 		p.dstAddr = result.OutAddr
-
+		p.trafficType = result.TrafficType
 		select {
 		case fwCh <- p:
 		default:
@@ -753,8 +761,9 @@
 		if !ok {
 			continue
 		}
-		metrics := d.forwardingMetrics[p.packet.ingress]
 		res, err := processor.processPacket(p)
+		sc := classOfSize(len(p.rawPacket))
+		metrics := d.forwardingMetrics[p.packet.ingress][sc]
 		if err != nil {
 			log.Debug("Error processing packet", "err", err)
 			metrics.DroppedPacketsInvalid.Inc()
@@ -829,6 +838,17 @@ func (p *slowPathPacketProcessor) reset() {
 	p.e2eLayer = slayers.EndToEndExtnSkipper{}
 }
 
+// processResult carries what could be determined while processing
+// a packet. In most cases of error, all fields are left to their
+// zero value.
+type processResult struct {
+	EgressID        uint16
+	OutAddr         *net.UDPAddr
+	OutPkt          []byte
+	SlowPathRequest slowPathRequest
+	TrafficType     trafficType
+}
+
 func (p *slowPathPacketProcessor) processPacket(pkt slowPacket) (processResult, error) {
 	var err error
 	p.reset()
@@ -887,64 +907,95 @@
 	}
 }
 
-func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,
-	cfg *RunConfig, c <-chan packet) {
+func updateOutputMetrics(metrics interfaceMetrics, packets []packet) {
+	// We need to collect stats by traffic type and size class.
+	// We try to reduce the metrics lookup penalty by using a
+	// simpler staging data structure.
+ writtenPkts := [ttMax][maxSizeClass]int{} + writtenBytes := [ttMax][maxSizeClass]int{} + for _, p := range packets { + s := len(p.rawPacket) + sc := classOfSize(s) + tt := p.trafficType + writtenPkts[tt][sc]++ + writtenBytes[tt][sc] += s + } + for t := ttOther; t < ttMax; t++ { + for sc := minSizeClass; sc < maxSizeClass; sc++ { + if writtenPkts[t][sc] > 0 { + metrics[sc].Output[t].OutputPacketsTotal.Add(float64(writtenPkts[t][sc])) + metrics[sc].Output[t].OutputBytesTotal.Add(float64(writtenBytes[t][sc])) + } + } + } +} + +func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn, cfg *RunConfig, c <-chan packet) { log.Debug("Initialize forwarder for", "interface", ifID) - writeMsgs := make(underlayconn.Messages, cfg.BatchSize) - for i := range writeMsgs { - writeMsgs[i].Buffers = make([][]byte, 1) + + // We use this somewhat like a ring buffer. + pkts := make([]packet, cfg.BatchSize) + + // We use this as a temporary buffer, but allocate it just once + // to save on garbage handling. + msgs := make(underlayconn.Messages, cfg.BatchSize) + for i := range msgs { + msgs[i].Buffers = make([][]byte, 1) } + metrics := d.forwardingMetrics[ifID] - remaining := 0 + toWrite := 0 for d.running { - available := readUpTo(c, cfg.BatchSize-remaining, remaining == 0, - writeMsgs[remaining:]) - available += remaining - written, _ := conn.WriteBatch(writeMsgs[:available], 0) + toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0, pkts[toWrite:]) + + // Turn the packets into underlay messages that WriteBatch can send. + for i, p := range pkts[:toWrite] { + msgs[i].Buffers[0] = p.rawPacket + msgs[i].Addr = nil + if p.dstAddr != nil { + msgs[i].Addr = p.dstAddr + } + } + written, _ := conn.WriteBatch(msgs[:toWrite], 0) if written < 0 { // WriteBatch returns -1 on error, we just consider this as // 0 packets written written = 0 } - writtenBytes := 0 - for i := range writeMsgs[:written] { - writtenBytes += len(writeMsgs[i].Buffers[0]) - d.returnPacketToPool(writeMsgs[i].Buffers[0]) + + updateOutputMetrics(metrics, pkts[:written]) + + for _, p := range pkts[:written] { + d.returnPacketToPool(p.rawPacket) } - metrics.OutputPacketsTotal.Add(float64(written)) - metrics.OutputBytesTotal.Add(float64(writtenBytes)) - if written != available { - metrics.DroppedPacketsInvalid.Inc() - d.returnPacketToPool(writeMsgs[written].Buffers[0]) - remaining = available - written - 1 - for i := 0; i < remaining; i++ { - writeMsgs[i].Buffers[0] = writeMsgs[i+written+1].Buffers[0] - writeMsgs[i].Addr = writeMsgs[i+written+1].Addr + if written != toWrite { + // Only one is dropped at this time. We'll retry the rest. + sc := classOfSize(len(pkts[written].rawPacket)) + metrics[sc].DroppedPacketsInvalid.Inc() + d.returnPacketToPool(pkts[written].rawPacket) + toWrite -= (written + 1) + // Shift the leftovers to the head of the buffers. 
+			for i := 0; i < toWrite; i++ {
+				pkts[i] = pkts[i+written+1]
+			}
+
 		} else {
-			remaining = 0
+			toWrite = 0
 		}
 	}
 }
 
-func readUpTo(c <-chan packet, n int, needsBlocking bool, msg []ipv4.Message) int {
-	assign := func(p packet, m *ipv4.Message) {
-		m.Buffers[0] = p.rawPacket
-		m.Addr = nil
-		if p.dstAddr != nil {
-			m.Addr = p.dstAddr
-		}
-	}
+func readUpTo(c <-chan packet, n int, needsBlocking bool, pkts []packet) int {
 	i := 0
 	if needsBlocking {
 		p, ok := <-c
 		if !ok {
 			return i
 		}
-		assign(p, &msg[i])
+		pkts[i] = p
 		i++
 	}
@@ -954,7 +1005,7 @@ func readUpTo(c <-chan packet, n int, needsBlocking bool, msg []ipv4.Message) in
 			if !ok {
 				return i
 			}
-			assign(p, &msg[i])
+			pkts[i] = p
 		default:
 			return i
 		}
@@ -963,29 +1014,6 @@
 	return i
 }
 
-// initMetrics initializes the metrics related to packet forwarding. The
-// counters are already instantiated for all the relevant interfaces so this
-// will not have to be repeated during packet forwarding.
-func (d *DataPlane) initMetrics() {
-	d.forwardingMetrics = make(map[uint16]forwardingMetrics)
-	labels := interfaceToMetricLabels(0, d.localIA, d.neighborIAs)
-	d.forwardingMetrics[0] = initForwardingMetrics(d.Metrics, labels)
-	for id := range d.external {
-		if _, notOwned := d.internalNextHops[id]; notOwned {
-			continue
-		}
-		labels = interfaceToMetricLabels(id, d.localIA, d.neighborIAs)
-		d.forwardingMetrics[id] = initForwardingMetrics(d.Metrics, labels)
-	}
-}
-
-type processResult struct {
-	EgressID        uint16
-	OutAddr         *net.UDPAddr
-	OutPkt          []byte
-	SlowPathRequest slowPathRequest
-}
-
 func newPacketProcessor(d *DataPlane) *scionPacketProcessor {
 	p := &scionPacketProcessor{
 		d: d,
@@ -1035,6 +1063,7 @@ func (p *scionPacketProcessor) processPkt(rawPkt []byte,
 	if err != nil {
 		return processResult{}, err
 	}
+
 	pld := p.lastLayer.LayerPayload()
 
 	pathType := p.scionLayer.PathType
@@ -1782,17 +1811,20 @@ func (p *scionPacketProcessor) process() (processResult, error) {
 	if r, err := p.handleIngressRouterAlert(); err != nil {
 		return r, err
 	}
-	// Inbound: pkts destined to the local IA.
+	// Inbound: pkt destined to the local IA.
 	if p.scionLayer.DstIA == p.d.localIA {
 		a, r, err := p.resolveInbound()
 		if err != nil {
 			return r, err
 		}
-		return processResult{OutAddr: a, OutPkt: p.rawPkt}, nil
+		return processResult{OutAddr: a, OutPkt: p.rawPkt, TrafficType: ttIn}, nil
 	}
-	// Outbound: pkts leaving the local IA.
-	// BRTransit: pkts leaving from the same BR different interface.
+	// Outbound: pkt leaving the local IA. This could be:
+	// * Pure outbound: from this AS, in via internal, out via external.
+	// * ASTransit in: from another AS, in via external, out via internal to other BR.
+	// * ASTransit out: from another AS, in via internal from other BR, out via external.
+	// * BRTransit: from another AS, in via external, out via external.
 	if p.path.IsXover() && !p.peering {
 		// An effective cross-over is a change of segment other than at
 		// a peering hop.
@@ -1823,14 +1855,27 @@
 	}
 	egressID := p.egressInterface()
 	if _, ok := p.d.external[egressID]; ok {
+		// Not ASTransit in
 		if err := p.processEgress(); err != nil {
 			return processResult{}, err
 		}
-		return processResult{EgressID: egressID, OutPkt: p.rawPkt}, nil
+		// Finish deciding the trafficType...
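[Reviewer note: the traffic-type decision that the code just below completes can be summarized compactly. A hedged in-package recap — the boolean parameter names are stand-ins, not identifiers from this diff; the tt* constants are defined in router/metrics.go further down; the malformed/"other" cases are omitted:

```go
// Sketch of the traffic-type decision implemented in process(). Ingress 0
// means the packet arrived over the internal interface; egressOwned means
// this router owns the egress interface.
func classifyTraffic(dstIsLocal, srcIsLocal, egressOwned bool, ingress uint16) trafficType {
	switch {
	case dstIsLocal:
		return ttIn // delivered inside the local IA
	case egressOwned && srcIsLocal:
		return ttOut // pure outbound: in via internal, out via external
	case egressOwned && ingress == 0:
		return ttOutTransit // entered the AS at a sibling BR, leaves here
	case egressOwned:
		return ttBrTransit // in and out via this BR's external interfaces
	default:
		return ttInTransit // leaves the AS via another BR, forwarded internally
	}
}
```]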
+ var tt trafficType + if p.scionLayer.SrcIA == p.d.localIA { + // Pure outbound + tt = ttOut + } else if p.ingressID == 0 { + // ASTransit out + tt = ttOutTransit + } else { + // Therefore it is BRTransit + tt = ttBrTransit + } + return processResult{EgressID: egressID, OutPkt: p.rawPkt, TrafficType: tt}, nil } - // ASTransit: pkts leaving from another AS BR. + // ASTransit in: pkt leaving this AS through another BR. if a, ok := p.d.internalNextHops[egressID]; ok { - return processResult{OutAddr: a, OutPkt: p.rawPkt}, nil + return processResult{OutAddr: a, OutPkt: p.rawPkt, TrafficType: ttInTransit}, nil } errCode := slayers.SCMPCodeUnknownHopFieldEgress if !p.infoField.ConsDir { @@ -2365,69 +2410,25 @@ func nextHdr(layer gopacket.DecodingLayer) slayers.L4ProtocolType { } } -// forwardingMetrics contains the subset of Metrics relevant for forwarding, -// instantiated with some interface-specific labels. -type forwardingMetrics struct { - InputBytesTotal prometheus.Counter - OutputBytesTotal prometheus.Counter - InputPacketsTotal prometheus.Counter - OutputPacketsTotal prometheus.Counter - DroppedPacketsInvalid prometheus.Counter - DroppedPacketsBusyProcessor prometheus.Counter - DroppedPacketsBusyForwarder prometheus.Counter - DroppedPacketsBusySlowPath prometheus.Counter - ProcessedPackets prometheus.Counter -} - -func initForwardingMetrics(metrics *Metrics, labels prometheus.Labels) forwardingMetrics { - c := forwardingMetrics{ - InputBytesTotal: metrics.InputBytesTotal.With(labels), - InputPacketsTotal: metrics.InputPacketsTotal.With(labels), - OutputBytesTotal: metrics.OutputBytesTotal.With(labels), - OutputPacketsTotal: metrics.OutputPacketsTotal.With(labels), - ProcessedPackets: metrics.ProcessedPackets.With(labels), - } - labels["reason"] = "invalid" - c.DroppedPacketsInvalid = metrics.DroppedPacketsTotal.With(labels) - labels["reason"] = "busy_processor" - c.DroppedPacketsBusyProcessor = metrics.DroppedPacketsTotal.With(labels) - labels["reason"] = "busy_forwarder" - c.DroppedPacketsBusyForwarder = metrics.DroppedPacketsTotal.With(labels) - labels["reason"] = "busy_slow_path" - c.DroppedPacketsBusySlowPath = metrics.DroppedPacketsTotal.With(labels) - - c.InputBytesTotal.Add(0) - c.InputPacketsTotal.Add(0) - c.OutputBytesTotal.Add(0) - c.OutputPacketsTotal.Add(0) - c.DroppedPacketsInvalid.Add(0) - c.DroppedPacketsBusyProcessor.Add(0) - c.DroppedPacketsBusyForwarder.Add(0) - c.DroppedPacketsBusySlowPath.Add(0) - c.ProcessedPackets.Add(0) - return c -} - -func interfaceToMetricLabels(id uint16, localIA addr.IA, - neighbors map[uint16]addr.IA) prometheus.Labels { - - if id == 0 { - return prometheus.Labels{ - "isd_as": localIA.String(), - "interface": "internal", - "neighbor_isd_as": localIA.String(), - } - } - return prometheus.Labels{ - "isd_as": localIA.String(), - "interface": strconv.FormatUint(uint64(id), 10), - "neighbor_isd_as": neighbors[id].String(), - } -} - -func serviceMetricLabels(localIA addr.IA, svc addr.SVC) prometheus.Labels { - return prometheus.Labels{ - "isd_as": localIA.String(), - "service": svc.BaseString(), +// initMetrics initializes the metrics related to packet forwarding. The counters are already +// instantiated for all the relevant interfaces so this will not have to be repeated during packet +// forwarding. 
+func (d *DataPlane) initMetrics() { + d.forwardingMetrics = make(map[uint16]interfaceMetrics) + d.forwardingMetrics[0] = newInterfaceMetrics(d.Metrics, 0, d.localIA, d.neighborIAs) + for id := range d.external { + if _, notOwned := d.internalNextHops[id]; notOwned { + continue + } + d.forwardingMetrics[id] = newInterfaceMetrics(d.Metrics, id, d.localIA, d.neighborIAs) + } + + // Start our custom /proc/pid/stat collector to export iowait time and (in the future) other + // process-wide metrics that prometheus does not. + err := processmetrics.Init() + + // we can live without these metrics. Just log the error. + if err != nil { + log.Error("Could not initialize processmetrics", "err", err) } } diff --git a/router/dataplane_internal_test.go b/router/dataplane_internal_test.go index 6e92a8219d..1b51607f47 100644 --- a/router/dataplane_internal_test.go +++ b/router/dataplane_internal_test.go @@ -421,6 +421,12 @@ func TestSlowPathProcessing(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() payload := []byte("actualpayloadbytes") + + // ProcessPacket assumes some pre-conditions: + // * The ingress interface has to exist. This fake map is good for the test cases we have. + // * InternalNextHops may not be nil. Empty is ok for all the test cases we have. + fakeExternalInterfaces := map[uint16]BatchConn{1: nil} + fakeInternalNextHops := map[uint16]*net.UDPAddr{} testCases := map[string]struct { mockMsg func() []byte prepareDP func(*gomock.Controller) *DataPlane @@ -430,9 +436,12 @@ func TestSlowPathProcessing(t *testing.T) { }{ "svc nobackend": { prepareDP: func(ctrl *gomock.Controller) *DataPlane { - return NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, + return NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, map[addr.SVC][]*net.UDPAddr{}, - xtest.MustParseIA("1-ff00:0:110"), nil, testKey) + xtest.MustParseIA("1-ff00:0:110"), + nil, testKey) }, mockMsg: func() []byte { spkt := prepBaseMsg(t, payload, 0) @@ -452,7 +461,9 @@ func TestSlowPathProcessing(t *testing.T) { }, "svc invalid": { prepareDP: func(ctrl *gomock.Controller) *DataPlane { - return NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, + return NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, map[addr.SVC][]*net.UDPAddr{}, xtest.MustParseIA("1-ff00:0:110"), nil, testKey) }, @@ -474,7 +485,9 @@ func TestSlowPathProcessing(t *testing.T) { }, "invalid dest": { prepareDP: func(ctrl *gomock.Controller) *DataPlane { - return NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, + return NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, map[addr.SVC][]*net.UDPAddr{}, xtest.MustParseIA("1-ff00:0:110"), nil, testKey) }, @@ -500,6 +513,8 @@ func TestSlowPathProcessing(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() dp := tc.prepareDP(ctrl) + dp.initMetrics() + rawPacket := tc.mockMsg() var srcAddr *net.UDPAddr diff --git a/router/dataplane_test.go b/router/dataplane_test.go index 57f3b20ddf..baf163fb53 100644 --- a/router/dataplane_test.go +++ b/router/dataplane_test.go @@ -576,6 +576,13 @@ func TestProcessPkt(t *testing.T) { epicTS, err := libepic.CreateTimestamp(now, now) require.NoError(t, err) + // ProcessPacket assumes some pre-conditions: + // * The ingress interface has to exist. This fake map is good for most test cases. + // Others need a custom one. + // * InternalNextHops may not be nil. Empty is ok (sufficient unless testing AS transit). 
+ fakeExternalInterfaces := map[uint16]router.BatchConn{1: nil, 2: nil, 3: nil} + fakeInternalNextHops := map[uint16]*net.UDPAddr{} + testCases := map[string]struct { mockMsg func(bool) *ipv4.Message prepareDP func(*gomock.Controller) *router.DataPlane @@ -585,8 +592,10 @@ func TestProcessPkt(t *testing.T) { }{ "inbound": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, - nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, dpath := prepBaseMsg(now) @@ -596,7 +605,7 @@ func TestProcessPkt(t *testing.T) { dpath.HopFields = []path.HopField{ {ConsIngress: 41, ConsEgress: 40}, {ConsIngress: 31, ConsEgress: 30}, - {ConsIngress: 01, ConsEgress: 0}, + {ConsIngress: 1, ConsEgress: 0}, } dpath.Base.PathMeta.CurrHF = 2 dpath.HopFields[2].Mac = computeMAC(t, key, dpath.InfoFields[0], dpath.HopFields[2]) @@ -620,7 +629,9 @@ func TestProcessPkt(t *testing.T) { map[uint16]topology.LinkType{ 1: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, dpath := prepBaseMsg(now) @@ -650,13 +661,16 @@ func TestProcessPkt(t *testing.T) { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( map[uint16]router.BatchConn{ + uint16(1): mock_router.NewMockBatchConn(ctrl), uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 1: topology.Parent, 2: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, dpath := prepBaseMsg(now) @@ -685,12 +699,15 @@ func TestProcessPkt(t *testing.T) { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( map[uint16]router.BatchConn{ + uint16(1): mock_router.NewMockBatchConn(ctrl), uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 2: topology.Parent, 1: topology.Child, - }, nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + }, nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, dpath := prepBaseMsg(now) @@ -720,13 +737,16 @@ func TestProcessPkt(t *testing.T) { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( map[uint16]router.BatchConn{ + uint16(1): mock_router.NewMockBatchConn(ctrl), uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 1: topology.Peer, 2: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { // Story: the packet just left segment 0 which ends at @@ -792,12 +812,15 @@ func TestProcessPkt(t *testing.T) { return router.NewDP( map[uint16]router.BatchConn{ uint16(1): mock_router.NewMockBatchConn(ctrl), + uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 1: topology.Peer, 2: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + 
fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { // Story: the packet lands on the last (peering) hop of @@ -870,13 +893,16 @@ func TestProcessPkt(t *testing.T) { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( map[uint16]router.BatchConn{ + uint16(1): mock_router.NewMockBatchConn(ctrl), uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 1: topology.Peer, 2: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { // Story: the packet just left hop 1 (the first hop @@ -946,12 +972,15 @@ func TestProcessPkt(t *testing.T) { return router.NewDP( map[uint16]router.BatchConn{ uint16(1): mock_router.NewMockBatchConn(ctrl), + uint16(2): mock_router.NewMockBatchConn(ctrl), }, map[uint16]topology.LinkType{ 1: topology.Peer, 2: topology.Child, }, - nil, nil, nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + nil, + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { // Story: the packet lands on the second (non-peering) hop of @@ -1029,7 +1058,12 @@ func TestProcessPkt(t *testing.T) { }, "astransit direct": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, + return router.NewDP( + map[uint16]router.BatchConn{ + uint16(1): mock_router.NewMockBatchConn(ctrl), + // Interface 3 isn't in the external interfaces of this router + // another router has it. + }, map[uint16]topology.LinkType{ 1: topology.Core, 3: topology.Core, @@ -1055,12 +1089,15 @@ func TestProcessPkt(t *testing.T) { return ret }, srcInterface: 1, - egressInterface: 0, + egressInterface: 0, // Internal forward to the egress router assertFunc: assert.NoError, }, "astransit xover": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, + return router.NewDP( + map[uint16]router.BatchConn{ + uint16(51): mock_router.NewMockBatchConn(ctrl), + }, map[uint16]topology.LinkType{ 51: topology.Child, 3: topology.Core, @@ -1107,13 +1144,15 @@ func TestProcessPkt(t *testing.T) { ret.Flags, ret.NN, ret.N, ret.OOB = 0, 0, 0, nil return ret }, - srcInterface: 51, - egressInterface: 0, + srcInterface: 51, // == consEgress, bc non-consdir + egressInterface: 0, // Cross-over. The egress happens in the next segment. 
assertFunc: assert.NoError, }, "svc": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, map[addr.SVC][]*net.UDPAddr{ addr.SvcCS: { &net.UDPAddr{ @@ -1150,7 +1189,7 @@ func TestProcessPkt(t *testing.T) { "onehop inbound": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( - nil, + fakeExternalInterfaces, nil, mock_router.NewMockBatchConn(ctrl), nil, map[addr.SVC][]*net.UDPAddr{ @@ -1213,7 +1252,9 @@ func TestProcessPkt(t *testing.T) { "onehop inbound invalid src": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { return router.NewDP( - nil, nil, nil, nil, nil, + fakeExternalInterfaces, + nil, nil, + fakeInternalNextHops, nil, xtest.MustParseIA("1-ff00:0:110"), map[uint16]addr.IA{ uint16(1): xtest.MustParseIA("1-ff00:0:111"), @@ -1243,7 +1284,7 @@ func TestProcessPkt(t *testing.T) { } return toMsg(t, spkt, dpath) }, - srcInterface: 1, + srcInterface: 2, egressInterface: 21, assertFunc: assert.Error, }, @@ -1254,7 +1295,8 @@ func TestProcessPkt(t *testing.T) { uint16(1): mock_router.NewMockBatchConn(ctrl), }, nil, - mock_router.NewMockBatchConn(ctrl), nil, + mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, map[addr.SVC][]*net.UDPAddr{ addr.SvcCS: {&net.UDPAddr{ IP: net.ParseIP("172.0.2.10"), @@ -1314,8 +1356,8 @@ func TestProcessPkt(t *testing.T) { uint16(2): mock_router.NewMockBatchConn(ctrl), }, nil, - mock_router.NewMockBatchConn(ctrl), nil, - nil, + mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, xtest.MustParseIA("1-ff00:0:110"), map[uint16]addr.IA{ uint16(2): xtest.MustParseIA("1-ff00:0:111"), @@ -1356,8 +1398,10 @@ func TestProcessPkt(t *testing.T) { }, "epic inbound": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, - nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, epicpath, dpath := prepEpicMsg(t, @@ -1372,8 +1416,10 @@ func TestProcessPkt(t *testing.T) { }, "epic malformed path": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, - nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, epicpath, dpath := prepEpicMsg(t, @@ -1389,8 +1435,10 @@ func TestProcessPkt(t *testing.T) { }, "epic invalid timestamp": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, - nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, epicpath, dpath := prepEpicMsg(t, @@ -1408,8 +1456,10 @@ func TestProcessPkt(t *testing.T) { }, "epic invalid LHVF": { prepareDP: func(ctrl *gomock.Controller) *router.DataPlane { - return 
router.NewDP(nil, nil, mock_router.NewMockBatchConn(ctrl), nil, - nil, xtest.MustParseIA("1-ff00:0:110"), nil, key) + return router.NewDP(fakeExternalInterfaces, + nil, mock_router.NewMockBatchConn(ctrl), + fakeInternalNextHops, nil, + xtest.MustParseIA("1-ff00:0:110"), nil, key) }, mockMsg: func(afterProcessing bool) *ipv4.Message { spkt, epicpath, dpath := prepEpicMsg(t, diff --git a/router/export_test.go b/router/export_test.go index a82d01e5a0..f8134b4a3d 100644 --- a/router/export_test.go +++ b/router/export_test.go @@ -58,10 +58,12 @@ func NewDP( svc: &services{m: svc}, internal: internal, internalIP: netip.MustParseAddr("198.51.100.1"), + Metrics: metrics, } if err := dp.SetKey(key); err != nil { panic(err) } + dp.initMetrics() return dp } diff --git a/router/metrics.go b/router/metrics.go index 71a09a03c0..df7af4c74f 100644 --- a/router/metrics.go +++ b/router/metrics.go @@ -15,8 +15,14 @@ package router import ( + "math/bits" + "strconv" + "strings" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/scionproto/scion/pkg/addr" ) // Metrics defines the data-plane metrics for the BR. @@ -39,8 +45,8 @@ type Metrics struct { SiblingBFDStateChanges *prometheus.CounterVec } -// NewMetrics initializes the metrics for the Border Router, and registers them -// with the default registry. +// NewMetrics initializes the metrics for the Border Router, and registers them with the default +// registry. func NewMetrics() *Metrics { return &Metrics{ ProcessedPackets: promauto.NewCounterVec( @@ -48,42 +54,42 @@ func NewMetrics() *Metrics { Name: "router_processed_pkts_total", Help: "Total number of packets processed by the processor", }, - []string{"interface", "isd_as", "neighbor_isd_as"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass"}, ), InputBytesTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "router_input_bytes_total", Help: "Total number of bytes received", }, - []string{"interface", "isd_as", "neighbor_isd_as"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass"}, ), OutputBytesTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "router_output_bytes_total", Help: "Total number of bytes sent.", }, - []string{"interface", "isd_as", "neighbor_isd_as"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass", "type"}, ), InputPacketsTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "router_input_pkts_total", Help: "Total number of packets received", }, - []string{"interface", "isd_as", "neighbor_isd_as"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass"}, ), OutputPacketsTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "router_output_pkts_total", Help: "Total number of packets sent.", }, - []string{"interface", "isd_as", "neighbor_isd_as"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass", "type"}, ), DroppedPacketsTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "router_dropped_pkts_total", Help: "Total number of packets dropped by the router.", }, - []string{"interface", "isd_as", "neighbor_isd_as", "reason"}, + []string{"interface", "isd_as", "neighbor_isd_as", "sizeclass", "reason"}, ), InterfaceUp: promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -159,3 +165,213 @@ func NewMetrics() *Metrics { ), } } + +// trafficType labels traffic as being of either of the following types: in, out, inTransit, +// outTransit, brTransit. 
inTransit or outTransit means that traffic is crossing the local AS via
+// two routers. If the router being observed is the one receiving the packet from the outside, then
+// the type is inTransit; else it is outTransit. brTransit means that traffic is crossing only the
+// observed router. Non-SCION traffic or somehow malformed traffic has type other.
+type trafficType uint8
+
+const (
+	ttOther trafficType = iota
+	ttIn
+	ttOut
+	ttInTransit
+	ttOutTransit
+	ttBrTransit
+	ttMax
+)
+
+// Returns a human-friendly representation of the given traffic type.
+func (t trafficType) String() string {
+	switch t {
+	case ttIn:
+		return "in"
+	case ttOut:
+		return "out"
+	case ttInTransit:
+		return "in_transit"
+	case ttOutTransit:
+		return "out_transit"
+	case ttBrTransit:
+		return "br_transit"
+	}
+	return "other"
+}
+
+// sizeClass is the number of bits needed to represent some given size. This is quicker than
+// computing Log2 and serves the same purpose.
+type sizeClass uint8
+
+// maxSizeClass is the smallest NOT-supported sizeClass. This must be enough to support the largest
+// valid packet size (defined by bufSize). Since this must be a constant (to allow efficient
+// fixed-sized arrays), we have to assert it's large enough for bufSize. Just in case we do get
+// packets larger than bufSize, they are simply put in the last class.
+const maxSizeClass sizeClass = 15
+
+// This will fail to compile if bufSize cannot fit in (maxSizeClass - 1) bits.
+const _ = uint(1<<(maxSizeClass-1) - 1 - bufSize)
+
+// minSizeClass is the smallest sizeClass that we care about.
+// All smaller classes are conflated with this one.
+const minSizeClass sizeClass = 6
+
+func classOfSize(pktSize int) sizeClass {
+	cs := sizeClass(bits.Len32(uint32(pktSize)))
+	if cs > maxSizeClass-1 {
+		return maxSizeClass - 1
+	}
+	if cs <= minSizeClass {
+		return minSizeClass
+	}
+	return cs
+}
+
+// Returns a human-friendly representation of the given size class. Bracket notation is avoided
+// to keep the values easy to use in monitoring queries.
+func (sc sizeClass) String() string {
+	low := strconv.Itoa((1 << sc) >> 1)
+	high := strconv.Itoa((1 << sc) - 1)
+	if sc == minSizeClass {
+		low = "0"
+	}
+	if sc == maxSizeClass {
+		high = "inf"
+	}
+
+	return strings.Join([]string{low, high}, "_")
+}
+
+// interfaceMetrics is the set of metrics that are relevant for one given interface. It is a map
+// that associates each size class with the set of metrics belonging to that interface that have
+// that label value. This set of metrics is itself a trafficMetrics structure.
+// Explanation: Metrics are labeled by interface, local-as, neighbor-as, packet size, and (for
+// output metrics only) traffic type. Instances are grouped in a hierarchical manner for efficient
+// access by the code that uses them. forwardingMetrics is a map of interface to interfaceMetrics.
+// To access a specific InputPacketsTotal counter, one refers to:
+//
+//	dataplane.forwardingMetrics[interface][size-class].
+//
+// trafficMetrics.Output is an array of outputMetrics indexed by traffic type.
+type interfaceMetrics map[sizeClass]trafficMetrics
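[Reviewer note: the binning is easy to get wrong by one bit, so a few worked examples may help. A hypothetical in-package test — the file name and cases are illustrative, not part of this diff:

```go
package router

import "testing"

// Worked examples of classOfSize and sizeClass.String as defined above.
func TestClassOfSizeExamples(t *testing.T) {
	cases := map[int]string{
		40:    "0_63",       // bits.Len32(40) == 6, at minSizeClass
		64:    "64_127",     // bits.Len32(64) == 7
		1400:  "1024_2047",  // a typical MTU-sized packet
		40000: "8192_16383", // oversized, clamped to maxSizeClass-1
	}
	for size, want := range cases {
		if got := classOfSize(size).String(); got != want {
			t.Errorf("classOfSize(%d) = %s, want %s", size, got, want)
		}
	}
}
```]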
+
+// trafficMetrics groups all the metrics instances that share the same interface AND
+// sizeClass label values (but have different names - i.e. they count different things).
+type trafficMetrics struct {
+	InputBytesTotal             prometheus.Counter
+	InputPacketsTotal           prometheus.Counter
+	DroppedPacketsInvalid       prometheus.Counter
+	DroppedPacketsBusyProcessor prometheus.Counter
+	DroppedPacketsBusyForwarder prometheus.Counter
+	DroppedPacketsBusySlowPath  prometheus.Counter
+	ProcessedPackets            prometheus.Counter
+	Output                      [ttMax]outputMetrics
+}
+
+// outputMetrics groups all the metrics about traffic that has reached the output stage. Metrics
+// instances in each of these all have the same interface AND sizeClass AND trafficType label
+// values.
+type outputMetrics struct {
+	OutputBytesTotal   prometheus.Counter
+	OutputPacketsTotal prometheus.Counter
+}
+
+func newInterfaceMetrics(
+	metrics *Metrics,
+	id uint16,
+	localIA addr.IA,
+	neighbors map[uint16]addr.IA) interfaceMetrics {
+
+	ifLabels := interfaceLabels(id, localIA, neighbors)
+	m := interfaceMetrics{}
+	for sc := minSizeClass; sc < maxSizeClass; sc++ {
+		scLabels := prometheus.Labels{"sizeclass": sc.String()}
+		m[sc] = newTrafficMetrics(metrics, ifLabels, scLabels)
+	}
+	return m
+}
+
+func newTrafficMetrics(
+	metrics *Metrics,
+	ifLabels prometheus.Labels,
+	scLabels prometheus.Labels) trafficMetrics {
+
+	c := trafficMetrics{
+		InputBytesTotal:   metrics.InputBytesTotal.MustCurryWith(ifLabels).With(scLabels),
+		InputPacketsTotal: metrics.InputPacketsTotal.MustCurryWith(ifLabels).With(scLabels),
+		ProcessedPackets:  metrics.ProcessedPackets.MustCurryWith(ifLabels).With(scLabels),
+	}
+
+	// Output metrics have the extra "type" label.
+	for t := ttOther; t < ttMax; t++ {
+		ttLabels := prometheus.Labels{"type": t.String()}
+		c.Output[t] = newOutputMetrics(metrics, ifLabels, scLabels, ttLabels)
+	}
+
+	// Dropped metrics have the extra "reason" label.
+	reasonMap := map[string]string{}
+
+	reasonMap["reason"] = "invalid"
+	c.DroppedPacketsInvalid =
+		metrics.DroppedPacketsTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(reasonMap)
+
+	reasonMap["reason"] = "busy_processor"
+	c.DroppedPacketsBusyProcessor =
+		metrics.DroppedPacketsTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(reasonMap)
+
+	reasonMap["reason"] = "busy_forwarder"
+	c.DroppedPacketsBusyForwarder =
+		metrics.DroppedPacketsTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(reasonMap)
+
+	reasonMap["reason"] = "busy_slow_path"
+	c.DroppedPacketsBusySlowPath =
+		metrics.DroppedPacketsTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(reasonMap)
+
+	c.InputBytesTotal.Add(0)
+	c.InputPacketsTotal.Add(0)
+	c.DroppedPacketsInvalid.Add(0)
+	c.DroppedPacketsBusyProcessor.Add(0)
+	c.DroppedPacketsBusyForwarder.Add(0)
+	c.DroppedPacketsBusySlowPath.Add(0)
+	c.ProcessedPackets.Add(0)
+	return c
+}
+
+func newOutputMetrics(
+	metrics *Metrics,
+	ifLabels prometheus.Labels,
+	scLabels prometheus.Labels,
+	ttLabels prometheus.Labels) outputMetrics {
+
+	om := outputMetrics{}
+	om.OutputBytesTotal =
+		metrics.OutputBytesTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(ttLabels)
+	om.OutputPacketsTotal =
+		metrics.OutputPacketsTotal.MustCurryWith(ifLabels).MustCurryWith(scLabels).With(ttLabels)
+	om.OutputBytesTotal.Add(0)
+	om.OutputPacketsTotal.Add(0)
+	return om
+}
+
+func interfaceLabels(id uint16, localIA addr.IA, neighbors map[uint16]addr.IA) prometheus.Labels {
+	if id == 0 {
+		return prometheus.Labels{
+			"isd_as":          localIA.String(),
+			"interface":       "internal",
+			"neighbor_isd_as": localIA.String(),
+		}
+	}
+	return prometheus.Labels{
+		"isd_as":          localIA.String(),
+		"interface":
strconv.FormatUint(uint64(id), 10), + "neighbor_isd_as": neighbors[id].String(), + } +} + +func serviceLabels(localIA addr.IA, svc addr.SVC) prometheus.Labels { + return prometheus.Labels{ + "isd_as": localIA.String(), + "service": svc.BaseString(), + } +} diff --git a/tools/topology/monitoring.py b/tools/topology/monitoring.py index 60ac56007e..930a7b5210 100644 --- a/tools/topology/monitoring.py +++ b/tools/topology/monitoring.py @@ -69,6 +69,9 @@ class MonitoringGenerator(object): "Sciond": "SD", "Dispatcher": "dispatcher", } + JOB_METRIC_RELABEL = { + # "BR": "" + } def __init__(self, args): """ @@ -127,10 +130,14 @@ def _write_config_files(self, config_dict): def _write_config_file(self, config_path, job_dict): scrape_configs = [] for job_name, file_paths in job_dict.items(): - scrape_configs.append({ + job_scrape_config = { 'job_name': job_name, 'file_sd_configs': [{'files': file_paths}], - }) + } + relabels = self.JOB_METRIC_RELABEL.get(job_name) + if relabels is not None: + job_scrape_config['metric_relabel_configs'] = relabels + scrape_configs.append(job_scrape_config) config = { 'global': { 'scrape_interval': '1s', @@ -164,7 +171,7 @@ def _write_dc_file(self): 'version': DOCKER_COMPOSE_CONFIG_VERSION, 'services': { 'prometheus': { - 'image': 'prom/prometheus:v2.6.0', + 'image': 'prom/prometheus:v2.47.2', 'container_name': name+'prometheus', 'network_mode': 'host', 'volumes': [