Skip to content

Commit

Permalink
Merge branch 'main' into metric-query-batching
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard authored Aug 10, 2023
2 parents 8f0cfab + 842d97e commit 123e8e1
Show file tree
Hide file tree
Showing 19 changed files with 1,345 additions and 1,023 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,38 @@ Than run the following command with PROMETHEUS_URL substituted for your Azure Ma
```sh
python krr.py simple --namespace default -p PROMETHEUS_URL --prometheus-auth-header "Bearer $AZURE_BEARER"
```
<p ><a href="#scanning-with-a-centralized-prometheus">See here about configuring labels for centralized prometheus</a></p>

<p align="right">(<a href="#readme-top">back to top</a>)</p>

## EKS managed Prometheus

For EKS managed Prometheus, specify your Prometheus link and add the flag --eks-managed-prom; KRR will then automatically use your AWS credentials.

```sh
python krr.py simple -p "https://aps-workspaces.REGION.amazonaws.com/workspaces/..." --eks-managed-prom
```
Additional optional parameters are:
```sh
--eks-profile-name PROFILE_NAME_HERE # to specify the profile to use from your config
--eks-access-key ACCESS_KEY # to specify your access key
--eks-secret-key SECRET_KEY # to specify your secret key
--eks-service-name SERVICE_NAME # to use a specific service name in the signature
--eks-managed-prom-region REGION_NAME # to specify the region the prometheus is in
```
<p ><a href="#scanning-with-a-centralized-prometheus">See here about configuring labels for centralized prometheus</a></p>

<p align="right">(<a href="#readme-top">back to top</a>)</p>

## Coralogix managed Prometheus

For Coralogix managed Prometheus, specify your Prometheus link and add the flag --coralogix_token with your Logs Query Key.

```sh
python krr.py simple -p "https://prom-api.coralogix..." --coralogix_token
```

<p ><a href="#scanning-with-a-centralized-prometheus">See here about configuring labels for centralized prometheus</a></p>

<p align="right">(<a href="#readme-top">back to top</a>)</p>

Expand Down
1,464 changes: 793 additions & 671 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ kubernetes = "^26.1.0"
prometheus-api-client = "^0.5.3"
numpy = "^1.24.2"
alive-progress = "^3.1.2"
prometrix = "^0.1.10"
slack-sdk = "^3.21.3"
aiostream = "^0.4.5"


[tool.poetry.group.dev.dependencies]
Expand All @@ -42,13 +45,11 @@ types-cachetools = "^5.3.0.4"
types-requests = "^2.28.11.15"
pyinstaller = "^5.9.0"
pytest = "^7.2.2"
aiostream = "^0.4.5"
slack-sdk = "^3.21.3"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"


