Skip to content

Commit

Permalink
Merge pull request #122 from robusta-dev/namspaces-filtering-fix
Browse files Browse the repository at this point in the history
Fix listing namespaced workflows
  • Loading branch information
LeaveMyYard authored Aug 10, 2023
2 parents ea01100 + 90e8af6 commit 842d97e
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 198 deletions.
206 changes: 96 additions & 110 deletions robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, Optional, Union
from typing import AsyncGenerator, Optional, Union, Callable, AsyncIterator
import aiostream

from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
from kubernetes.client.models import (
V1Container,
V1DaemonSet,
V1DaemonSetList,
V1Deployment,
V1DeploymentList,
V1Job,
V1JobList,
V1LabelSelector,
V1Pod,
V1PodList,
V1StatefulSet,
V1StatefulSetList,
V1HorizontalPodAutoscalerList,
V2HorizontalPodAutoscaler,
V2HorizontalPodAutoscalerList,
)

from robusta_krr.core.models.objects import HPAData, K8sObjectData
from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable

Expand Down Expand Up @@ -53,6 +48,8 @@ def __init__(self, cluster: Optional[str], *args, **kwargs):
self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)

self.__rollouts_available = True

async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.
Expand All @@ -62,31 +59,25 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:

self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")
self.debug(f"Resources: {self.config.resources}")

self.__hpa_list = await self._try_list_hpa()

tasks = [
# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
objects_combined = aiostream.stream.merge(
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
]

for fut in asyncio.as_completed(tasks):
try:
object_list = await fut
except Exception as e:
self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
self.debug_exception()
self.error("Will skip this object type and continue.")
continue
)

for object in object_list:
async with objects_combined.stream() as streamer:
async for object in streamer:
# NOTE: By default we will filter out kube-system namespace
if self.config.namespaces == "*" and object.namespace == "kube-system":
continue
elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
continue
yield object

@staticmethod
Expand Down Expand Up @@ -127,108 +118,103 @@ def __build_obj(
hpa=self.__hpa_list.get((namespace, kind, name)),
)

async def _list_deployments(self) -> list[K8sObjectData]:
self.debug(f"Listing deployments in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DeploymentList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_deployment_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")
def _should_list_resource(self, resource: str):
if self.config.resources == "*":
return True
return resource.lower() in self.config.resources

async def _list_workflows(
    self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable
) -> AsyncIterator[K8sObjectData]:
    """Yield one built object per container of every listed workload of ``kind``.

    Args:
        kind: Resource kind to list; it is checked with ``_should_list_resource``
            and used in log messages.
        all_namespaces_request: Blocking callable used when
            ``self.config.namespaces == "*"``. It is called with ``watch`` and
            ``label_selector`` keyword arguments.
        namespaced_request: Blocking callable used once per configured
            namespace. It is called with ``namespace``, ``watch`` and
            ``label_selector`` keyword arguments.

    Yields:
        ``self.__build_obj(item, container)`` for every container found in
        ``item.spec.template.spec.containers`` of each returned item.

    An ``ApiException`` is not propagated. For rollouts answering with status
    400/401/403/404 the rollout API is remembered as unavailable; any other
    API failure is logged and the generator simply ends.
    """
    if not self._should_list_resource(kind):
        self.debug(f"Skipping {kind}s in {self.cluster}")
        return

    # An earlier rollout listing already marked the API as unavailable.
    if kind == "rollout" and not self.__rollouts_available:
        return

    self.debug(f"Listing {kind}s in {self.cluster}")
    loop = asyncio.get_running_loop()

    try:
        if self.config.namespaces == "*":
            ret_multi = await loop.run_in_executor(
                self.executor,
                lambda: all_namespaces_request(
                    watch=False,
                    label_selector=self.config.selector,
                ),
            )
            self.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
            for item in ret_multi.items:
                for container in item.spec.template.spec.containers:
                    yield self.__build_obj(item, container)
        else:
            tasks = [
                loop.run_in_executor(
                    self.executor,
                    # Bind the namespace as a default argument. The lambda runs
                    # later on an executor thread; a plain closure would read the
                    # comprehension variable at that time and could see a later
                    # namespace, querying it several times and skipping others.
                    lambda ns=namespace: namespaced_request(
                        namespace=ns,
                        watch=False,
                        label_selector=self.config.selector,
                    ),
                )
                for namespace in self.config.namespaces
            ]

            total_items = 0
            # Results are consumed in completion order, not namespace order.
            for task in asyncio.as_completed(tasks):
                ret_single = await task
                total_items += len(ret_single.items)
                for item in ret_single.items:
                    for container in item.spec.template.spec.containers:
                        yield self.__build_obj(item, container)

            self.debug(f"Found {total_items} {kind} in {self.cluster}")
    except ApiException as e:
        if kind == "rollout" and e.status in [400, 401, 403, 404]:
            # Log only the first time; later calls return early above.
            if self.__rollouts_available:
                self.debug(f"Rollout API not available in {self.cluster}")
            self.__rollouts_available = False
        else:
            self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
            self.debug_exception()
            self.error("Will skip this object type and continue.")

async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1StatefulSetList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_stateful_set_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_deployments(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
kind="deployment",
all_namespaces_request=self.apps.list_deployment_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_deployment,
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DaemonSetList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_daemon_set_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_rollouts(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
kind="rollout",
all_namespaces_request=self.rollout.list_rollout_for_all_namespaces,
namespaced_request=self.rollout.list_namespaced_rollout,
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1JobList = await loop.run_in_executor(
self.executor,
lambda: self.batch.list_job_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_all_statefulsets(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
kind="statefulset",
all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_stateful_set,
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")

return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""

self.debug(f"Listing pods in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
self.executor,
lambda: self.apps.list_pod_for_all_namespaces(
watch=False,
label_selector=self.config.selector,
),
def _list_all_daemon_set(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
kind="daemonset",
all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_daemon_set,
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")

return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
def _list_all_jobs(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
kind="job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
)

async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -367,7 +353,7 @@ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[Cluster
self.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None

async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]:
async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
"""List all scannable objects.
Yields:
Expand Down
Loading

0 comments on commit 842d97e

Please sign in to comment.