-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
External Cluster Environments #1244
base: main
Are you sure you want to change the base?
Changes from all commits
49985f5
d491508
686ed39
7e90611
2daf2cc
ff5aab5
20a5c56
0d43bec
875fafa
5d1d732
19f92c7
e0df4e2
5e23382
c9b90ce
0e0ae6f
9a9416e
9ba9ab7
0a4c6fb
d8b55b6
94ab66e
3514129
4fbe8b7
51256b9
8107dac
26cba1a
55065ca
3282e14
047538e
7c06f2b
c909106
a1a9e46
8554343
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -10,10 +10,15 @@ | |||||
from typing import Any | ||||||
|
||||||
import urllib3 | ||||||
from kubernetes import client, config | ||||||
import yaml | ||||||
from kubernetes import client | ||||||
from kubernetes.utils.create_from_yaml import create_from_yaml_single_item | ||||||
|
||||||
from enterprise_gateway.services.processproxies.k8s_client import kubernetes_client | ||||||
|
||||||
from ..kernels.remotemanager import RemoteKernelManager | ||||||
from ..sessions.kernelsessionmanager import KernelSessionManager | ||||||
from ..utils.envutils import is_env_true | ||||||
from .container import ContainerProcessProxy | ||||||
|
||||||
urllib3.disable_warnings() | ||||||
|
@@ -29,8 +34,6 @@ | |||||
share_gateway_namespace = bool(os.environ.get("EG_SHARED_NAMESPACE", "False").lower() == "true") | ||||||
kpt_dir = os.environ.get("EG_POD_TEMPLATE_DIR", "/tmp") # noqa | ||||||
|
||||||
config.load_incluster_config() | ||||||
|
||||||
|
||||||
class KubernetesProcessProxy(ContainerProcessProxy): | ||||||
""" | ||||||
|
@@ -56,6 +59,12 @@ async def launch_process( | |||||
) -> KubernetesProcessProxy: | ||||||
"""Launches the specified process within a Kubernetes environment.""" | ||||||
# Set env before superclass call, so we can see these in the debug output | ||||||
use_remote_cluster = os.getenv("EG_USE_REMOTE_CLUSTER") | ||||||
if use_remote_cluster: | ||||||
kwargs["env"]["EG_USE_REMOTE_CLUSTER"] = 'true' | ||||||
kwargs["env"]["EG_REMOTE_CLUSTER_KUBECONFIG_PATH"] = os.getenv( | ||||||
"EG_REMOTE_CLUSTER_KUBECONFIG_PATH" | ||||||
) | ||||||
|
||||||
# Kubernetes relies on internal env variables to determine its configuration. When | ||||||
# running within a K8s cluster, these start with KUBERNETES_SERVICE, otherwise look | ||||||
|
@@ -85,7 +94,7 @@ def get_container_status(self, iteration: int | None) -> str: | |||||
# is used for the assigned_ip. | ||||||
pod_status = "" | ||||||
kernel_label_selector = "kernel_id=" + self.kernel_id + ",component=kernel" | ||||||
ret = client.CoreV1Api().list_namespaced_pod( | ||||||
ret = client.CoreV1Api(api_client=kubernetes_client).list_namespaced_pod( | ||||||
namespace=self.kernel_namespace, label_selector=kernel_label_selector | ||||||
) | ||||||
if ret and ret.items: | ||||||
|
@@ -121,7 +130,7 @@ def delete_managed_object(self, termination_stati: list[str]) -> bool: | |||||
# Deleting a Pod will return a v1.Pod if found and its status will be a PodStatus containing | ||||||
# a phase string property | ||||||
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podstatus-v1-core | ||||||
v1_pod = client.CoreV1Api().delete_namespaced_pod( | ||||||
v1_pod = client.CoreV1Api(api_client=kubernetes_client).delete_namespaced_pod( | ||||||
namespace=self.kernel_namespace, body=body, name=self.container_name | ||||||
) | ||||||
status = None | ||||||
|
@@ -168,7 +177,7 @@ def terminate_container_resources(self) -> bool | None: | |||||
body = client.V1DeleteOptions( | ||||||
grace_period_seconds=0, propagation_policy="Background" | ||||||
) | ||||||
v1_status = client.CoreV1Api().delete_namespace( | ||||||
v1_status = client.CoreV1Api(api_client=kubernetes_client).delete_namespace( | ||||||
name=self.kernel_namespace, body=body | ||||||
) | ||||||
status = None | ||||||
|
@@ -239,7 +248,7 @@ def _determine_kernel_namespace(self, **kwargs: dict[str, Any] | None) -> str: | |||||
|
||||||
# If KERNEL_NAMESPACE was provided, then we assume it already exists. If not provided, then we'll | ||||||
# create the namespace and record that we'll want to delete it as well. | ||||||
namespace = kwargs["env"].get("KERNEL_NAMESPACE") | ||||||
namespace = os.environ.get("KERNEL_NAMESPACE") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prior to launch,
Suggested change
|
||||||
if namespace is None: | ||||||
# check if share gateway namespace is configured... | ||||||
if share_gateway_namespace: # if so, set to EG namespace | ||||||
|
@@ -283,10 +292,17 @@ def _create_kernel_namespace(self, service_account_name: str) -> str: | |||||
|
||||||
# create the namespace | ||||||
try: | ||||||
client.CoreV1Api().create_namespace(body=body) | ||||||
client.CoreV1Api(api_client=kubernetes_client).create_namespace(body=body) | ||||||
self.delete_kernel_namespace = True | ||||||
self.log.info(f"Created kernel namespace: {namespace}") | ||||||
|
||||||
# If remote cluster is being used, service account may not be present, create before role binding | ||||||
# If creating service account is disabled, operator must manually create svc account | ||||||
if is_env_true('EG_USE_REMOTE_CLUSTER') and is_env_true('EG_CREATE_REMOTE_SVC_ACCOUNT'): | ||||||
self._create_service_account_if_not_exists( | ||||||
namespace=namespace, service_account_name=service_account_name | ||||||
) | ||||||
|
||||||
# Now create a RoleBinding for this namespace for the default ServiceAccount. We'll reference | ||||||
# the ClusterRole, but that will only be applied for this namespace. This prevents the need for | ||||||
# creating a role each time. | ||||||
|
@@ -310,14 +326,66 @@ def _create_kernel_namespace(self, service_account_name: str) -> str: | |||||
body = client.V1DeleteOptions( | ||||||
grace_period_seconds=0, propagation_policy="Background" | ||||||
) | ||||||
client.CoreV1Api().delete_namespace(name=namespace, body=body) | ||||||
client.CoreV1Api(api_client=kubernetes_client).delete_namespace( | ||||||
name=namespace, body=body | ||||||
) | ||||||
self.log.warning(f"Deleted kernel namespace: {namespace}") | ||||||
else: | ||||||
reason = f"Error occurred creating namespace '{namespace}': {err}" | ||||||
self.log_and_raise(http_status_code=500, reason=reason) | ||||||
|
||||||
return namespace | ||||||
|
||||||
def _create_service_account_if_not_exists( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is strictly for external clusters, could we rename this to something like: |
||||||
self, namespace: str, service_account_name: str | ||||||
) -> None: | ||||||
"""If service account doesn't exist in target cluster, create one. Occurs if a remote cluster is being used.""" | ||||||
service_account_list_in_namespace: client.V1ServiceAccountList = client.CoreV1Api( | ||||||
api_client=kubernetes_client | ||||||
).list_namespaced_service_account(namespace=namespace) | ||||||
|
||||||
service_accounts_in_namespace: list[ | ||||||
client.V1ServiceAccount | ||||||
] = service_account_list_in_namespace.items | ||||||
service_account_names_in_namespace: list[str] = [ | ||||||
svcaccount.metadata.name for svcaccount in service_accounts_in_namespace | ||||||
] | ||||||
|
||||||
if service_account_name not in service_account_names_in_namespace: | ||||||
service_account_metadata = {"name": service_account_name} | ||||||
service_account_to_create: client.V1ServiceAccount = client.V1ServiceAccount( | ||||||
kind="ServiceAccount", metadata=service_account_metadata | ||||||
) | ||||||
|
||||||
client.CoreV1Api(api_client=kubernetes_client).create_namespaced_service_account( | ||||||
namespace=namespace, body=service_account_to_create | ||||||
) | ||||||
|
||||||
self.log.info( | ||||||
f"Created service account {service_account_name} in namespace {namespace}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way to access the "name" of the external cluster? Seems really helpful to include that here.
Suggested change
|
||||||
) | ||||||
|
||||||
def _create_role_if_not_exists(self, namespace: str) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A similar name change to that suggested previously would be nice here. |
||||||
"""If role doesn't exist in target cluster, create one. Occurs if a remote cluster is being used""" | ||||||
role_yaml_path = os.getenv('EG_REMOTE_CLUSTER_ROLE_PATH') | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should This line (and any validation) should be moved within the |
||||||
|
||||||
# Get Roles in remote cluster | ||||||
remote_cluster_roles: client.V1RoleList = client.RbacAuthorizationV1Api( | ||||||
api_client=kubernetes_client | ||||||
).list_namespaced_role(namespace=namespace) | ||||||
remote_cluster_role_names = [role.metadata.name for role in remote_cluster_roles.items] | ||||||
|
||||||
# If the kernel Role does not exist in the remote cluster. | ||||||
if kernel_cluster_role not in remote_cluster_role_names: | ||||||
with open(role_yaml_path) as f: | ||||||
role_yaml = yaml.safe_load(f) | ||||||
role_yaml["metadata"]["namespace"] = namespace | ||||||
create_from_yaml_single_item(yml_object=role_yaml, k8s_client=kubernetes_client) | ||||||
|
||||||
self.log.info(f"Created role {kernel_cluster_role} in namespace {namespace}") | ||||||
else: | ||||||
self.log.info(f"Found role {kernel_cluster_role} in namespace {namespace}") | ||||||
|
||||||
def _create_role_binding(self, namespace: str, service_account_name: str) -> None: | ||||||
# Creates RoleBinding instance for the given namespace. The role used will be the ClusterRole named by | ||||||
# EG_KERNEL_CLUSTER_ROLE. | ||||||
|
@@ -330,9 +398,17 @@ def _create_role_binding(self, namespace: str, service_account_name: str) -> Non | |||||
role_binding_name = kernel_cluster_role # use same name for binding as cluster role | ||||||
labels = {"app": "enterprise-gateway", "component": "kernel", "kernel_id": self.kernel_id} | ||||||
binding_metadata = client.V1ObjectMeta(name=role_binding_name, labels=labels) | ||||||
binding_role_ref = client.V1RoleRef( | ||||||
api_group="", kind="ClusterRole", name=kernel_cluster_role | ||||||
) | ||||||
|
||||||
# If remote cluster is used, we need to create a role on that cluster | ||||||
if is_env_true('EG_USE_REMOTE_CLUSTER'): | ||||||
self._create_role_if_not_exists(namespace=namespace) | ||||||
# We use namespaced roles on remote clusters rather than a ClusterRole | ||||||
binding_role_ref = client.V1RoleRef(api_group="", kind="Role", name=kernel_cluster_role) | ||||||
else: | ||||||
binding_role_ref = client.V1RoleRef( | ||||||
api_group="", kind="ClusterRole", name=kernel_cluster_role | ||||||
) | ||||||
|
||||||
binding_subjects = client.V1Subject( | ||||||
api_group="", kind="ServiceAccount", name=service_account_name, namespace=namespace | ||||||
) | ||||||
|
@@ -344,7 +420,7 @@ def _create_role_binding(self, namespace: str, service_account_name: str) -> Non | |||||
subjects=[binding_subjects], | ||||||
) | ||||||
|
||||||
client.RbacAuthorizationV1Api().create_namespaced_role_binding( | ||||||
client.RbacAuthorizationV1Api(api_client=kubernetes_client).create_namespaced_role_binding( | ||||||
namespace=namespace, body=body | ||||||
) | ||||||
self.log.info( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
"""Instantiates a static global factory and a single atomic client""" | ||
from enterprise_gateway.services.processproxies.k8s_client_factory import KubernetesClientFactory | ||
|
||
KUBERNETES_CLIENT_FACTORY = KubernetesClientFactory() | ||
kubernetes_client = KUBERNETES_CLIENT_FACTORY.get_kubernetes_client() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
"""Contains factory to create kubernetes api client instances using a single confguration""" | ||
import os | ||
|
||
from kubernetes import client, config | ||
from traitlets.config import SingletonConfigurable | ||
|
||
from enterprise_gateway.services.utils.envutils import is_env_true | ||
|
||
|
||
class KubernetesClientFactory(SingletonConfigurable): | ||
"""Manages kubernetes client creation from environment variables""" | ||
|
||
def get_kubernetes_client(self) -> client.ApiClient: | ||
"""Get kubernetes api client with appropriate configuration | ||
Returns: | ||
ApiClient: Kubernetes API client for appropriate cluster | ||
""" | ||
kubernetes_config: client.Configuration = client.Configuration() | ||
if os.getenv("KUBERNETES_SERVICE_HOST"): | ||
# Running inside cluster | ||
if is_env_true('EG_USE_REMOTE_CLUSTER') and not is_env_true('EG_SHARED_NAMESPACE'): | ||
kubeconfig_path = os.getenv( | ||
'EG_REMOTE_CLUSTER_KUBECONFIG_PATH', '/etc/kube/config/kubeconfig' | ||
) | ||
context = os.getenv('EG_REMOTE_CLUSTER_CONTEXT', None) | ||
config.load_kube_config( | ||
client_configuration=kubernetes_config, | ||
config_file=kubeconfig_path, | ||
context=context, | ||
) | ||
else: | ||
if is_env_true('EG_USE_REMOTE_CLUSTER'): | ||
self.log.warning( | ||
"Cannot use EG_USE_REMOTE_CLUSTER and EG_SHARED_NAMESPACE at the same time. Using local cluster...." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
) | ||
|
||
config.load_incluster_config(client_configuration=kubernetes_config) | ||
else: | ||
config.load_kube_config(client_configuration=kubernetes_config) | ||
|
||
self.log.debug(f"Created kubernetes client for host {kubernetes_config.host}") | ||
return client.ApiClient(kubernetes_config) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
"""""Utilities to make checking environment variables easier""" | ||
import os | ||
|
||
|
||
def is_env_true(env_variable_name: str) -> bool: | ||
"""If environment variable is set and value is case-insensitively "true", then return true. Else return false""" | ||
return bool(os.getenv(env_variable_name, "False").lower() == "true") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we refactor this filename to include an underscore for word separation?