[project]
name = "robusta_krr"
name = "robusta_krr"
9 changes: 7 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
about-time==4.2.1 ; python_version >= "3.9" and python_version < "3.12"
aiostream==0.4.5 ; python_version >= "3.9" and python_version < "3.12"
alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12"
boto3==1.28.21 ; python_version >= "3.9" and python_version < "3.12"
botocore==1.31.21 ; python_version >= "3.9" and python_version < "3.12"
cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12"
certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12"
charset-normalizer==3.0.1 ; python_version >= "3.9" and python_version < "3.12"
click==8.1.3 ; python_version >= "3.9" and python_version < "3.12"
colorama==0.4.6 ; python_version >= "3.9" and python_version < "3.12"
colorama==0.4.6 ; python_version >= "3.9" and python_version < "3.12" and platform_system == "Windows"
commonmark==0.9.1 ; python_version >= "3.9" and python_version < "3.12"
contourpy==1.0.7 ; python_version >= "3.9" and python_version < "3.12"
cycler==0.11.0 ; python_version >= "3.9" and python_version < "3.12"
Expand All @@ -16,6 +18,7 @@ grapheme==0.6.0 ; python_version >= "3.9" and python_version < "3.12"
httmock==1.4.0 ; python_version >= "3.9" and python_version < "3.12"
idna==3.4 ; python_version >= "3.9" and python_version < "3.12"
importlib-resources==5.12.0 ; python_version >= "3.9" and python_version < "3.10"
jmespath==1.0.1 ; python_version >= "3.9" and python_version < "3.12"
kiwisolver==1.4.4 ; python_version >= "3.9" and python_version < "3.12"
kubernetes==26.1.0 ; python_version >= "3.9" and python_version < "3.12"
matplotlib==3.7.1 ; python_version >= "3.9" and python_version < "3.12"
Expand All @@ -25,6 +28,7 @@ packaging==23.0 ; python_version >= "3.9" and python_version < "3.12"
pandas==1.5.3 ; python_version >= "3.9" and python_version < "3.12"
pillow==9.4.0 ; python_version >= "3.9" and python_version < "3.12"
prometheus-api-client==0.5.3 ; python_version >= "3.9" and python_version < "3.12"
prometrix==0.1.10 ; python_version >= "3.9" and python_version < "3.12"
pyasn1-modules==0.2.8 ; python_version >= "3.9" and python_version < "3.12"
pyasn1==0.4.8 ; python_version >= "3.9" and python_version < "3.12"
pydantic==1.10.7 ; python_version >= "3.9" and python_version < "3.12"
Expand All @@ -39,14 +43,15 @@ requests-oauthlib==1.3.1 ; python_version >= "3.9" and python_version < "3.12"
requests==2.28.2 ; python_version >= "3.9" and python_version < "3.12"
rich==12.6.0 ; python_version >= "3.9" and python_version < "3.12"
rsa==4.9 ; python_version >= "3.9" and python_version < "3.12"
s3transfer==0.6.1 ; python_version >= "3.9" and python_version < "3.12"
setuptools==67.4.0 ; python_version >= "3.9" and python_version < "3.12"
shellingham==1.5.0.post1 ; python_version >= "3.9" and python_version < "3.12"
six==1.16.0 ; python_version >= "3.9" and python_version < "3.12"
slack-sdk==3.21.3 ; python_version >= "3.9" and python_version < "3.12"
typer[all]==0.7.0 ; python_version >= "3.9" and python_version < "3.12"
typing-extensions==4.5.0 ; python_version >= "3.9" and python_version < "3.12"
tzdata==2022.7 ; python_version >= "3.9" and python_version < "3.12"
tzlocal==4.2 ; python_version >= "3.9" and python_version < "3.12"
urllib3==1.26.14 ; python_version >= "3.9" and python_version < "3.12"
websocket-client==1.5.1 ; python_version >= "3.9" and python_version < "3.12"
zipp==3.15.0 ; python_version >= "3.9" and python_version < "3.10"
slack-sdk==3.21.3 ; python_version >= "3.9" and python_version < "3.12"
206 changes: 96 additions & 110 deletions robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, Optional, Union
from typing import AsyncGenerator, Optional, Union, Callable, AsyncIterator
import aiostream

from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
from kubernetes.client.models import (
V1Container,
V1DaemonSet,
V1DaemonSetList,
V1Deployment,
V1DeploymentList,
V1Job,
V1JobList,
V1LabelSelector,
V1Pod,
V1PodList,
V1StatefulSet,
V1StatefulSetList,
V1HorizontalPodAutoscalerList,
V2HorizontalPodAutoscaler,
V2HorizontalPodAutoscalerList,
)

from robusta_krr.core.models.objects import HPAData, K8sObjectData
from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable

Expand Down Expand Up @@ -53,6 +48,8 @@ def __init__(self, cluster: Optional[str], *args, **kwargs):
self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)

self.__rollouts_available = True

async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.
Expand All @@ -62,31 +59,25 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:

self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")
self.debug(f"Resources: {self.config.resources}")

self.__hpa_list = await self._try_list_hpa()

tasks = [
# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
objects_combined = aiostream.stream.merge(
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
]

for fut in asyncio.as_completed(tasks):
try:
object_list = await fut
except Exception as e:
self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
self.debug_exception()
self.error("Will skip this object type and continue.")
continue
)

for object in object_list:
async with objects_combined.stream() as streamer:
async for object in streamer:
# NOTE: By default we will filter out kube-system namespace
if self.config.namespaces == "*" and object.namespace == "kube-system":
continue
elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
continue
yield object

@staticmethod
Expand Down Expand Up @@ -127,108 +118,103 @@ def __build_obj(
hpa=self.__hpa_list.get((namespace, kind, name)),
)

async def _list_deployments(self) -> list[K8sObjectData]:
self.debug(f"Listing deployments in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DeploymentList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_deployment_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")
def _should_list_resource(self, resource: str):
if self.config.resources == "*":
return True
return resource.lower() in self.config.resources

async def _list_workflows(
self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable
) -> AsyncIterator[K8sObjectData]:
if not self._should_list_resource(kind):
self.debug(f"Skipping {kind}s in {self.cluster}")
return

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]
if kind == "rollout" and not self.__rollouts_available:
return

