diff --git a/packages/cosmic-swingset/src/kernel-stats.js b/packages/cosmic-swingset/src/kernel-stats.js
index 9faac70cbb9..d1d24df5001 100644
--- a/packages/cosmic-swingset/src/kernel-stats.js
+++ b/packages/cosmic-swingset/src/kernel-stats.js
@@ -5,6 +5,9 @@ import {
   View,
 } from '@opentelemetry/sdk-metrics';
 
+import { Fail } from '@endo/errors';
+import { isNat } from '@endo/nat';
+
 import { makeLegacyMap } from '@agoric/store';
 
 import {
@@ -19,6 +22,8 @@ import process from 'node:process';
 
 /** @import {Histogram, Meter as OTelMeter, MetricAttributes} from '@opentelemetry/api' */
 
+/** @import {TotalMap} from '@agoric/internal' */
+
 // import { diag, DiagConsoleLogger, DiagLogLevel } from '@opentelemetry/api';
 // diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.VERBOSE);
 
@@ -72,6 +77,44 @@ const HISTOGRAM_METRICS = /** @type {const} */ ({
   },
 });
 
+/** @enum {(typeof QueueMetricAspect)[keyof typeof QueueMetricAspect]} */
+const QueueMetricAspect = /** @type {const} */ ({
+  Length: 'length',
+  IncrementCount: 'increments',
+  DecrementCount: 'decrements',
+});
+
+/**
+ * Queue metrics come in {length,add,remove} triples sharing a common prefix.
+ *
+ * @param {string} namePrefix
+ * @param {string} descPrefix
+ * @returns {Record<string, { aspect: QueueMetricAspect, description: string }>}
+ */
+const makeQueueMetrics = (namePrefix, descPrefix) => {
+  /** @type {Array<[QueueMetricAspect, string, string]>} */
+  const metricsMeta = [
+    [QueueMetricAspect.Length, 'length', 'length'],
+    [QueueMetricAspect.IncrementCount, 'add', 'increments'],
+    [QueueMetricAspect.DecrementCount, 'remove', 'decrements'],
+  ];
+  const entries = metricsMeta.map(([aspect, nameSuffix, descSuffix]) => {
+    const name = `${namePrefix}_${nameSuffix}`;
+    const description = `${descPrefix} ${descSuffix}`;
+    return [name, { aspect, description }];
+  });
+  return Object.fromEntries(entries);
+};
+
+const QUEUE_METRICS = harden({
+  // "cosmic_swingset_inbound_queue_{length,add,remove}" measurements carry a
+  // "queue" attribute.
+  // Future OpenTelemetry SDKs should support expressing that in Instrument
+  // creation:
+  // https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-advisory-parameter-attributes
+  ...makeQueueMetrics('cosmic_swingset_inbound_queue', 'inbound queue'),
+});
+
 const wrapDeltaMS = (finisher, useDeltaMS) => {
   const startMS = Date.now();
   return (...finishArgs) => {
@@ -239,59 +282,148 @@ export function makeSlogCallbacks({ metricMeter, attributes = {} }) {
   return harden(slogCallbacks);
 }
 
+/**
+ * @template {string} QueueName
+ * @typedef InboundQueueMetricsManager
+ * @property {(newLengths: Record<QueueName, number>) => void} updateLengths
+ * @property {(queueName: QueueName, delta?: number) => void} decStat
+ * @property {() => Record<string, number>} getStats
+ */
+
 /**
  * Create a metrics manager for inbound queues. It must be initialized with the
- * length from durable storage and informed of each subsequent change so that
- * metrics can be provided from RAM.
+ * length of each queue and informed of each subsequent change so that metrics
+ * can be provided from RAM.
  *
  * Note that the add/remove counts will get reset at restart, but
  * Prometheus/etc tools can tolerate that just fine.
  *
- * @param {number} initialLength
+ * @template {string} QueueName
+ * @param {OTelMeter} metricMeter
+ * @param {Record<QueueName, number>} initialLengths per-queue
+ * @param {Console} logger
+ * @returns {InboundQueueMetricsManager<QueueName>}
  */
-export function makeInboundQueueMetrics(initialLength) {
-  let length = initialLength;
-  let add = 0;
-  let remove = 0;
+function makeInboundQueueMetrics(metricMeter, initialLengths, logger) {
+  const initialEntries = Object.entries(initialLengths);
+  const zeroEntries = initialEntries.map(([queueName]) => [queueName, 0]);
+  const makeQueueCounts = entries => {
+    for (const [queueName, length] of entries) {
+      isNat(length) ||
+        Fail`invalid initial length for queue ${queueName}: ${length}`;
+    }
+    return /** @type {TotalMap<QueueName, number>} */ (new Map(entries));
+  };
+
+  /**
+   * For each {length,increment count,decrement count} aspect (each such aspect
+   * corresponding to a single OpenTelemetry Instrument), keep a map of values
+   * keyed by queue name (each corresponding to a value of Attribute "queue").
+   *
+   * @type {Record<QueueMetricAspect, TotalMap<QueueName, number>>}
+   */
+  const counterData = {
+    [QueueMetricAspect.Length]: makeQueueCounts(initialEntries),
+    [QueueMetricAspect.IncrementCount]: makeQueueCounts(zeroEntries),
+    [QueueMetricAspect.DecrementCount]: makeQueueCounts(zeroEntries),
+  };
+
+  // In the event of misconfigured reporting for an unknown queue, accept the
+  // data with a warning rather than either ignore it or halt the chain.
+  const provideQueue = queueName => {
+    if (counterData[QueueMetricAspect.Length].has(queueName)) return;
+    logger.warn(`unknown inbound queue ${JSON.stringify(queueName)}`);
+    for (const [aspect, map] of Object.entries(counterData)) {
+      const old = map.get(queueName);
+      old === undefined ||
+        Fail`internal: unexpected preexisting ${aspect}=${old} data for late queue ${queueName}`;
+      map.set(queueName, 0);
+    }
+  };
+
+  const nudge = (map, queueName, delta) => {
+    const old = map.get(queueName);
+    old !== undefined ||
+      Fail`internal: unexpected missing data for queue ${queueName}`;
+    map.set(queueName, old + delta);
+  };
+
+  // Wire up callbacks for reporting the OpenTelemetry measurements:
+  // queue length is an UpDownCounter, while increment and decrement counts are
+  // [monotonic] Counters.
+  // But note that the Prometheus representation of the former will be a Gauge:
+  // https://prometheus.io/docs/concepts/metric_types/
+  for (const [name, { aspect, description }] of Object.entries(QUEUE_METRICS)) {
+    const isMonotonic = aspect !== QueueMetricAspect.Length;
+    const instrumentOptions = { description };
+    const asyncInstrument = isMonotonic
+      ? metricMeter.createObservableCounter(name, instrumentOptions)
+      : metricMeter.createObservableUpDownCounter(name, instrumentOptions);
+    asyncInstrument.addCallback(observer => {
+      for (const [queueName, value] of counterData[aspect].entries()) {
+        observer.observe(value, { queue: queueName });
+      }
+    });
+  }
 
   return harden({
-    updateLength: newLength => {
-      const delta = newLength - length;
-      length = newLength;
-      if (delta > 0) {
-        add += delta;
-      } else {
-        remove -= delta;
+    updateLengths: newLengths => {
+      for (const [queueName, newLength] of Object.entries(newLengths)) {
+        provideQueue(queueName);
+        isNat(newLength) ||
+          Fail`invalid length for queue ${queueName}: ${newLength}`;
+        const oldLength = counterData[QueueMetricAspect.Length].get(queueName);
+        counterData[QueueMetricAspect.Length].set(queueName, newLength);
+        if (newLength > oldLength) {
+          const map = counterData[QueueMetricAspect.IncrementCount];
+          nudge(map, queueName, newLength - oldLength);
+        } else if (newLength < oldLength) {
+          const map = counterData[QueueMetricAspect.DecrementCount];
+          nudge(map, queueName, oldLength - newLength);
+        }
       }
     },
-    decStat: (delta = 1) => {
-      length -= delta;
-      remove += delta;
+    decStat: (queueName, delta = 1) => {
+      provideQueue(queueName);
+      isNat(delta) || Fail`invalid decStat for queue ${queueName}: ${delta}`;
+      nudge(counterData[QueueMetricAspect.Length], queueName, -delta);
+      nudge(counterData[QueueMetricAspect.DecrementCount], queueName, delta);
     },
-    getStats: () => ({
-      cosmic_swingset_inbound_queue_length: length,
-      cosmic_swingset_inbound_queue_add: add,
-      cosmic_swingset_inbound_queue_remove: remove,
-    }),
+    getStats: () => {
+      // For each [length,add,remove] metric name, emit both a
+      // per-queue-name count and a pre-aggregated sum over all queue names
+      // (the latter is necessary for backwards compatibility until all old
+      // consumers of e.g. slog entries have been updated).
+      const entries = [];
+      for (const [name, { aspect }] of Object.entries(QUEUE_METRICS)) {
+        let sum = 0;
+        for (const [queueName, value] of counterData[aspect].entries()) {
+          sum += value;
+          entries.push([`${name}_${queueName}`, value]);
+        }
+        entries.push([name, sum]);
+      }
+      return Object.fromEntries(entries);
+    },
   });
 }
 
 /**
+ * @template {string} QueueName
  * @param {object} config
  * @param {any} config.controller
  * @param {OTelMeter} config.metricMeter
  * @param {Console} config.log
  * @param {MetricAttributes} [config.attributes]
- * @param {ReturnType<typeof makeInboundQueueMetrics>} [config.inboundQueueMetrics]
+ * @param {Record<QueueName, number>} [config.initialQueueLengths] per-queue
  */
 export function exportKernelStats({
   controller,
   metricMeter,
   log = console,
   attributes = {},
-  inboundQueueMetrics,
+  initialQueueLengths = /** @type {any} */ ({}),
 }) {
   const kernelStatsMetrics = new Set();
   const kernelStatsCounters = new Map();
@@ -356,26 +488,12 @@ export function exportKernelStats({
     kernelStatsMetrics.add(key);
   }
 
-  if (inboundQueueMetrics) {
-    // These are not kernelStatsMetrics, they're outside the kernel.
-    for (const name of ['length', 'add', 'remove']) {
-      const key = `cosmic_swingset_inbound_queue_${name}`;
-      const options = {
-        description: `inbound queue ${name}`,
-      };
-      const counter =
-        name === 'length'
-          ? metricMeter.createObservableUpDownCounter(key, options)
-          : metricMeter.createObservableCounter(key, options);
-
-      counter.addCallback(observableResult => {
-        observableResult.observe(
-          inboundQueueMetrics.getStats()[key],
-          attributes,
-        );
-      });
-    }
-  }
+  // These are not kernelStatsMetrics, they're outside the kernel.
+  const inboundQueueMetrics = makeInboundQueueMetrics(
+    metricMeter,
+    initialQueueLengths,
+    log,
+  );
 
   // TODO: We probably shouldn't roll our own Node.js process metrics, but a
   // cursory search for "opentelemetry node.js VM instrumentation" didn't reveal
@@ -466,6 +584,7 @@ export function exportKernelStats({
 
   return {
     crankScheduler,
+    inboundQueueMetrics,
     schedulerCrankTimeHistogram,
     schedulerBlockTimeHistogram,
   };
diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js
index 73f090bf64f..1f8d5108c3b 100644
--- a/packages/cosmic-swingset/src/launch-chain.js
+++ b/packages/cosmic-swingset/src/launch-chain.js
@@ -38,7 +38,6 @@ import { fileURLToPath } from 'url';
 
 import {
   makeDefaultMeterProvider,
-  makeInboundQueueMetrics,
   exportKernelStats,
   makeSlogCallbacks,
 } from './kernel-stats.js';
@@ -97,6 +96,8 @@ const parseUpgradePlanInfo = (upgradePlan, prefix = '') => {
  * - cleanup: for dealing with data from terminated vats
  */
 
+/** @typedef {Extract<CrankerPhase, 'queued' | 'high-priority' | 'forced'>} InboundQueueName */
+
 /** @type {CrankerPhase} */
 const CLEANUP = 'cleanup';
 
@@ -466,15 +467,17 @@ export async function launch({
     ? parseInt(env.END_BLOCK_SPIN_MS, 10)
     : 0;
 
-  const inboundQueueMetrics = makeInboundQueueMetrics(
-    actionQueue.size() + highPriorityQueue.size(),
-  );
-  const { crankScheduler } = exportKernelStats({
+  const initialQueueLengths = /** @type {Record<InboundQueueName, number>} */ ({
+    queued: actionQueue.size(),
+    'high-priority': highPriorityQueue.size(),
+    forced: runThisBlock.size(),
+  });
+  const { crankScheduler, inboundQueueMetrics } = exportKernelStats({
     controller,
     metricMeter,
     // @ts-expect-error Type 'Logger' is not assignable to type 'Console'.
     log: console,
-    inboundQueueMetrics,
+    initialQueueLengths,
   });
 
   /**
@@ -734,13 +737,13 @@ export async function launch({
    *
    * @param {InboundQueue} inboundQueue
    * @param {Cranker} runSwingset
-   * @param {CrankerPhase} phase
+   * @param {InboundQueueName} phase
   */
   async function processActions(inboundQueue, runSwingset, phase) {
     let keepGoing = true;
     for await (const { action, context } of inboundQueue.consumeAll()) {
       const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`;
-      inboundQueueMetrics.decStat();
+      inboundQueueMetrics.decStat(phase);
       countInboundAction(action.type);
       await performAction(action, inboundNum);
       keepGoing = await runSwingset(phase);
@@ -814,9 +817,12 @@ export async function launch({
 
     // First, record new actions (bridge/mailbox/etc events that cosmos
     // added up for delivery to swingset) into our inboundQueue metrics
-    inboundQueueMetrics.updateLength(
-      actionQueue.size() + highPriorityQueue.size() + runThisBlock.size(),
-    );
+    const newLengths = /** @type {Record<InboundQueueName, number>} */ ({
+      queued: actionQueue.size(),
+      'high-priority': highPriorityQueue.size(),
+      forced: runThisBlock.size(),
+    });
+    inboundQueueMetrics.updateLengths(newLengths);
 
     // If we have work to complete this block, it needs to run to completion.
     // It will also run to completion any work that swingset still had pending.
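
Reviewer note, not part of the patch: a minimal sketch of how the reshaped metrics manager behaves. `makeInboundQueueMetrics` is module-private (callers receive the manager from `exportKernelStats`), and the sketch assumes an SES/endo environment where `harden`, `Fail`, and `isNat` are available; `stubMeter` is a hypothetical stand-in for a real `OTelMeter`, used only to illustrate the per-queue and backwards-compatible aggregate keys emitted by `getStats()`.

```js
// Hypothetical stub for an OpenTelemetry Meter; real callers pass the meter
// through exportKernelStats({ metricMeter, initialQueueLengths, ... }).
const stubMeter = {
  createObservableCounter: (_name, _opts) => ({ addCallback: () => {} }),
  createObservableUpDownCounter: (_name, _opts) => ({ addCallback: () => {} }),
};

const metrics = makeInboundQueueMetrics(
  stubMeter,
  { queued: 2, 'high-priority': 0, forced: 1 },
  console,
);

// Consuming one action from the "queued" queue decrements its length and
// bumps its decrement count.
metrics.decStat('queued');

metrics.getStats();
// => {
//   cosmic_swingset_inbound_queue_length_queued: 1,
//   'cosmic_swingset_inbound_queue_length_high-priority': 0,
//   cosmic_swingset_inbound_queue_length_forced: 1,
//   cosmic_swingset_inbound_queue_length: 2, // legacy pre-aggregated key
//   // ...the add/remove entries follow the same per-queue-plus-sum pattern...
//   cosmic_swingset_inbound_queue_remove_queued: 1,
//   cosmic_swingset_inbound_queue_remove: 1,
// }
```

Because the instruments are observable (async), values are pulled from the RAM-resident maps at collection time rather than pushed on every queue mutation, so per-block queue churn costs only Map updates.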