Filter for topk metrics #22608
I ended up implementing an efficient version of this in Lua. Here's the code in case anyone else finds it useful. I think the same algorithm would be worth implementing natively in Vector; I'd be happy to have a swing at that if there's interest!

```toml
[transforms.requestlog_metrics]
type = "log_to_metric"
inputs = ["haproxy_requestlogs"]

# Count the number of requests by client IP
[[transforms.requestlog_metrics.metrics]]
type = "counter"
field = "client_ip"
name = "haproxy:requests_from_top_clients"
tags.ip = "{{client_ip}}"

# Reduce Vector's overhead by aggregating (summing) all of the metric events once per aggregation interval,
# so that it only has to store the sum rather than a full event per request. This is also crucial to the
# requestlog_top_clients transform below, because it allows per-second rates to be inferred from individual events.
[transforms.requestlog_aggregate]
type = "aggregate"
inputs = ["requestlog_metrics"]
# Turn individual events into per-second counts.
# This interval is the amount of aggregation we do before looking for high aggregated counts as part of the
# top-k calculation below.
# Increasing this will be more taxing for Vector (since it needs more memory for more series in the aggregation
# window), but will provide a longer window for capturing high-volume traffic, so that the top-k
# algorithm can prioritise sustained traffic over short bursts.
# Decreasing this will be less taxing for Vector, but will bias the top-k algorithm more towards shorter
# spikes in traffic.
# 1 second is a nice number: it shows high-rate clients clearly, isn't too expensive, and makes the
# resulting counts easy to reason about (since they're requests/second).
interval_ms = 1000
```
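The aggregate stage determines what a single count means to the Lua script. The following is a simplified plain-Lua model of that stage. It assumes only incremental counters arrive. Counter events within one interval are summed per `ip` tag, so each downstream event carries a per-interval count. `aggregate_window` and `request_from` are hypothetical helpers for illustration. This is not Vector's implementation.

```lua
-- Hypothetical, simplified model of the aggregate stage (NOT Vector's code):
-- incremental counter events seen within one interval are summed per IP tag,
-- producing one event per series whose value is that interval's count.
local function aggregate_window(events)
  local sums = {}
  local order = {} -- remember first-seen order so the output is deterministic
  for _, event in ipairs(events) do
    local ip = event.metric.tags.ip
    if sums[ip] == nil then
      sums[ip] = 0
      order[#order + 1] = ip
    end
    sums[ip] = sums[ip] + event.metric.counter.value
  end
  local out = {}
  for _, ip in ipairs(order) do
    out[#out + 1] = {
      metric = {
        name = "haproxy:requests_from_top_clients",
        kind = "incremental",
        tags = { ip = ip },
        counter = { value = sums[ip] },
      },
    }
  end
  return out
end

-- Hypothetical helper: log_to_metric turns one request log line into a counter of 1.
local function request_from(ip)
  return { metric = { tags = { ip = ip }, counter = { value = 1 } } }
end

local window = aggregate_window({
  request_from("192.0.2.1"), request_from("192.0.2.1"),
  request_from("198.51.100.7"), request_from("192.0.2.1"),
})
for _, event in ipairs(window) do
  print(event.metric.tags.ip, event.metric.counter.value)
end
```

With a one-second interval, the value 3 for `192.0.2.1` reads directly as "3 requests/second". The top-k script relies on exactly this when it treats each count as a peak rate.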
```toml
# Keep only the top ~1000 IPs using some custom stateful Lua heap magic, since there's no built-in
# transform for what I want.
[transforms.requestlog_top_clients]
type = "lua"
version = "2"
inputs = ["requestlog_aggregate"]
source = '''
-- Top-k (k = 1000) double-buffer algorithm using a min-heap.
-- This is designed for efficient stream processing, so:
-- - each event should be processed in O(log(k)) time or better
-- - the whole thing needs only O(k) memory
-- By design, if lots of new top-rate clients are coming online quickly, we
-- can burst and allow more than k metrics to be emitted within a given flush window,
-- but this should stabilise quickly. Since we're using this for tracking top client
-- IPs for DDoS protection, responsiveness is a priority.
-- Relies on aggregation in the previous step (per-second seems ideal) so that we can
-- treat the count on each new event as a peak-rate value when determining the top
-- clients.
-- The k limit is (approximately) enforced within each flush interval. This interval
-- also determines how long we remember old high peak rates.
-- We maintain a current heap (for fast O(log(k)) checks before emitting each event)
-- and an old_map (taken from the old heap after each flush) to ensure that we don't
-- forget about the old top IPs after each flush.
-- The flushing mechanism ensures that we forget about peak traffic after two flushes
-- and don't keep requiring new IPs to send more than the all-time peak.
k = 1000

-- Optimisation: don't even consider any new events below this size,
-- i.e. below this per-batch (per-second, since aggregate interval_ms = 1000) rate.
-- This stops us from pummelling Prometheus with lots of different random metrics for
-- low-rate clients that happen to be the highest in a given window and then disappear.
-- This should be low enough that we usually have some clients exceeding it every few
-- minutes, even during low-traffic periods; otherwise we won't export any metrics to
-- Prometheus during those periods and our absent-metric alerts will fire.
min_count = 10

-- Current heap: an array of elements, each element is { ip = <ip>, count = <number> }.
-- Also maintain a hash mapping: ip -> index in the heap.
heap = {}
heap_index = {}
heap_size = 0

-- Old map: ip -> count from the previous flush; used to preserve memory between rotations.
old_map = {}

-- Track the lowest value from the old heap, starting with the minimum size limit.
minimum_count_worth_considering = min_count

-- Helper: swap elements in the heap and update indices.
function swap(i, j)
  heap[i], heap[j] = heap[j], heap[i]
  heap_index[heap[i].ip] = i
  heap_index[heap[j].ip] = j
end

-- Heapify-up from index i.
function heapify_up(i)
  while i > 1 do
    local parent = math.floor(i / 2)
    if heap[i].count < heap[parent].count then
      swap(i, parent)
      i = parent
    else
      break
    end
  end
end

-- Heapify-down from index i.
function heapify_down(i)
  while true do
    local left = 2 * i
    local right = 2 * i + 1
    local smallest = i
    if left <= heap_size and heap[left].count < heap[smallest].count then
      smallest = left
    end
    if right <= heap_size and heap[right].count < heap[smallest].count then
      smallest = right
    end
    if smallest ~= i then
      swap(i, smallest)
      i = smallest
    else
      break
    end
  end
end

-- Insert a new element into the heap and restore the heap property.
function heap_insert(ip, count)
  heap_size = heap_size + 1
  heap[heap_size] = {ip = ip, count = count}
  heap_index[ip] = heap_size
  heapify_up(heap_size)
end

-- Remove the smallest (root) element from the heap and restore the heap property.
function heap_remove_root()
  local root = heap[1]
  if not root then return nil end
  heap_index[root.ip] = nil
  heap[1] = heap[heap_size]
  heap[heap_size] = nil
  heap_size = heap_size - 1
  if heap_size > 0 then
    heap_index[heap[1].ip] = 1
    heapify_down(1)
  end
  return root
end

-- Update the count for an IP already in the heap. Counts only ever increase here
-- (max semantics), so in a min-heap the element can only need to move down.
function heap_update(ip, new_count)
  local i = heap_index[ip]
  if not i then return end
  heap[i].count = new_count
  heapify_down(i)
end

-- Process each incoming event.
function process_event(event, emit)
  local counter = event.metric and event.metric.counter
  if not counter or not counter.value then return end
  local count = counter.value

  -- Only consider doing anything with this event if it's at least as large as the
  -- smallest of the old IPs (since we don't want to include a bunch of tiny nonsense
  -- just because we see it first). If there really is a new-normal low level of traffic
  -- from now on, then we will start adding clients again in the next cycle.
  if count < minimum_count_worth_considering then return end

  local ip = event.metric.tags and event.metric.tags.ip
  if not ip then return end

  -- The logic below can decide to emit for multiple reasons, so track whether we
  -- intend to emit using a flag.
  local will_emit = false

  if heap_index[ip] then
    -- The IP is already in the current heap: update its count (using max semantics).
    if count > heap[heap_index[ip]].count then
      heap_update(ip, count)
      will_emit = true
    end
  else
    -- If the IP is in the old map (but not in the current heap), we still emit it.
    if old_map[ip] then
      will_emit = true
    end
    if heap_size < k then
      -- The IP is new to this window and there is room in the heap, so insert it.
      heap_insert(ip, count)
      will_emit = true
    elseif count > heap[1].count then
      -- The heap is full, but this count beats the smallest element: evict it so
      -- the heap stays at k entries (O(k) memory).
      heap_remove_root()
      heap_insert(ip, count)
      will_emit = true
    end
  end

  -- If we decided to emit, then do it.
  if will_emit then
    emit({
      metric = {
        name = "haproxy:requests_from_top_clients",
        kind = "incremental",
        tags = { ip = ip },
        counter = { value = count }
      }
    })
  end
end

-- Timer handler: flush the current heap and rotate buffers.
-- Note: this can't be pre-empted by process_event; all of the Lua is
-- single-threaded with locking around each function, so don't worry
-- about races.
function cycle_top_client_buffers()
  -- If there was anything in the heap for the interval that just ended...
  local old_root = heap[1]
  if old_root then
    -- ...use its smallest value as the new minimum value for consideration, so that
    -- we can adapt to increasing values and more efficiently discard low-value
    -- events in the next window.
    minimum_count_worth_considering = old_root.count
  end
  -- Otherwise, we didn't find any events during this flush interval that were as big
  -- as the smallest from the last interval; the last one must have been anomalously large.

  -- Always decrease our threshold for inclusion by 10% for the next window. This lets us
  -- track decreases in the top rate as well as increases. If we didn't do this, then the
  -- floor for inclusion would increase monotonically.
  -- This acts as an exponential adaptive threshold, allowing us to quickly find a new
  -- floor as traffic rates change over time.
  minimum_count_worth_considering = math.ceil(minimum_count_worth_considering * 0.9)
  -- Respect the hard-coded absolute min_count threshold.
  if minimum_count_worth_considering < min_count then
    minimum_count_worth_considering = min_count
  end

  -- Rotate: copy current heap data into old_map.
  old_map = {}
  for i = 1, heap_size do
    local entry = heap[i]
    old_map[entry.ip] = entry.count
  end

  -- Reset the current heap.
  heap = {}
  heap_index = {}
  heap_size = 0
end
'''
hooks.process = "process_event"

# This interval governs the window over which we enforce the (approximate) top-k limit, and how long we
# remember previous peaks as a benchmark for new clients. It should be several times larger than the
# aggregate interval. Metrics from new, very high peak-rate clients will be included even if we've already
# included 1000 clients in this interval, so longer is better and won't harm responsiveness.
[[transforms.requestlog_top_clients.timers]]
interval_seconds = 60
handler = "cycle_top_client_buffers"
```
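The adaptive threshold in `cycle_top_client_buffers` is easier to reason about with numbers. The following pulls that update out as a pure function and simulates it. `next_threshold` is a hypothetical name, and the scenario is assumed for illustration. One burst leaves 100 as the smallest count in the heap, and no later event clears the threshold, so the heap is empty at every following flush.

```lua
-- Pure-function copy of the threshold update from cycle_top_client_buffers
-- (next_threshold is a hypothetical name used only for this illustration).
-- root_count is the smallest count in the heap at flush time, or nil if the heap is empty.
local function next_threshold(current, root_count, min_count)
  -- Reset to the heap's smallest count if there was one, otherwise keep the old value...
  local threshold = root_count or current
  -- ...then decay by 10%, rounding up...
  threshold = math.ceil(threshold * 0.9)
  -- ...and never drop below the absolute floor.
  if threshold < min_count then
    threshold = min_count
  end
  return threshold
end

local min_count = 10
-- Assumed scenario: the first flush sees a heap whose smallest count is 100,
-- and every later flush sees an empty heap.
local threshold = next_threshold(min_count, 100, min_count)
local history = { threshold }
for _ = 1, 30 do
  threshold = next_threshold(threshold, nil, min_count)
  history[#history + 1] = threshold
end
print(table.concat(history, " "))
```

Because the decay is multiplicative, the threshold falls quickly from a large peak. It then takes progressively smaller steps near the floor, where the `math.ceil` rounding means it loses one request/second per flush until it settles at `min_count`.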
Hi! I'm trying to extract the top client IP addresses from our request logs and export them as a metric to Prometheus. Filtering for the top clients only is important to avoid overwhelming Prometheus with labels.
I can convert our logs to metrics and aggregate counts by IP easily enough, but I'm really struggling to find a way to filter for only the top K values in each aggregated batch (or even with a second layer of aggregation).
Am I missing a trick here, or is this a missing feature?
It seems like this would be efficient to implement:
This could be an extension of
, but it's probably better as a different thing since the tags can change every interval. It probably makes most sense as an extension to
so that duplicates can be properly summed/max-ed/mean-ed etc. rather than always implicitly doing `max`.
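For comparison, the simple per-batch filter described in the question keeps only the k largest aggregated counts in each batch. It can be sketched in plain Lua by sorting the batch and truncating it. `top_k_batch` is a hypothetical helper, not an existing Vector transform or option. It costs O(n log n) per batch and needs the whole batch in memory. The streaming heap in the reply avoids both costs, with O(log k) work per event and O(k) memory.

```lua
-- Hypothetical per-batch top-k filter: given one aggregated batch of
-- { ip = <string>, count = <number> } entries, return the k entries with the
-- largest counts. Ties are broken by IP so the result is deterministic.
local function top_k_batch(batch, k)
  -- Copy so the caller's batch is not reordered.
  local sorted = {}
  for i, entry in ipairs(batch) do
    sorted[i] = entry
  end
  table.sort(sorted, function(a, b)
    if a.count ~= b.count then
      return a.count > b.count -- larger counts first
    end
    return a.ip < b.ip
  end)
  local result = {}
  for i = 1, math.min(k, #sorted) do
    result[i] = sorted[i]
  end
  return result
end

local batch = {
  { ip = "192.0.2.1", count = 40 },
  { ip = "192.0.2.2", count = 3 },
  { ip = "198.51.100.7", count = 250 },
  { ip = "203.0.113.9", count = 17 },
}
local top = top_k_batch(batch, 2)
for _, entry in ipairs(top) do
  print(entry.ip, entry.count)
end
```

One design difference is worth noting. This version forgets everything between batches, so an IP can drop out of the exported set the moment another client briefly outranks it. The double-buffered `old_map` in the reply exists precisely to smooth that out.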