Filter for topk metrics #22608
Unanswered
djrodgerspryor asked this question in Q&A
Replies: 1 comment
-
I ended up implementing an efficient version of this in Lua. Here's the code in case anyone else finds it useful. I think the same algorithm would be worth implementing natively in Vector; I'd be happy to have a swing at that if there's interest!

[transforms.requestlog_metrics]
type = "log_to_metric"
inputs = ["haproxy_requestlogs"]
# Count the number of requests by client IP
[[transforms.requestlog_metrics.metrics]]
type = "counter"
field = "client_ip"
name = "haproxy:requests_from_top_clients"
tags.ip = "{{client_ip}}"
# Reduce Vector's overhead by aggregating (summing) all of the metric events once every few seconds so
# that it can just store the sum rather than a full event per request. This is also crucial to the
# requestlog_top_clients transform below because it allows per-second rates to be inferred from individual events.
[transforms.requestlog_aggregate]
type = "aggregate"
inputs = ["requestlog_metrics"]
# Turn individual events into per-second counts.
# This interval is the amount of aggregation we do before looking for high aggregated counts as part of the
# top-k calculation below.
# Increasing this will be more taxing for Vector (since it needs more memory for more series in the aggregation
# window), but will provide a longer window for capturing high-volume traffic so that the top-k
# algorithm can prioritise more sustained traffic over short bursts.
# Decreasing this will be less taxing for Vector, but will bias the top-k algorithm more towards shorter
# spikes in traffic.
# 1 second is a nice number that will show high-rate clients clearly, isn't too expensive, and makes the
# resulting counts easy to reason about (since they're requests/second)
interval_ms = 1000
# Sort and keep only the top ~1000 IPs using some custom stateful Lua heap magic, since there's no built-in
# transform for what I want.
[transforms.requestlog_top_clients]
type = "lua"
version = "2"
inputs = ["requestlog_aggregate"]
source = '''
-- Top-k (k = 1000) double-buffer algorithm using a min-heap.
-- This is designed for efficient stream processing, so:
-- - each event should be processed in <= O(log(k)) time
-- - the whole thing needs only O(k) memory
--
-- By design, if lots of new top-rate clients are coming online quickly, we
-- can burst and allow more than k metrics to be emitted within a given flush window,
-- but this should stabilise quickly. Since we're using this for tracking top client
-- IPs for DDoS protection, responsiveness is a priority.
--
-- Relies on aggregation in the previous step (per-second seems ideal) so that we can
-- treat the count on each new event that comes in as a peak-rate value for use in
-- determining the top clients based on that peak rate.
-- The k limit is (approximately) enforced within each flush interval. This interval
-- also determines how long we remember old high peak rates.
--
-- We maintain a current heap (for fast O(log(k)) checks before emitting each event)
-- and an old_map (taken from the old heap after each flush) to ensure that we don't
-- forget about the old top IPs after each flush.
-- The flushing mechanism ensures that we forget about peak traffic after two flushes
-- and don't keep requiring new IPs to send more than the all-time peak.
k = 1000
-- Optimisation: don't even consider any new events below this size,
-- i.e. below this per-batch (per-second, since the aggregate interval_ms = 1000) rate.
-- This stops us from pummeling Prometheus with lots of different random metrics for
-- low-rate clients that happen to be the highest in a given window and then disappear.
-- This should be low enough that we usually have some clients exceeding it every few
-- minutes, even during low traffic periods; otherwise we won't export any metrics to
-- Prometheus during those periods and our absent-metric alerts will fire.
min_count = 10
-- Current heap: an array of elements, each element is { ip = <ip>, count = <number> }
-- Also maintain a hash mapping: ip -> index in the heap.
heap = {}
heap_index = {}
heap_size = 0
-- Old map: ip -> count from previous flush; used to preserve memory between rotations
old_map = {}
-- Track the lowest value from the old heap, starting with the minimum size limit
minimum_count_worth_considering = min_count
-- Helper: swap elements in the heap and update indices.
function swap(i, j)
  heap[i], heap[j] = heap[j], heap[i]
  heap_index[heap[i].ip] = i
  heap_index[heap[j].ip] = j
end

-- Heapify-up from index i.
function heapify_up(i)
  while i > 1 do
    local parent = math.floor(i / 2)
    if heap[i].count < heap[parent].count then
      swap(i, parent)
      i = parent
    else
      break
    end
  end
end

-- Heapify-down from index i.
function heapify_down(i)
  while true do
    local left = 2 * i
    local right = 2 * i + 1
    local smallest = i
    if left <= heap_size and heap[left].count < heap[smallest].count then
      smallest = left
    end
    if right <= heap_size and heap[right].count < heap[smallest].count then
      smallest = right
    end
    if smallest ~= i then
      swap(i, smallest)
      i = smallest
    else
      break
    end
  end
end

-- Insert a new element into the heap.
function heap_insert(ip, count)
  heap_size = heap_size + 1
  heap[heap_size] = { ip = ip, count = count }
  heap_index[ip] = heap_size
  heapify_up(heap_size)
end

-- Remove the smallest (root) element from the heap.
function heap_remove_root()
  local root = heap[1]
  heap[1] = heap[heap_size]
  heap_index[heap[1].ip] = 1
  heap[heap_size] = nil
  heap_index[root.ip] = nil
  heap_size = heap_size - 1
  heapify_down(1)
  return root
end

-- Update the count for an IP already in the heap.
function heap_update(ip, new_count)
  local i = heap_index[ip]
  if not i then return end
  heap[i].count = new_count
  heapify_down(i)
  heapify_up(i)
end
-- Process each incoming event
function process_event(event, emit)
  -- Defensive: skip anything that isn't a counter metric rather than erroring on a nil index.
  local counter = event.metric and event.metric.counter
  if not counter or not counter.value then return end
  local count = counter.value
  -- Only consider doing anything with this event if it's at least as large as the
  -- smallest of the old IPs (since we don't want to include a bunch of tiny nonsense
  -- just because we see it first). If there really is a new-normal low level of traffic
  -- from now on, then we will start adding clients again in the next cycle.
  if count < minimum_count_worth_considering then return end
  local ip = event.metric.tags and event.metric.tags.ip
  if not ip then return end
  -- The logic below can decide to emit for multiple reasons, so track whether we
  -- intend to emit using a flag.
  local will_emit = false
  if heap_index[ip] then
    -- The IP is already in the current heap: update its count (using max semantics).
    -- It must not be inserted again, or the heap and its index would be corrupted.
    if count > heap[heap_index[ip]].count then
      heap_update(ip, count)
    end
    will_emit = true
  elseif heap_size < k then
    -- The IP is new and there is room in the heap, so insert it.
    heap_insert(ip, count)
    will_emit = true
  elseif count > heap[1].count then
    -- The IP is new and the heap is full: evict the smallest element, since this one beats it.
    heap_remove_root()
    heap_insert(ip, count)
    will_emit = true
  end
  -- If the IP is in the old map, we still emit it (even if it didn't make the current
  -- heap) so that its series doesn't flap between flush windows.
  if old_map[ip] then
    will_emit = true
  end
  -- If we decided to emit, then do it.
  if will_emit then
    emit({
      metric = {
        name = "haproxy:requests_from_top_clients",
        kind = "incremental",
        tags = { ip = ip },
        counter = { value = count }
      }
    })
  end
end
-- Timer handler: flush the current heap and rotate buffers.
-- Note: this can't be pre-empted by process_event - all of the Lua is
-- single-threaded with locking around each function, so don't worry
-- about races.
function cycle_top_client_buffers()
  -- If there was anything in the current heap (which is about to become the old one)
  local old_root = heap[1]
  if old_root then
    -- Use the smallest value from the outgoing heap as the new minimum value for
    -- consideration, so that we can adapt to increasing values and more efficiently
    -- discard low-value events in the next window.
    minimum_count_worth_considering = old_root.count
  end
  -- (If the heap was empty, we didn't find any events during this flush interval that
  -- were as big as the smallest from the last interval; the last one must have been
  -- anomalously large, and the decay below will bring the threshold back down.)
  -- Always decrease our threshold for inclusion by 10% for the next window. This lets
  -- us track decreases in the top rate as well as increases. If we didn't do this, then
  -- the floor for inclusion would increase monotonically.
  -- This acts as an exponential adaptive threshold, allowing us to quickly find a new
  -- floor as traffic rates change over time. For example, a 1000 req/s peak threshold
  -- decays to roughly 350 after ten 60-second windows (1000 * 0.9^10 ≈ 349).
  minimum_count_worth_considering = math.ceil(minimum_count_worth_considering * 0.9)
  -- Respect the hard-coded absolute min_count threshold.
  if minimum_count_worth_considering < min_count then
    minimum_count_worth_considering = min_count
  end
  -- Rotate: copy current heap data into old_map.
  old_map = {}
  for i = 1, heap_size do
    local entry = heap[i]
    old_map[entry.ip] = entry.count
  end
  -- Reset the current heap.
  heap = {}
  heap_index = {}
  heap_size = 0
end
'''
hooks.process = "process_event"
[[transforms.requestlog_top_clients.timers]]
# This interval governs the window over which we enforce the (approximate) top-k limit, and how long we
# remember previous peaks as a benchmark for new clients. It should be several times larger than the
# aggregate interval.
# Metrics from new, very-high-peak-rate clients will be included even if we've already included 1000 clients
# in this interval, so longer is better and won't harm responsiveness.
interval_seconds = 60
handler = "cycle_top_client_buffers" Here's just the Lua for better syntax highlighting: -- Top-k (k = 1000) double-buffer algorithm using a min-heap.
-- This is designed for efficient stream processing so:
-- - event should be processed in <= O(log(k)) time
-- - the whole thing needs only O(k) memory
--
-- By design, if lots of new top-rate clients are coming online quickly, we
-- can burst and allow more than k metrics to be omitted within a given flush window,
-- but this should stablise quickly. Since we're using this for tracking top client
-- IPs for DDoS protection, responsiveness is a priority.
--
-- Relies on aggregation in the previous step (per-second seems ideal) so that we can
-- treat each the count on new event that comes in as a peak-rate value for use in
-- determining the top clients based on that peak rate.
-- The k limit is (approximately) enforced within each flush interval. This interval
-- also determines how long we remember about old high peak rates.
--
-- We maintain a current heap (for fast O(log(k)) checks before emitting each event)
-- and an old_map (taken the old heap after each flush) to ensure that we don't forget
-- about the old top IPs after each flush.
-- The flushing mechanism ensures that we forget about peak traffic after two flushes
-- and don't keep requiring new IPs to send more than the all-time peak.
k = 1000
-- Optimisation: don't even consider any new events below this size.
-- ie. below this per-batch (per second since aggregate interval_ms=1000) rate.
-- This stops us from pummeling prometheus with lots of different random metrics for
-- low rate clients that happen to be the highest in a given window and then disappear.
-- This should be low enough that we usually have some clients exceeding it every few
-- minutes — even during low traffic periods — otherwise we won't export any metrics to
-- prometheus during those periods and our absent-metric alerts will fire.
min_count = 10
-- Current heap: an array of elements, each element is { ip = <ip>, count = <number> }
-- Also maintain a hash mapping: ip -> index in the heap.
heap = {}
heap_index = {}
heap_size = 0
-- Old map: ip -> count from previous flush; used to preserve memory between rotations
old_map = {}
-- Track the lowest value from the old heap, starting with the minimum size limit
minimum_count_worth_considering = min_count
-- Helper: swap elements in the heap and update indices.
function swap(i, j)
heap[i], heap[j] = heap[j], heap[i]
heap_index[heap[i].ip] = i
heap_index[heap[j].ip] = j
end
-- Heapify-up from index i.
function heapify_up(i)
while i > 1 do
local parent = math.floor(i / 2)
if heap[i].count < heap[parent].count then
swap(i, parent)
i = parent
else
break
end
end
end
-- Heapify-down from index i.
function heapify_down(i)
while true do
local left = 2 * i
local right = 2 * i + 1
local smallest = i
if left <= heap_size and heap[left].count < heap[smallest].count then
smallest = left
end
if right <= heap_size and heap[right].count < heap[smallest].count then
smallest = right
end
if smallest ~= i then
swap(i, smallest)
i = smallest
else
break
end
end
end
-- Insert a new element into the heap.
function heap_insert(ip, count)
heap_size = heap_size + 1
heap[heap_size] = {ip = ip, count = count}
heap_index[ip] = heap_size
heapify_up(heap_size)
end
-- Remove the smallest (root) element from the heap.
function heap_remove_root()
local root = heap[1]
heap[1] = heap[heap_size]
heap_index[heap[1].ip] = 1
heap[heap_size] = nil
heap_index[root.ip] = nil
heap_size = heap_size - 1
heapify_down(1)
return root
end
-- Update the count for an IP already in the heap.
function heap_update(ip, new_count)
local i = heap_index[ip]
if not i then return end
heap[i].count = new_count
heapify_down(i)
heapify_up(i)
end
-- Process each incoming event
function process_event(event, emit)
local count = event.metric.counter.value
if not count then return end
-- Only consider doing anything with this event if it's at least as large as the
-- smallest of the old IPs (since we don't want to include a bunch of tiny nonsense
-- just because we see it first). If there really is a new-normal low level of traffic from
-- now on, then we will start adding clients again in the next cycle.
if count < minimum_count_worth_considering then return end
local ip = event.metric.tags.ip
if not ip then return end
-- The logic below can decide to emit for multiple reasons, so track whether we
-- intend to emit using a flag
local will_emit = false
-- If IP is already in the current heap, update its count (using max semantics).
if heap_index[ip] then
if count > heap[heap_index[ip]].count then
heap_update(ip, count)
end
will_emit = true
end
-- If the IP is in the old map (but not in current), we still emit it
if old_map[ip] then
will_emit = true
end
-- The IP is new. If there is room in the heap, then insert it
if heap_size < k then
heap_insert(ip, count)
will_emit = true
else
-- Heap is full; compare against the smallest element.
if count > heap[1].count then
heap_remove_root()
heap_insert(ip, count)
will_emit = true
end
end
-- If we decided to emit, then do it
if will_emit then
emit({
metric = {
name = "haproxy:requests_from_top_clients",
kind = "incremental",
tags = { ip = ip },
counter = { value = count }
}
})
end
-- We're done
return
end
-- Timer handler: flush the current heap and rotate buffers.
-- Note: this can't be pre-empted by process_event - all of the Lua is
-- single-threaded with locking around each function, so don't worry
-- about races.
function cycle_top_client_buffers()
-- If there was anything in the old heap
old_root = heap[1]
if old_root then
-- Use the smallest value from the old heap as the new minimum-value for consideration
-- so that we can adapt to increasing values and more efficiently discard low value
-- events in the next window.
minimum_count_worth_considering = old_root.count
else
-- We didn't find any events during this flush interval that were as big as the smallest
-- from the last interval - the last one must have been anomalously large.
end
-- Always decrease our threshold for inclusion by 10% for the next window. This lets us track
-- decreases in the top rate as well as increases. If we didn't do this, then the floor for
-- inclusion would increase monotonically.
-- This acts as an exponential adaptive threshold allowing us to quickly find a new floor as
-- traffic rates change over time.
minimum_count_worth_considering = math.ceil(minimum_count_worth_considering * 0.9)
-- Respect the hard-coded abolute min_count threshold
if minimum_count_worth_considering < min_count then
minimum_count_worth_considering = min_count
end
-- Rotate: copy current heap data into old_map
old_map = {}
for i = 1, heap_size do
local entry = heap[i]
old_map[entry.ip] = entry.count
end
-- Reset the current heap
heap = {}
heap_index = {}
heap_size = 0
end |
-
Hi! I'm trying to extract the top client IP addresses from our request logs and export them as a metric to Prometheus. Filtering for only the top clients is important to avoid overwhelming Prometheus with labels.
I can convert our logs to metrics and aggregate counts by IP easily enough, but I'm really struggling to find a way to filter for only the top-k values in each aggregated batch (or even with a second layer of aggregation).
Am I missing a trick here, or is this a missing feature?
It seems like this would be efficient to implement:
- This could be an extension of tag_cardinality_limit, but it's probably better as a separate thing since the tags can change every interval.
- It probably makes most sense as an extension to aggregate, so that duplicates can be properly summed/max-ed/mean-ed etc. rather than always implicitly doing max (see the sketch below).