async def _list_rollouts(self) -> list[K8sObjectData]:
self.debug(f"Listing ArgoCD rollouts in {self.cluster}")
self.debug(f"Listing {kind}s in {self.cluster}")
loop = asyncio.get_running_loop()

try:
ret: V1DeploymentList = await loop.run_in_executor(
self.executor,
lambda: self.rollout.list_rollout_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
)
if self.config.namespaces == "*":
ret_multi = await loop.run_in_executor(
self.executor,
lambda: all_namespaces_request(
watch=False,
label_selector=self.config.selector,
),
)
self.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
for item in ret_multi.items:
for container in item.spec.template.spec.containers:
yield self.__build_obj(item, container)
else:
tasks = [
loop.run_in_executor(
self.executor,
lambda: namespaced_request(
namespace=namespace,
watch=False,
label_selector=self.config.selector,
),
)
for namespace in self.config.namespaces
]

total_items = 0
for task in asyncio.as_completed(tasks):
ret_single = await task
total_items += len(ret_single.items)
for item in ret_single.items:
for container in item.spec.template.spec.containers:
yield self.__build_obj(item, container)

self.debug(f"Found {total_items} {kind} in {self.cluster}")
except ApiException as e:
if e.status in [400, 401, 403, 404]:
self.debug(f"Rollout API not available in {self.cluster}")
return []
raise

self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]
if kind == "rollout" and e.status in [400, 401, 403, 404]:
if self.__rollouts_available:
self.debug(f"Rollout API not available in {self.cluster}")
self.__rollouts_available = False
else:
self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
self.debug_exception()
self.error("Will skip this object type and continue.")

async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1StatefulSetList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_stateful_set_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_deployments(self) -> AsyncIterator[K8sObjectData]:
    """Stream scannable objects for every Deployment, one per container."""
    # Delegate to the shared workflow lister with the apps/v1 endpoints.
    api = self.apps
    return self._list_workflows(
        kind="deployment",
        all_namespaces_request=api.list_deployment_for_all_namespaces,
        namespaced_request=api.list_namespaced_deployment,
    )
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DaemonSetList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_daemon_set_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_rollouts(self) -> AsyncIterator[K8sObjectData]:
    """Stream scannable objects for every Argo Rollout, one per container."""
    # Delegate to the shared workflow lister using the Rollout custom API.
    api = self.rollout
    return self._list_workflows(
        kind="rollout",
        all_namespaces_request=api.list_rollout_for_all_namespaces,
        namespaced_request=api.list_namespaced_rollout,
    )
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1JobList = await loop.run_in_executor(
self.executor,
lambda: self.batch.list_job_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_all_statefulsets(self) -> AsyncIterator[K8sObjectData]:
    """Stream scannable objects for every StatefulSet, one per container."""
    # Delegate to the shared workflow lister with the apps/v1 endpoints.
    api = self.apps
    return self._list_workflows(
        kind="statefulset",
        all_namespaces_request=api.list_stateful_set_for_all_namespaces,
        namespaced_request=api.list_namespaced_stateful_set,
    )
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""

self.debug(f"Listing pods in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_pod_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_all_daemon_set(self) -> AsyncIterator[K8sObjectData]:
    """Stream scannable objects for every DaemonSet, one per container."""
    # Delegate to the shared workflow lister with the apps/v1 endpoints.
    api = self.apps
    return self._list_workflows(
        kind="daemonset",
        all_namespaces_request=api.list_daemon_set_for_all_namespaces,
        namespaced_request=api.list_namespaced_daemon_set,
    )
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")

return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
def _list_all_jobs(self) -> AsyncIterator[K8sObjectData]:
    """Stream scannable objects for every Job, one per container."""
    # Delegate to the shared workflow lister with the batch/v1 endpoints.
    api = self.batch
    return self._list_workflows(
        kind="job",
        all_namespaces_request=api.list_job_for_all_namespaces,
        namespaced_request=api.list_namespaced_job,
    )

async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -367,7 +353,7 @@ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[Cluster
self.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None

async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]:
async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
"""List all scannable objects.
Yields:
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .loader import PrometheusMetricsLoader
from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound
from .prometheus_client import CustomPrometheusConnect, ClusterNotSpecifiedException
from .prometheus_utils import ClusterNotSpecifiedException
Loading

0 comments on commit 123e8e1

Please sign in to comment.