From 059bd225fbb9027b9a4dab08a1a360781a0d9db0 Mon Sep 17 00:00:00 2001 From: Mecoli1219 Date: Fri, 21 Feb 2025 00:56:04 -0800 Subject: [PATCH] [Feature] Enable memory increase on OOM failure Signed-off-by: Mecoli1219 --- flytekit/core/python_auto_container.py | 2 ++ flytekit/core/resources.py | 5 +++++ flytekit/core/utils.py | 18 ++++++++++++++++++ flytekit/models/task.py | 1 + 4 files changed, 26 insertions(+) diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py index dfbd678fb6..306a880aac 100644 --- a/flytekit/core/python_auto_container.py +++ b/flytekit/core/python_auto_container.py @@ -223,10 +223,12 @@ def _get_container(self, settings: SerializationSettings) -> _task_model.Contain cpu_request=self.resources.requests.cpu, gpu_request=self.resources.requests.gpu, memory_request=self.resources.requests.mem, + oom_reserved_memory_request = self.resources.requests.oom_reserved_mem, ephemeral_storage_limit=self.resources.limits.ephemeral_storage, cpu_limit=self.resources.limits.cpu, gpu_limit=self.resources.limits.gpu, memory_limit=self.resources.limits.mem, + oom_reserved_memory_limit=self.resources.limits.oom_reserved_mem, ) def get_k8s_pod(self, settings: SerializationSettings) -> _task_model.K8sPod: diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index f64b7d23dc..1f214a113b 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -34,6 +34,7 @@ class Resources(DataClassJSONMixin): mem: Optional[Union[str, int]] = None gpu: Optional[Union[str, int]] = None ephemeral_storage: Optional[Union[str, int]] = None + oom_reserved_mem: Optional[Union[str, int]] = None def __post_init__(self): def _check_cpu(value): @@ -52,6 +53,7 @@ def _check_others(value): _check_others(self.mem) _check_others(self.gpu) _check_others(self.ephemeral_storage) + _check_others(self.oom_reserved_mem) @dataclass @@ -79,6 +81,8 @@ def _convert_resources_to_resource_entries(resources: Resources) -> List[_Resour value=str(resources.ephemeral_storage), ) ) + if resources.oom_reserved_mem is not None: + resource_entries.append(_ResourceEntry(name=_ResourceName.OOM_RESERVED_MEMORY, value=str(resources.oom_reserved_mem))) return resource_entries @@ -117,6 +121,7 @@ def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resour "mem": "memory", "gpu": k8s_gpu_resource_key, "ephemeral_storage": "ephemeral-storage", + "oom_reserved_mem": "oom-reserved-memory", } k8s_pod_resources = {} diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 9f1967d2f9..0dd2587c52 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -69,10 +69,12 @@ def _get_container_definition( cpu_request: Optional[Union[str, int, float]] = None, gpu_request: Optional[Union[str, int]] = None, memory_request: Optional[Union[str, int]] = None, + oom_reserved_memory_request: Optional[Union[str, int]] = None, ephemeral_storage_limit: Optional[Union[str, int]] = None, cpu_limit: Optional[Union[str, int, float]] = None, gpu_limit: Optional[Union[str, int]] = None, memory_limit: Optional[Union[str, int]] = None, + oom_reserved_memory_limit: Optional[Union[str, int]] = None, environment: Optional[Dict[str, str]] = None, ) -> "task_models.Container": ephemeral_storage_limit = ephemeral_storage_limit @@ -83,6 +85,8 @@ def _get_container_definition( gpu_request = gpu_request memory_limit = memory_limit memory_request = memory_request + oom_reserved_memory_limit = oom_reserved_memory_limit + oom_reserved_memory_request = oom_reserved_memory_request from flytekit.models import task as task_models @@ -101,6 +105,13 @@ def _get_container_definition( requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_request)) if memory_request: requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_request)) + if oom_reserved_memory_request: + requests.append( + task_models.Resources.ResourceEntry( + task_models.Resources.ResourceName.OOM_RESERVED_MEMORY, + oom_reserved_memory_request, + ) + ) limits = [] if ephemeral_storage_limit: @@ -116,6 +127,13 @@ def _get_container_definition( limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_limit)) if memory_limit: limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_limit)) + if oom_reserved_memory_limit: + limits.append( + task_models.Resources.ResourceEntry( + task_models.Resources.ResourceName.OOM_RESERVED_MEMORY, + oom_reserved_memory_limit, + ) + ) if environment is None: environment = {} diff --git a/flytekit/models/task.py b/flytekit/models/task.py index 52d3b92187..8e864b944f 100644 --- a/flytekit/models/task.py +++ b/flytekit/models/task.py @@ -29,6 +29,7 @@ class ResourceName(object): GPU = _core_task.Resources.GPU MEMORY = _core_task.Resources.MEMORY EPHEMERAL_STORAGE = _core_task.Resources.EPHEMERAL_STORAGE + OOM_RESERVED_MEMORY = _core_task.Resources.OOM_RESERVED_MEMORY class ResourceEntry(_common.FlyteIdlEntity): def __init__(self, name, value):