feat(cosmic-swingset): Split inbound queue length metrics by queue name
Fixes #10900
gibson042 committed Jan 29, 2025
1 parent 21a47b9 commit e8b99bc
Showing 2 changed files with 180 additions and 55 deletions.
207 changes: 163 additions & 44 deletions packages/cosmic-swingset/src/kernel-stats.js
@@ -5,6 +5,9 @@ import {
View,
} from '@opentelemetry/sdk-metrics';

import { Fail } from '@endo/errors';
import { isNat } from '@endo/nat';

import { makeLegacyMap } from '@agoric/store';

import {
@@ -19,6 +22,8 @@ import process from 'node:process';

/** @import {Histogram, Meter as OTelMeter, MetricAttributes} from '@opentelemetry/api' */

/** @import {TotalMap} from '@agoric/internal' */

// import { diag, DiagConsoleLogger, DiagLogLevel } from '@opentelemetry/api';

// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.VERBOSE);
@@ -72,6 +77,44 @@ const HISTOGRAM_METRICS = /** @type {const} */ ({
},
});

/** @enum {(typeof QueueMetricAspect)[keyof typeof QueueMetricAspect]} */
const QueueMetricAspect = /** @type {const} */ ({
Length: 'length',
IncrementCount: 'increments',
DecrementCount: 'decrements',
});
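For readers unfamiliar with this JSDoc idiom: the `@enum` annotation makes the constant's value type double as a string-literal union. A minimal sketch of what the type checker derives (the `aspect` binding below is hypothetical, for illustration only):

```js
// Under TypeScript's JSDoc checking, the annotation above is equivalent to:
//   type QueueMetricAspect = 'length' | 'increments' | 'decrements';
/** @type {QueueMetricAspect} */
const aspect = QueueMetricAspect.Length; // OK: 'length'
// @ts-expect-error 'size' is not a QueueMetricAspect
const bad = 'size';
```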

/**
* Queue metrics come in {length,add,remove} triples sharing a common prefix.
*
* @param {string} namePrefix
* @param {string} descPrefix
* @returns {Record<string, {aspect: QueueMetricAspect, description: string}>}
*/
const makeQueueMetrics = (namePrefix, descPrefix) => {
/** @type {Array<[QueueMetricAspect, string, string]>} */
const metricsMeta = [
[QueueMetricAspect.Length, 'length', 'length'],
[QueueMetricAspect.IncrementCount, 'add', 'increments'],
[QueueMetricAspect.DecrementCount, 'remove', 'decrements'],
];
const entries = metricsMeta.map(([aspect, nameSuffix, descSuffix]) => {
const name = `${namePrefix}_${nameSuffix}`;
const description = `${descPrefix} ${descSuffix}`;
return [name, { aspect, description }];
});
return Object.fromEntries(entries);
};

const QUEUE_METRICS = harden({
// "cosmic_swingset_inbound_queue_{length,add,remove}" measurements carry a
// "queue" attribute.
// Future OpenTelemetry SDKs should support expressing that in Instrument
// creation:
// https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-advisory-parameter-attributes
...makeQueueMetrics('cosmic_swingset_inbound_queue', 'inbound queue'),
});
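For concreteness, the record produced by `makeQueueMetrics('cosmic_swingset_inbound_queue', 'inbound queue')` works out to the following (a hand expansion for illustration, not part of the diff; `harden` comes from the SES environment, as in the file above):

```js
const QUEUE_METRICS_EXPANDED = harden({
  cosmic_swingset_inbound_queue_length: {
    aspect: 'length',
    description: 'inbound queue length',
  },
  cosmic_swingset_inbound_queue_add: {
    aspect: 'increments',
    description: 'inbound queue increments',
  },
  cosmic_swingset_inbound_queue_remove: {
    aspect: 'decrements',
    description: 'inbound queue decrements',
  },
});
```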

const wrapDeltaMS = (finisher, useDeltaMS) => {
const startMS = Date.now();
return (...finishArgs) => {
@@ -239,59 +282,148 @@ export function makeSlogCallbacks({ metricMeter, attributes = {} }) {
return harden(slogCallbacks);
}

/**
* @template {string} QueueName
* @typedef InboundQueueMetricsManager
* @property {(newLengths: Record<QueueName, number>) => void} updateLengths
* @property {(queueName: QueueName, delta?: number) => void} decStat
* @property {() => Record<string, number>} getStats
*/

/**
* Create a metrics manager for inbound queues. It must be initialized with the
- * length from durable storage and informed of each subsequent change so that
- * metrics can be provided from RAM.
+ * length of each queue and informed of each subsequent change so that metrics
+ * can be provided from RAM.
*
* Note that the add/remove counts will get reset at restart, but
* Prometheus/etc tools can tolerate that just fine.
*
- * @param {number} initialLength
+ * @template {string} QueueName
+ * @param {OTelMeter} metricMeter
+ * @param {Record<QueueName, number>} initialLengths per-queue
+ * @param {Console} logger
+ * @returns {InboundQueueMetricsManager<QueueName>}
*/
- export function makeInboundQueueMetrics(initialLength) {
- let length = initialLength;
- let add = 0;
- let remove = 0;
+ function makeInboundQueueMetrics(metricMeter, initialLengths, logger) {
const initialEntries = Object.entries(initialLengths);
const zeroEntries = initialEntries.map(([queueName]) => [queueName, 0]);
const makeQueueCounts = entries => {
for (const [queueName, length] of entries) {
isNat(length) ||
Fail`invalid initial length for queue ${queueName}: ${length}`;
}
return /** @type {TotalMap<string, number>} */ (new Map(entries));
};
/**
* For each {length,increment count,decrement count} aspect (each such aspect
* corresponding to a single OpenTelemetry Instrument), keep a map of values
* keyed by queue name (each corresponding to a value of Attribute "queue").
*
* @type {Record<QueueMetricAspect, TotalMap<string, number>>}
*/
const counterData = {
[QueueMetricAspect.Length]: makeQueueCounts(initialEntries),
[QueueMetricAspect.IncrementCount]: makeQueueCounts(zeroEntries),
[QueueMetricAspect.DecrementCount]: makeQueueCounts(zeroEntries),
};

// In the event of misconfigured reporting for an unknown queue, accept the
// data with a warning rather than either ignore it or halt the chain.
const provideQueue = queueName => {
if (counterData[QueueMetricAspect.Length].has(queueName)) return;
logger.warn(`unknown inbound queue ${JSON.stringify(queueName)}`);
for (const [aspect, map] of Object.entries(counterData)) {
const old = map.get(queueName);
old === undefined ||
Fail`internal: unexpected preexisting ${aspect}=${old} data for late queue ${queueName}`;
map.set(queueName, 0);
}
};

const nudge = (map, queueName, delta) => {
const old = map.get(queueName);
old !== undefined ||
Fail`internal: unexpected missing data for queue ${queueName}`;
map.set(queueName, old + delta);
};

// Wire up callbacks for reporting the OpenTelemetry measurements:
// queue length is an UpDownCounter, while increment and decrement counts are
// [monotonic] Counters.
// But note that the Prometheus representation of the former will be a Gauge:
// https://prometheus.io/docs/concepts/metric_types/
for (const [name, { aspect, description }] of Object.entries(QUEUE_METRICS)) {
const isMonotonic = aspect !== QueueMetricAspect.Length;
const instrumentOptions = { description };
const asyncInstrument = isMonotonic
? metricMeter.createObservableCounter(name, instrumentOptions)
: metricMeter.createObservableUpDownCounter(name, instrumentOptions);
asyncInstrument.addCallback(observer => {
for (const [queueName, value] of counterData[aspect].entries()) {
observer.observe(value, { queue: queueName });
}
});
}
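The asynchronous-instrument pattern used in this loop is plain `@opentelemetry/api`. A self-contained sketch with an invented meter name and hypothetical queue lengths (none of these values come from the commit):

```js
import { metrics } from '@opentelemetry/api';

const meter = metrics.getMeter('inbound-queue-example');
// Hypothetical per-queue lengths; in the real code this is counterData.
const lengths = new Map([
  ['queued', 3],
  ['high-priority', 1],
  ['forced', 0],
]);
const queueLength = meter.createObservableUpDownCounter(
  'example_inbound_queue_length',
  { description: 'example inbound queue length' },
);
// The callback runs once per export interval and emits one data point per
// queue, split by the "queue" attribute; a Prometheus exporter renders the
// UpDownCounter as a Gauge, roughly:
//   example_inbound_queue_length{queue="queued"} 3
//   example_inbound_queue_length{queue="high-priority"} 1
queueLength.addCallback(observer => {
  for (const [queue, value] of lengths.entries()) {
    observer.observe(value, { queue });
  }
});
```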

return harden({
- updateLength: newLength => {
- const delta = newLength - length;
- length = newLength;
- if (delta > 0) {
- add += delta;
- } else {
- remove -= delta;
+ updateLengths: newLengths => {
+ for (const [queueName, newLength] of Object.entries(newLengths)) {
+ provideQueue(queueName);
+ isNat(newLength) ||
+ Fail`invalid length for queue ${queueName}: ${newLength}`;
+ const oldLength = counterData[QueueMetricAspect.Length].get(queueName);
+ counterData[QueueMetricAspect.Length].set(queueName, newLength);
+ if (newLength > oldLength) {
+ const map = counterData[QueueMetricAspect.IncrementCount];
+ nudge(map, queueName, newLength - oldLength);
+ } else if (newLength < oldLength) {
+ const map = counterData[QueueMetricAspect.DecrementCount];
+ nudge(map, queueName, oldLength - newLength);
+ }
}
},

- decStat: (delta = 1) => {
- length -= delta;
- remove += delta;
+ decStat: (queueName, delta = 1) => {
+ provideQueue(queueName);
+ isNat(delta) || Fail`invalid decStat for queue ${queueName}: ${delta}`;
+ nudge(counterData[QueueMetricAspect.Length], queueName, -delta);
+ nudge(counterData[QueueMetricAspect.DecrementCount], queueName, delta);
},

- getStats: () => ({
- cosmic_swingset_inbound_queue_length: length,
- cosmic_swingset_inbound_queue_add: add,
- cosmic_swingset_inbound_queue_remove: remove,
- }),
+ getStats: () => {
+ // For each [length,add,remove] metric name, emit both a
+ // per-queue-name count and a pre-aggregated sum over all queue names
+ // (the latter is necessary for backwards compatibility until all old
+ // consumers of e.g. slog entries have been updated).
+ const entries = [];
+ for (const [name, { aspect }] of Object.entries(QUEUE_METRICS)) {
+ let sum = 0;
+ for (const [queueName, value] of counterData[aspect].entries()) {
+ sum += value;
+ entries.push([`${name}_${queueName}`, value]);
+ }
+ entries.push([name, sum]);
+ }
+ return Object.fromEntries(entries);
+ },
});
}
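A hypothetical walk-through of the resulting manager, with invented numbers, showing how length updates are translated into monotonic add/remove counts and how `getStats` flattens the per-queue data (queue names match `launch-chain.js` below):

```js
// Suppose initial lengths were { queued: 2, 'high-priority': 0, forced: 0 }.
// inboundQueueMetrics.updateLengths({ queued: 5, 'high-priority': 1, forced: 0 });
//   queued length 2 -> 5, so its increment count rises by 3;
//   high-priority length 0 -> 1, so its increment count rises by 1.
// inboundQueueMetrics.decStat('queued');
//   queued length 5 -> 4 and its decrement count rises by 1.
// inboundQueueMetrics.getStats() then returns per-queue entries plus
// backwards-compatible sums over all queues:
// {
//   cosmic_swingset_inbound_queue_length_queued: 4,
//   'cosmic_swingset_inbound_queue_length_high-priority': 1,
//   cosmic_swingset_inbound_queue_length_forced: 0,
//   cosmic_swingset_inbound_queue_length: 5,
//   cosmic_swingset_inbound_queue_add_queued: 3,
//   'cosmic_swingset_inbound_queue_add_high-priority': 1,
//   cosmic_swingset_inbound_queue_add_forced: 0,
//   cosmic_swingset_inbound_queue_add: 4,
//   cosmic_swingset_inbound_queue_remove_queued: 1,
//   'cosmic_swingset_inbound_queue_remove_high-priority': 0,
//   cosmic_swingset_inbound_queue_remove_forced: 0,
//   cosmic_swingset_inbound_queue_remove: 1,
// }
```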

/**
* @template {string} QueueName
* @param {object} config
* @param {any} config.controller
* @param {OTelMeter} config.metricMeter
* @param {Console} config.log
* @param {MetricAttributes} [config.attributes]
- * @param {ReturnType<typeof makeInboundQueueMetrics>} [config.inboundQueueMetrics]
+ * @param {Record<QueueName, number>} [config.initialQueueLengths] per-queue
*/
export function exportKernelStats({
controller,
metricMeter,
log = console,
attributes = {},
- inboundQueueMetrics,
+ initialQueueLengths = /** @type {any} */ ({}),
}) {
const kernelStatsMetrics = new Set();
const kernelStatsCounters = new Map();
@@ -356,26 +488,12 @@
kernelStatsMetrics.add(key);
}

- if (inboundQueueMetrics) {
- // These are not kernelStatsMetrics, they're outside the kernel.
- for (const name of ['length', 'add', 'remove']) {
- const key = `cosmic_swingset_inbound_queue_${name}`;
- const options = {
- description: `inbound queue ${name}`,
- };
- const counter =
- name === 'length'
- ? metricMeter.createObservableUpDownCounter(key, options)
- : metricMeter.createObservableCounter(key, options);
-
- counter.addCallback(observableResult => {
- observableResult.observe(
- inboundQueueMetrics.getStats()[key],
- attributes,
- );
- });
- }
- }
+ // These are not kernelStatsMetrics, they're outside the kernel.
+ const inboundQueueMetrics = makeInboundQueueMetrics(
+ metricMeter,
+ initialQueueLengths,
+ log,
+ );

// TODO: We probably shouldn't roll our own Node.js process metrics, but a
// cursory search for "opentelemetry node.js VM instrumentation" didn't reveal
@@ -466,6 +584,7 @@

return {
crankScheduler,
inboundQueueMetrics,
schedulerCrankTimeHistogram,
schedulerBlockTimeHistogram,
};
28 changes: 17 additions & 11 deletions packages/cosmic-swingset/src/launch-chain.js
@@ -38,7 +38,6 @@ import { fileURLToPath } from 'url';

import {
makeDefaultMeterProvider,
- makeInboundQueueMetrics,
exportKernelStats,
makeSlogCallbacks,
} from './kernel-stats.js';
@@ -97,6 +96,8 @@ const parseUpgradePlanInfo = (upgradePlan, prefix = '') => {
* - cleanup: for dealing with data from terminated vats
*/

/** @typedef {Extract<CrankerPhase, 'queued' | 'high-priority' | 'forced'>} InboundQueueName */
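Given the `CrankerPhase` values documented above, this `Extract` narrows the phase union to the three phases that are backed by a durable inbound queue:

```js
// Equivalent, assuming those three strings are all members of CrankerPhase:
//   type InboundQueueName = 'queued' | 'high-priority' | 'forced';
// Phases without an inbound queue (e.g. 'cleanup') are excluded.
```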

/** @type {CrankerPhase} */
const CLEANUP = 'cleanup';

@@ -466,15 +467,17 @@ export async function launch({
? parseInt(env.END_BLOCK_SPIN_MS, 10)
: 0;

- const inboundQueueMetrics = makeInboundQueueMetrics(
- actionQueue.size() + highPriorityQueue.size(),
- );
- const { crankScheduler } = exportKernelStats({
+ const initialQueueLengths = /** @type {Record<InboundQueueName, number>} */ ({
+ queued: actionQueue.size(),
+ 'high-priority': highPriorityQueue.size(),
+ forced: runThisBlock.size(),
+ });
+ const { crankScheduler, inboundQueueMetrics } = exportKernelStats({
controller,
metricMeter,
// @ts-expect-error Type 'Logger<BaseLevels>' is not assignable to type 'Console'.
log: console,
- inboundQueueMetrics,
+ initialQueueLengths,
});

/**
@@ -734,13 +737,13 @@
*
* @param {InboundQueue} inboundQueue
* @param {Cranker} runSwingset
- * @param {CrankerPhase} phase
+ * @param {InboundQueueName} phase
*/
async function processActions(inboundQueue, runSwingset, phase) {
let keepGoing = true;
for await (const { action, context } of inboundQueue.consumeAll()) {
const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`;
- inboundQueueMetrics.decStat();
+ inboundQueueMetrics.decStat(phase);
countInboundAction(action.type);
await performAction(action, inboundNum);
keepGoing = await runSwingset(phase);
@@ -814,9 +817,12 @@

// First, record new actions (bridge/mailbox/etc events that cosmos
// added up for delivery to swingset) into our inboundQueue metrics
- inboundQueueMetrics.updateLength(
- actionQueue.size() + highPriorityQueue.size() + runThisBlock.size(),
- );
+ const newLengths = /** @type {Record<InboundQueueName, number>} */ ({
+ queued: actionQueue.size(),
+ 'high-priority': highPriorityQueue.size(),
+ forced: runThisBlock.size(),
+ });
+ inboundQueueMetrics.updateLengths(newLengths);

// If we have work to complete this block, it needs to run to completion.
// It will also run to completion any work that swingset still had pending.
