From 3d0b3bcfec2a1c08d33cec308124364f607158b8 Mon Sep 17 00:00:00 2001 From: Pavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com> Date: Thu, 24 Aug 2023 09:50:43 +0300 Subject: [PATCH] Rework current pods query to filter finished ones (#133) --- .../prometheus_metrics_service.py | 41 +++++++++---------- robusta_krr/utils/batched.py | 15 +++++++ 2 files changed, 35 insertions(+), 21 deletions(-) create mode 100644 robusta_krr/utils/batched.py diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 205fc783..4fd6e24f 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -11,6 +11,7 @@ from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.utils.service_discovery import MetricsServiceDiscovery +from robusta_krr.utils.batched import batched from ..metrics import PrometheusMetric from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config @@ -180,7 +181,7 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> pod_owner_kind = object.kind owners_regex = "|".join(pod_owners) - related_pods = await self.query( + related_pods_result = await self.query( f""" last_over_time( kube_pod_owner{{ @@ -193,29 +194,27 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> """ ) - if related_pods == []: + if related_pods_result == []: self.debug(f"No pods found for {object}") return - current_pods = await self.query( - f""" - present_over_time( - kube_pod_owner{{ - owner_name=~"{owners_regex}", - owner_kind="{pod_owner_kind}", + related_pods = [pod["metric"]["pod"] for pod in related_pods_result] + current_pods_set = set() + del 
import itertools
from typing import Iterable, Iterator, TypeVar

_T = TypeVar("_T")


def batched(iterable: Iterable[_T], n: int) -> Iterator[list[_T]]:
    """Split *iterable* into consecutive lists of at most *n* items.

    Items keep their original order. Every batch has exactly *n* items
    except possibly the last one, which holds whatever is left over. An
    empty input produces no batches at all.

    Note that batches are ``list`` objects, not tuples.

    Example::

        >>> list(batched("ABCDEFG", 3))
        [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]

    Args:
        iterable: Any iterable; it is consumed lazily, one batch at a time.
        n: Maximum number of items per batch. Must be at least 1.

    Returns:
        An iterator yielding each batch as a new list.

    Raises:
        ValueError: If *n* is smaller than 1. The check happens when
            ``batched`` is called, not when the result is first iterated.
    """
    # Validate eagerly: if this function were itself a generator, the check
    # would be deferred until the first next() call and a bad `n` could go
    # unnoticed when the result is never iterated.
    if n < 1:
        raise ValueError("n must be at least one")

    source = iter(iterable)

    def _batches() -> Iterator[list[_T]]:
        # islice pulls at most n items from the shared iterator; an empty
        # list means the input is exhausted, which ends the loop.
        while batch := list(itertools.islice(source, n)):
            yield batch

    return _batches()