diff --git a/.changes/next-release/api-change-emrcontainers-7930.json b/.changes/next-release/api-change-emrcontainers-7930.json new file mode 100644 index 000000000000..98915674e2e8 --- /dev/null +++ b/.changes/next-release/api-change-emrcontainers-7930.json @@ -0,0 +1,5 @@ +{ + "type": "api-change", + "category": "``emr-containers``", + "description": "Add custom ``create-role-associations`` and ``delete-role-associations`` commands to create/delete role associations for EMR service accounts and provided IAM role." +} diff --git a/awscli/customizations/emrcontainers/__init__.py b/awscli/customizations/emrcontainers/__init__.py index dc93cf5c1c3d..259864cdfacb 100644 --- a/awscli/customizations/emrcontainers/__init__.py +++ b/awscli/customizations/emrcontainers/__init__.py @@ -13,6 +13,10 @@ from awscli.customizations.emrcontainers.update_role_trust_policy \ import UpdateRoleTrustPolicyCommand +from awscli.customizations.emrcontainers.create_role_associations \ + import CreateRoleAssociationsCommand +from awscli.customizations.emrcontainers.delete_role_associations \ + import DeleteRoleAssociationsCommand def initialize(cli): @@ -29,3 +33,8 @@ def inject_commands(command_table, session, **kwargs): """ command_table['update-role-trust-policy'] = UpdateRoleTrustPolicyCommand( session) + command_table['create-role-associations'] = CreateRoleAssociationsCommand( + session) + command_table['delete-role-associations'] = DeleteRoleAssociationsCommand( + session) + diff --git a/awscli/customizations/emrcontainers/constants.py b/awscli/customizations/emrcontainers/constants.py index a8e23e95f941..c4198b371ad7 100644 --- a/awscli/customizations/emrcontainers/constants.py +++ b/awscli/customizations/emrcontainers/constants.py @@ -14,6 +14,8 @@ # Declare all the constants used by Lifecycle in this file # Lifecycle role names +from enum import Enum + TRUST_POLICY_STATEMENT_FORMAT = '{ \ "Effect": "Allow", \ "Principal": { \ @@ -35,3 +37,11 @@ "were made!" TRUST_POLICY_UPDATE_SUCCESSFUL = "Successfully updated trust policy of role %s" + +SERVICE_ACCOUNT_NAMING = "emr-containers-sa-%(FRAMEWORK)s-%(COMPONENT)s-%(AWS_ACCOUNT_ID)s-%(BASE36_ENCODED_ROLE_NAME)s" + +class ServiceAccount(Enum): + SPARK_OPERATOR_SERVICE_ACCOUNT = "emr-containers-sa-spark-operator" + FLINK_OPERATOR_SERVICE_ACCOUNT = "emr-containers-sa-flink-operator" + LIVY_SERVICE_ACCOUNT = "emr-containers-sa-livy" + LIVY_SPARK_SERVICE_ACCOUNT = "emr-container-sa-spark-livy" \ No newline at end of file diff --git a/awscli/customizations/emrcontainers/create_role_associations.py b/awscli/customizations/emrcontainers/create_role_associations.py new file mode 100644 index 000000000000..ae2ef82cf152 --- /dev/null +++ b/awscli/customizations/emrcontainers/create_role_associations.py @@ -0,0 +1,298 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import logging, json, sys +import botocore + +from awscli.customizations.commands import BasicCommand +from awscli.customizations.emrcontainers.constants import ( + SERVICE_ACCOUNT_NAMING, + ServiceAccount, +) +from awscli.customizations.emrcontainers.base36 import Base36 +from awscli.customizations.emrcontainers.eks import EKS +from awscli.customizations.utils import uni_print, get_policy_arn_suffix +from awscli.customizations.emrcontainers.utils import get_region + +LOG = logging.getLogger(__name__) + + +class CreateRoleAssociationsCommand(BasicCommand): + NAME = "create-role-associations" + + DESCRIPTION = BasicCommand.FROM_FILE( + "emr-containers", "create-role-associations", "_description.rst" + ) + + ARG_TABLE = [ + { + "name": "cluster-name", + "help_text": ( + "Specify the name of the Amazon EKS cluster with " + "which the IAM Role would be associated." + ), + "required": True, + }, + { + "name": "namespace", + "help_text": ( + "Specify the job/application namespace from the Amazon EKS cluster " + "with which the IAM Role would be associated." + ), + "required": True, + }, + { + "name": "role-name", + "help_text": ( + "Specify the IAM Role name that you want to associate " + "with Amazon EMR on EKS." + ), + "required": True, + }, + { + "name": "type", + "help_text": ( + "Specify the EMR on EKS submission model and choose service accounts that you want to associate " + "with Amazon EMR on EKS. By default is start_job_run. Supported Types: start_job_run, " + "interactive_endpoint, spark_operator, flink_operator, livy." + ), + "required": False, + "choices": [ + "start_job_run", + "interactive_endpoint", + "spark_operator", + "flink_operator", + "livy", + ], + }, + { + "name": "operator-namespace", + "help_text": ( + "Specify the namespace that you want to associate the operator service account " + "with the IAM role. Default to the job/application namespace specified. Note: " + "If jobs are running in a different namespace than the operator installation namespace, " + "this parameter needs to be set as where the operator is running on." + ), + "required": False, + }, + { + "name": "service-account-name", + "help_text": ( + "Specify the service account name that you want to associate " + "with the IAM role. Default to Amazon EMR on EKS service accounts." + ), + "required": False, + }, + ] + + def _run_main(self, parsed_args, parsed_globals): + """Call to run the commands""" + + self._cluster_name = parsed_args.cluster_name + self._namespace = parsed_args.namespace + self._role_name = parsed_args.role_name + self._type = parsed_args.type or "start_job_run" + self._operator_namespace = parsed_args.operator_namespace + self._service_account_name = parsed_args.service_account_name + self._region = get_region(self._session, parsed_globals) + + result = self._create_role_associations(parsed_globals) + if result: + uni_print(json.dumps(result, indent=4)) + + return 0 + + def _create_role_associations(self, parsed_globals): + """Method to create role associations if not done already""" + eks_client = EKS( + self._session.create_client( + "eks", + region_name=self._region, + verify=parsed_globals.verify_ssl, + ) + ) + account_id = eks_client.get_account_id(self._cluster_name) + role_arn = f"arn:{get_policy_arn_suffix(self._region)}:iam::{account_id}:role/{self._role_name}" + + results = [] + # If service account provided, create association with provided service account and role + if self._service_account_name: + service_account_namespace_mapping = [ + (self._service_account_name, self._namespace) + ] + else: + # By default, create associations with EMR on EKS service accounts + base36 = Base36() + + base36_encoded_role_name = base36.encode(self._role_name) + LOG.debug(f"Base36 encoded role name: {base36_encoded_role_name}") + service_account_namespace_mapping = ( + self._get_emr_service_account_namespace_mapping( + account_id, base36_encoded_role_name + ) + ) + + for ( + service_account_name, + namespace, + ) in service_account_namespace_mapping: + LOG.debug( + f"Creating pod identity association for service account {service_account_name} and " + + f"role {self._role_name} in namespace {namespace}" + ) + try: + result = eks_client.create_pod_identity_association( + self._cluster_name, + namespace, + role_arn, + service_account_name, + ) + results.append( + result["association"] if "association" in result else result + ) + except botocore.exceptions.ClientError as error: + + # Raise the error if EKS throws exceptions other than ResourceInUseException + if error.response["Error"]["Code"] != "ResourceInUseException": + for result in results: + uni_print( + f"Rolling back association for service account {service_account_name} " + + f"and role {self._role_name} in namespace {namespace} as encountered an error", + out_file=sys.stderr, + ) + eks_client.delete_pod_identity_association( + cluster_name=result["clusterName"], + association_id=result["associationId"], + ) + raise Exception( + f"Failed to create pod identity association for service account {service_account_name}, " + + f"role {self._role_name} in namespace {namespace}: {error.response['Error']['Message']}" + ) from error + uni_print( + f"Skipping pod identity association creation because pod identity association already exists for " + + f"service account {service_account_name}and role {self._role_name} in namespace {namespace}: {error.response['Error']['Message']}\n", + out_file=sys.stderr, + ) + return results + + def _get_emr_service_account_namespace_mapping( + self, account_id, base36_encoded_role_name + ): + return getattr(self, f"_get_{self._type}_mapping")( + account_id, base36_encoded_role_name + ) + + def _get_start_job_run_mapping(self, account_id, base36_encoded_role_name): + emr_spark_components = ["client", "driver", "executor"] + return [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_components + ] + + def _get_interactive_endpoint_mapping( + self, account_id, base36_encoded_role_name + ): + emr_spark_endpoint_components = ["jeg", "jeg-kernel", "session"] + return [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_endpoint_components + ] + + def _get_spark_operator_mapping(self, account_id, base36_encoded_role_name): + emr_spark_operator_components = ["driver", "executor"] + service_accounts = [ + ( + ServiceAccount.SPARK_OPERATOR_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ) + ] + service_accounts.extend( + [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_operator_components + ] + ) + return service_accounts + + def _get_flink_operator_mapping(self, account_id, base36_encoded_role_name): + emr_flink_operator_components = ["jobmanager", "taskmanager"] + service_accounts = [ + ( + ServiceAccount.FLINK_OPERATOR_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ) + ] + service_accounts.extend( + [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "flink", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_flink_operator_components + ] + ) + return service_accounts + + def _get_livy_mapping(self, account_id, base36_encoded_role_name): + return [ + ( + ServiceAccount.LIVY_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ), + (ServiceAccount.LIVY_SPARK_SERVICE_ACCOUNT.value, self._namespace), + ] diff --git a/awscli/customizations/emrcontainers/delete_role_associations.py b/awscli/customizations/emrcontainers/delete_role_associations.py new file mode 100644 index 000000000000..af7912bc71bb --- /dev/null +++ b/awscli/customizations/emrcontainers/delete_role_associations.py @@ -0,0 +1,287 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import logging, json, sys +import botocore + +from awscli.customizations.commands import BasicCommand +from awscli.customizations.emrcontainers.constants import ( + SERVICE_ACCOUNT_NAMING, + ServiceAccount, +) +from awscli.customizations.emrcontainers.base36 import Base36 +from awscli.customizations.emrcontainers.eks import EKS +from awscli.customizations.utils import uni_print +from awscli.customizations.emrcontainers.utils import get_region + +LOG = logging.getLogger(__name__) + + +class DeleteRoleAssociationsCommand(BasicCommand): + NAME = "delete-role-associations" + + DESCRIPTION = BasicCommand.FROM_FILE( + "emr-containers", "delete-role-associations", "_description.rst" + ) + + ARG_TABLE = [ + { + "name": "cluster-name", + "help_text": ( + "Specify the name of the Amazon EKS cluster with " + "which the IAM Role would be dissociated." + ), + "required": True, + }, + { + "name": "namespace", + "help_text": ( + "Specify the job/application namespace from the Amazon EKS cluster " + "with which the IAM Role would be dissociated." + ), + "required": True, + }, + { + "name": "role-name", + "help_text": ( + "Specify the IAM Role name that you want to dissociate " + "with Amazon EMR on EKS." + ), + "required": True, + }, + { + "name": "type", + "help_text": ( + "Specify the EMR on EKS submission model and choose service accounts that you want to dissociate " + "with Amazon EMR on EKS. By default is start_job_run. Supported Types: start_job_run, " + "interactive_endpoint, spark_operator, flink_operator, livy." + ), + "required": False, + "choices": [ + "start_job_run", + "interactive_endpoint", + "spark_operator", + "flink_operator", + "livy", + ], + }, + { + "name": "operator-namespace", + "help_text": ( + "Specify the namespace that you want to dissociate the operator service account " + "with the IAM role. Default to the job/application namespace specified. Note: If jobs are running " + "in a different namespace than the operator installation namespace, this parameter needs to be set as " + "where the operator is running on." + ), + "required": False, + }, + { + "name": "service-account-name", + "help_text": ( + "Specify the service account name that you want to dissociate " + "with the IAM role. Default to Amazon EMR on EKS service accounts." + ), + "required": False, + }, + ] + + def _run_main(self, parsed_args, parsed_globals): + """Call to run the commands""" + + self._cluster_name = parsed_args.cluster_name + self._namespace = parsed_args.namespace + self._role_name = parsed_args.role_name + self._type = parsed_args.type or "start_job_run" + self._operator_namespace = parsed_args.operator_namespace + self._service_account_name = parsed_args.service_account_name + self._region = get_region(self._session, parsed_globals) + + result = self._delete_role_associations(parsed_globals) + if result: + uni_print(json.dumps(result, indent=4)) + + return 0 + + def _delete_role_associations(self, parsed_globals): + """Method to delete role associations if not done already""" + eks_client = EKS( + self._session.create_client( + "eks", + region_name=self._region, + verify=parsed_globals.verify_ssl, + ) + ) + account_id = eks_client.get_account_id(self._cluster_name) + + results = [] + # If service account provided, delete association with provided service account and role + if self._service_account_name: + service_account_namespace_mapping = [ + (self._service_account_name, self._namespace) + ] + else: + # By default, delete associations with EMR on EKS service accounts + base36 = Base36() + + base36_encoded_role_name = base36.encode(self._role_name) + LOG.debug(f"Base36 encoded role name: {base36_encoded_role_name}") + service_account_namespace_mapping = ( + self._get_emr_service_account_namespace_mapping( + account_id, base36_encoded_role_name + ) + ) + + for ( + service_account_name, + namespace, + ) in service_account_namespace_mapping: + try: + association = eks_client.list_pod_identity_associations( + self._cluster_name, namespace, service_account_name + ) + if not association.get("associations", []): + uni_print( + f"Skipping deletion as no pod identity association found for service account {service_account_name} " + + f"and role {self._role_name} in namespace {namespace}\n", + out_file=sys.stderr + ) + continue + association_id = association["associations"][0]["associationId"] + LOG.debug( + f"Deleting pod identity association for service account {service_account_name}" + + f"and role {self._role_name} in {namespace} {association_id} with association id %s" + ) + result = eks_client.delete_pod_identity_association( + self._cluster_name, association_id + ) + results.append( + result["association"] if "association" in result else result + ) + except botocore.exceptions.ClientError as error: + raise Exception( + f"Failed to delete pod identity association for service account {service_account_name}, " + + f"role {self._role_name} in namespace {namespace}: {error.response['Error']['Message']}" + ) from error + return results + + def _get_emr_service_account_namespace_mapping( + self, account_id, base36_encoded_role_name + ): + return getattr(self, f"_get_{self._type}_mapping")( + account_id, base36_encoded_role_name + ) + + def _get_start_job_run_mapping(self, account_id, base36_encoded_role_name): + emr_spark_components = ["client", "driver", "executor"] + return [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_components + ] + + def _get_interactive_endpoint_mapping( + self, account_id, base36_encoded_role_name + ): + emr_spark_endpoint_components = ["jeg", "jeg-kernel", "session"] + return [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_endpoint_components + ] + + def _get_spark_operator_mapping(self, account_id, base36_encoded_role_name): + emr_spark_operator_components = ["driver", "executor"] + service_accounts = [ + ( + ServiceAccount.SPARK_OPERATOR_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ) + ] + service_accounts.extend( + [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_spark_operator_components + ] + ) + return service_accounts + + def _get_flink_operator_mapping(self, account_id, base36_encoded_role_name): + emr_flink_operator_components = ["jobmanager", "taskmanager"] + service_accounts = [ + ( + ServiceAccount.FLINK_OPERATOR_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ) + ] + service_accounts.extend( + [ + ( + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "flink", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + }, + self._namespace, + ) + for component in emr_flink_operator_components + ] + ) + return service_accounts + + def _get_livy_mapping(self, account_id, base36_encoded_role_name): + return [ + ( + ServiceAccount.LIVY_SERVICE_ACCOUNT.value, + ( + self._operator_namespace + if self._operator_namespace + else self._namespace + ), + ), + (ServiceAccount.LIVY_SPARK_SERVICE_ACCOUNT.value, self._namespace), + ] diff --git a/awscli/customizations/emrcontainers/eks.py b/awscli/customizations/emrcontainers/eks.py index 148785193489..209abbfa39c2 100644 --- a/awscli/customizations/emrcontainers/eks.py +++ b/awscli/customizations/emrcontainers/eks.py @@ -40,3 +40,25 @@ def get_account_id(self, cluster_name): "arn", "") return cluster_arn.split(':')[4] + + def create_pod_identity_association(self, cluster_name, namespace, role_arn, service_account): + return self.eks_client.create_pod_identity_association( + clusterName = cluster_name, + namespace = namespace, + roleArn = role_arn, + serviceAccount = service_account + ) + + def delete_pod_identity_association(self, cluster_name, association_id): + return self.eks_client.delete_pod_identity_association( + clusterName = cluster_name, + associationId = association_id + ) + + def list_pod_identity_associations(self, cluster_name, namespace, service_account): + return self.eks_client.list_pod_identity_associations( + clusterName = cluster_name, + namespace = namespace, + serviceAccount = service_account + ) + \ No newline at end of file diff --git a/awscli/customizations/emrcontainers/update_role_trust_policy.py b/awscli/customizations/emrcontainers/update_role_trust_policy.py index 036382c9ec16..4cc0bdce4f4c 100644 --- a/awscli/customizations/emrcontainers/update_role_trust_policy.py +++ b/awscli/customizations/emrcontainers/update_role_trust_policy.py @@ -23,20 +23,11 @@ from awscli.customizations.emrcontainers.eks import EKS from awscli.customizations.emrcontainers.iam import IAM from awscli.customizations.utils import uni_print, get_policy_arn_suffix +from awscli.customizations.emrcontainers.utils import get_region LOG = logging.getLogger(__name__) -# Method to parse the arguments to get the region value -def get_region(session, parsed_globals): - region = parsed_globals.region - - if region is None: - region = session.get_config_variable('region') - - return region - - def check_if_statement_exists(expected_statement, actual_assume_role_document): if actual_assume_role_document is None: return False diff --git a/awscli/customizations/emrcontainers/utils.py b/awscli/customizations/emrcontainers/utils.py new file mode 100644 index 000000000000..3caa2d9166ad --- /dev/null +++ b/awscli/customizations/emrcontainers/utils.py @@ -0,0 +1,27 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +""" +Utility functions to make it easier to work with emr-containers customizations. + +""" + + +# Method to parse the arguments to get the region value +def get_region(session, parsed_globals): + region = parsed_globals.region + + if region is None: + region = session.get_config_variable("region") + + return region diff --git a/awscli/examples/emr-containers/create-role-associations.rst b/awscli/examples/emr-containers/create-role-associations.rst new file mode 100644 index 000000000000..7337431ccb72 --- /dev/null +++ b/awscli/examples/emr-containers/create-role-associations.rst @@ -0,0 +1,56 @@ +**To create role associations of an IAM Role with EMR service accounts to be used with Amazon EMR on EKS** + +This example command creates EKS pod identity associations of a role named **example_iam_role** with EMR service accounts such that it can be used with Amazon EMR on EKS with +**example_namespace** namespace from an EKS cluster named **example_cluster**. + +* Command:: + + aws emr-containers create-role-associations \ + --cluster-name example_cluster \ + --namespace example_namespace \ + --role-name example_iam_role \ + (--type example_type) \ + (--operator-namespace operator_namespace) \ + (--service-account-name custom_service_account) + +* Output:: + + If the iam role has already been associated with emr service accounts for pod identity, then the output will be: + An error occurred (ResourceInUseException) when calling the CreatePodIdentityAssociation operation: Association already exists: + + If the iam role has not been associated with emr service accounts yet, then the output will be: + [ + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-client-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + }, + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-driver-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + }, + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-executor-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + } + ] diff --git a/awscli/examples/emr-containers/create-role-associations/_description.rst b/awscli/examples/emr-containers/create-role-associations/_description.rst new file mode 100644 index 000000000000..ae35b3ae9303 --- /dev/null +++ b/awscli/examples/emr-containers/create-role-associations/_description.rst @@ -0,0 +1,19 @@ +Creates role associations of given IAM role with EMR service accounts such that it can be used with Amazon EMR on EKS with the given namespace from the given EKS cluster. + +Note: +The command would associate EMR service accounts with provided IAM role to EKS pod identity: + +* "emr-containers-sa-%(FRAMEWORK)s-%(COMPONENT)s-%(AWS_ACCOUNT_ID)s-%(BASE36_ENCODED_ROLE_NAME)s" + +Here:: + + = EMR on EKS framework such as spark, flink, livy + = Task component for the framework. Such as client, driver, executor for spark; flink-operator, jobmanager, taskmanager for flink. + = AWS Account ID of the EKS cluster + = Base36 encoded form of the IAM Role name + +You can use the **--type** option to select which submission model to associate. + +You can use the **--operator-namespace** option if your operator/livy jobs are running in a different operator namespace other than your job/application namespace. **--namespace** should be the Job/Application Namespace, and this option is for operator namespace to associate operator/livy service account. + +You can use the **--service-account-name** option to associate a custom service account with the role. diff --git a/awscli/examples/emr-containers/delete-role-associations.rst b/awscli/examples/emr-containers/delete-role-associations.rst new file mode 100644 index 000000000000..f96b8426ca78 --- /dev/null +++ b/awscli/examples/emr-containers/delete-role-associations.rst @@ -0,0 +1,55 @@ +**To delete role associations of an IAM Role with EMR service accounts** + +This example command deletes EKS pod identity associations of a role named **example_iam_role** with EMR service accounts such that it can be removed from Amazon EMR on EKS with +**example_namespace** namespace from an EKS cluster named **example_cluster**. + +EKS allows associations with non existing resources (namespace, service account), so EMR on EKS suggest to delete the associations if the namespace is deleted or the role is not in use to release the space for other associations. + +* Command:: + + aws emr-containers delete-role-associations \ + --cluster-name example_cluster \ + --namespace example_namespace \ + --role-name example_iam_role \ + (--type example_type) \ + (--operator-namespace operator_namespace) \ + (--service-account-name custom_service_account) + +* Output:: + + If the iam role has been dissociated with emr service accounts, then the output will be: + [ + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-client-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + }, + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-driver-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + }, + { + "clusterName": "example_cluster", + "namespace": "example_namespace", + "serviceAccount": "emr-spark-executor-service-account-example", + "roleArn": "example_iam_role", + "associationArn": "example_association_arn", + "associationId": "example_association_id", + "tags": {}, + "createdAt": "example_created_at", + "modifiedAt": "example_modified_at" + } + ] diff --git a/awscli/examples/emr-containers/delete-role-associations/_description.rst b/awscli/examples/emr-containers/delete-role-associations/_description.rst new file mode 100644 index 000000000000..066b574bbeed --- /dev/null +++ b/awscli/examples/emr-containers/delete-role-associations/_description.rst @@ -0,0 +1,19 @@ +Deletes role associations of given IAM role with EMR service accounts such that it will be removed from Amazon EMR on EKS with the given namespace from the given EKS cluster. + +Note: +The command would dissociate EMR service accounts with provided IAM role to EKS pod identity: + +* "emr-containers-sa-%(FRAMEWORK)s-%(COMPONENT)s-%(AWS_ACCOUNT_ID)s-%(BASE36_ENCODED_ROLE_NAME)s" + +Here:: + + = EMR on EKS framework such as spark, flink, livy + = Task component for the framework. Such as client, driver, executor for spark; flink-operator, jobmanager, taskmanager for flink. + = AWS Account ID of the EKS cluster + = Base36 encoded form of the IAM Role name + +You can use the **--type** option to select which submission model to dissociate. + +You can use the **--operator-namespace** option if your operator/livy jobs are running in a different operator namespace other than your job/application namespace. **--namespace** should be the Job/Application Namespace, and this option is for operator namespace to dissociate operator/livy service account. + +You can use the **--service-account-name** option to dissociate a custom service account with the role. diff --git a/tests/unit/customizations/emrcontainers/conftest.py b/tests/unit/customizations/emrcontainers/conftest.py new file mode 100644 index 000000000000..158441dc3aa7 --- /dev/null +++ b/tests/unit/customizations/emrcontainers/conftest.py @@ -0,0 +1,177 @@ +import pytest +from tests import SessionStubber, CLIRunner +from awscli.customizations.emrcontainers.base36 import Base36 +from awscli.customizations.emrcontainers.constants import ( + SERVICE_ACCOUNT_NAMING, + ServiceAccount, +) + + +@pytest.fixture +def session_stubber(): + return SessionStubber() + + +@pytest.fixture +def region(): + return "us-west-2" + + +@pytest.fixture +def cli_runner(session_stubber, region): + cli_runner = CLIRunner(session_stubber=session_stubber) + cli_runner.env["AWS_DEFAULT_REGION"] = region + return cli_runner + + +@pytest.fixture +def account_id(): + return "123456789012" + + +@pytest.fixture +def cluster_name(): + return "test-cluster" + + +@pytest.fixture +def namespace(): + return "test" + + +@pytest.fixture +def role_name(): + return "myrole" + + +@pytest.fixture +def role_arn(account_id, role_name): + return f"arn:aws:iam::{account_id}:role/{role_name}" + + +@pytest.fixture +def base36_encoded_role_name(role_name): + return Base36().encode(role_name) + + +## StartJobRun expected service accounts +@pytest.fixture +def start_job_run_service_accounts(account_id, base36_encoded_role_name): + emr_spark_components = ["client", "driver", "executor"] + return [ + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + } + for component in emr_spark_components + ] + + +## InteractiveEndpoint expected service accounts +@pytest.fixture +def interactive_endpoint_service_accounts(account_id, base36_encoded_role_name): + emr_spark_components = ["jeg", "jeg-kernel", "session"] + return [ + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + } + for component in emr_spark_components + ] + + +## SparkOperator expected service accounts +@pytest.fixture +def spark_operator_service_accounts(account_id, base36_encoded_role_name): + emr_spark_operator_components = ["driver", "executor"] + spark_operator_service_accounts = [ + ServiceAccount.SPARK_OPERATOR_SERVICE_ACCOUNT.value + ] + spark_operator_service_accounts.extend( + [ + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "spark", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + } + for component in emr_spark_operator_components + ] + ) + return spark_operator_service_accounts + + +## FlinkOperator expected service accounts +@pytest.fixture +def flink_operator_service_accounts(account_id, base36_encoded_role_name): + emr_flink_operator_components = ["jobmanager", "taskmanager"] + flink_operator_service_accounts = [ + ServiceAccount.FLINK_OPERATOR_SERVICE_ACCOUNT.value + ] + flink_operator_service_accounts.extend( + [ + SERVICE_ACCOUNT_NAMING + % { + "FRAMEWORK": "flink", + "COMPONENT": component, + "AWS_ACCOUNT_ID": account_id, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name, + } + for component in emr_flink_operator_components + ] + ) + return flink_operator_service_accounts + + +## Livy expected service accounts +@pytest.fixture +def livy_service_accounts(): + return [ + ServiceAccount.LIVY_SERVICE_ACCOUNT.value, + ServiceAccount.LIVY_SPARK_SERVICE_ACCOUNT.value, + ] + + +@pytest.fixture +def create_pod_identity_association_response(): + return b'{"association": {"associationId": "a-12345678", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy"}}' + + +@pytest.fixture +def describe_cluster_response(): + return b'{"cluster": {"arn": "arn:aws:eks:us-west-2:123456789012:cluster/test-cluster"}}' + + +@pytest.fixture +def list_pod_identity_association_response(): + return [ + b'{"associations":[{"associationId": "1", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy1"}]}', + b'{"associations":[{"associationId": "2", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy2"}]}', + b'{"associations":[{"associationId": "3", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy3"}]}', + ] + + +@pytest.fixture +def delete_pod_identity_association_response(): + return [ + b'{"association": {"association_id": "1", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy1"}}', + b'{"association": {"association_id": "2", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy2"}}', + b'{"association": {"association_id": "3", "clusterName": "test-cluster", "namespace": "namespace", "serviceAccount":"dummy3"}}', + ] + + +@pytest.fixture +def create_pod_identity_association_already_exists_error_response(): + return b'{"Code": "ResourceInUseException","Message": "An error occurred (ResourceInUseException) when calling the CreatePodIdentityAssociation operation: Association already exists: a-1"}' + + +@pytest.fixture +def create_pod_identity_association_other_error_response(): + return b'{"Code": "InvliadRequestException","Message": "Bad Request!"}' diff --git a/tests/unit/customizations/emrcontainers/test_create_role_associations.py b/tests/unit/customizations/emrcontainers/test_create_role_associations.py new file mode 100644 index 000000000000..63681c1b3f6a --- /dev/null +++ b/tests/unit/customizations/emrcontainers/test_create_role_associations.py @@ -0,0 +1,596 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +from tests import HTTPResponse + + +class TestCreateRoleAssociationsCommand: + def assert_call_matches( + self, aws_request, service_account, cluster_name, namespace, role_arn + ): + assert aws_request.service_name == "eks" + assert aws_request.operation_name == "CreatePodIdentityAssociation" + assert aws_request.params["clusterName"] == cluster_name + assert aws_request.params["namespace"] == namespace + assert aws_request.params["roleArn"] == role_arn + assert aws_request.params["serviceAccount"] == service_account + + def assert_roll_back(self, aws_request, cluster_name, associationId): + assert aws_request.service_name == "eks" + assert aws_request.operation_name == "DeletePodIdentityAssociation" + assert aws_request.params["clusterName"] == cluster_name + assert aws_request.params["associationId"] == associationId + + # Use case: Expect to return create pod identity association results for start job run + # Expected results: Operation is performed by client + # to create pod identity associations with start job run service accounts + def test_create_role_associations_for_start_job_run( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + start_job_run_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(start_job_run_service_accounts)): + self.assert_call_matches( + result.aws_requests[i + 1], + start_job_run_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for interactive endpoint + # Expected results: Operation is performed by client + # to create pod identity associations with interactive endpoint service accounts + def test_create_role_associations_for_interactive_endpoint( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + interactive_endpoint_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "interactive_endpoint", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(interactive_endpoint_service_accounts)): + self.assert_call_matches( + result.aws_requests[i + 1], + interactive_endpoint_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for spark operator + # Expected results: Operation is performed by client + # to create pod identity associations with spark operator service accounts + def test_create_role_associations_for_spark_operator( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + spark_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "spark_operator", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(spark_operator_service_accounts)): + self.assert_call_matches( + result.aws_requests[i + 1], + spark_operator_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for spark operator with operator namespace + # Expected results: Operation is performed by client + # to create pod identity associations with spark operator service accounts in correct namespaces + def test_create_role_associations_for_spark_operator_namespace( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + spark_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "spark_operator", + "--operator-namespace", + "spark-operator", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(spark_operator_service_accounts)): + ns = "spark-operator" if i == 0 else namespace + self.assert_call_matches( + result.aws_requests[i + 1], + spark_operator_service_accounts[i], + cluster_name, + ns, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for flink operator + # Expected results: Operation is performed by client + # to create pod identity associations with flink operator service accounts + def test_create_role_associations_for_flink_operator( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + flink_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "flink_operator", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(flink_operator_service_accounts)): + self.assert_call_matches( + result.aws_requests[i + 1], + flink_operator_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for flink operator with operator namespaces + # Expected results: Operation is performed by client + # to create pod identity associations with flink operator service accounts in correct namespace + def test_create_role_associations_for_flink_operator_namespace( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + flink_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "flink_operator", + "--operator-namespace", + "flink-operator", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(flink_operator_service_accounts)): + ns = "flink-operator" if i == 0 else namespace + self.assert_call_matches( + result.aws_requests[i + 1], + flink_operator_service_accounts[i], + cluster_name, + ns, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for livy + # Expected results: Operation is performed by client + # to create pod identity associations with livy service accounts + def test_create_role_associations_for_livy( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + livy_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "livy", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(livy_service_accounts)): + self.assert_call_matches( + result.aws_requests[i + 1], + livy_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for livy with controller namespace + # Expected results: Operation is performed by client + # to create pod identity associations with livy service accounts in correct namespace + def test_create_role_associations_for_livy_namespace( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + livy_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "livy", + "--operator-namespace", + "livy", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(len(livy_service_accounts)): + ns = "livy" if i == 0 else namespace + self.assert_call_matches( + result.aws_requests[i + 1], + livy_service_accounts[i], + cluster_name, + ns, + role_arn, + ) + + # Use case: Expect to return create pod identity association results for customer input service accounts + # Expected results: Operation is performed by client + # to create pod identity associations with customer input service accounts + def test_create_role_associations_for_customer_service_account( + self, + cli_runner, + role_arn, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_response, + spark_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--service-account", + "test_sa", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + self.assert_call_matches( + result.aws_requests[1], "test_sa", cluster_name, namespace, role_arn + ) + + # Use case: Expect to return ResourceInUse exception and do nothing on already existed associations + # Expected results: Association creations are skipped + def test_create_role_associations_already_exists_for_start_job_run( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + create_pod_identity_association_already_exists_error_response, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse( + body=create_pod_identity_association_already_exists_error_response, + status_code=409, + ) + ) + cli_runner.add_response( + HTTPResponse( + body=create_pod_identity_association_already_exists_error_response, + status_code=409, + ) + ) + cli_runner.add_response( + HTTPResponse( + body=create_pod_identity_association_already_exists_error_response, + status_code=409, + ) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + ] + ) + components = ["client", "driver", "executor"] + expected_out = "" + for component in components: + expected_out += ( + f"Skipping pod identity association creation because pod identity association already exists for service " + + f"account emr-containers-sa-spark-{component}-123456789012-16o0gwny3pand role myrole in namespace test: " + + f"An error occurred (ResourceInUseException) when calling the " + + f"CreatePodIdentityAssociation operation: Association already exists: a-1\n" + ) + assert result.stderr == expected_out + + # Use case: Expect to return error exception and rollback on created associations in the same call + # Expected results: Associations are rolled back + def test_create_role_associations_error_rollback_for_start_job_run( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + start_job_run_service_accounts, + role_arn, + create_pod_identity_association_response, + create_pod_identity_association_other_error_response, + delete_pod_identity_association_response, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse(body=create_pod_identity_association_response) + ) + cli_runner.add_response( + HTTPResponse( + body=create_pod_identity_association_other_error_response, + status_code=400, + ) + ) + for i in range(2): + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + result = cli_runner.run( + [ + "emr-containers", + "create-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + for i in range(3): + self.assert_call_matches( + result.aws_requests[i + 1], + start_job_run_service_accounts[i], + cluster_name, + namespace, + role_arn, + ) + for i in range(2): + self.assert_roll_back( + result.aws_requests[i + 4], cluster_name, "a-12345678" + ) diff --git a/tests/unit/customizations/emrcontainers/test_delete_role_associations.py b/tests/unit/customizations/emrcontainers/test_delete_role_associations.py new file mode 100644 index 000000000000..fa1e794dac07 --- /dev/null +++ b/tests/unit/customizations/emrcontainers/test_delete_role_associations.py @@ -0,0 +1,578 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from tests import HTTPResponse + + +class TestDeleteRoleAssociationsCommand: + def assert_list_call_matches( + self, aws_request, service_account, cluster_name, namespace + ): + assert aws_request.service_name == "eks" + assert aws_request.operation_name == "ListPodIdentityAssociations" + assert aws_request.params["clusterName"] == cluster_name + assert aws_request.params["namespace"] == namespace + assert aws_request.params["serviceAccount"] == service_account + + def assert_delete_call_matches( + self, aws_request, cluster_name, association_id + ): + assert aws_request.service_name == "eks" + assert aws_request.operation_name == "DeletePodIdentityAssociation" + assert aws_request.params["clusterName"] == cluster_name + assert aws_request.params["associationId"] == association_id + + # Use case: Expect to return delete pod identity association results for start job run + # Expected results: Operation is performed by client + # to delete pod identity associations with start job run service accounts + def test_delete_role_associations_for_start_job_run( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + start_job_run_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(start_job_run_service_accounts)): + request_idx += 1 + self.assert_list_call_matches( + result.aws_requests[request_idx], + start_job_run_service_accounts[i], + cluster_name, + namespace, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for interactive endpoint + # Expected results: Operation is performed by client + # to delete pod identity associations with interactive endpoint service accounts + def test_delete_role_associations_for_interactive_endpoint( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + interactive_endpoint_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "interactive_endpoint", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(interactive_endpoint_service_accounts)): + request_idx += 1 + self.assert_list_call_matches( + result.aws_requests[request_idx], + interactive_endpoint_service_accounts[i], + cluster_name, + namespace, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for spark operator + # Expected results: Operation is performed by client + # to delete pod identity associations with spark operator service accounts + def test_delete_role_associations_for_spark_operator( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + spark_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "spark_operator", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(spark_operator_service_accounts)): + request_idx += 1 + self.assert_list_call_matches( + result.aws_requests[request_idx], + spark_operator_service_accounts[i], + cluster_name, + namespace, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for spark operator with operator namespace + # Expected results: Operation is performed by client + # to delete pod identity associations with spark operator service accounts in correct namespaces + def test_delete_role_associations_for_spark_operator_namespace( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + spark_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "spark_operator", + "--operator-namespace", + "spark-operator", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(spark_operator_service_accounts)): + request_idx += 1 + ns = "spark-operator" if i == 0 else namespace + self.assert_list_call_matches( + result.aws_requests[request_idx], + spark_operator_service_accounts[i], + cluster_name, + ns, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for flink operator + # Expected results: Operation is performed by client + # to delete pod identity associations with flink operator service accounts + def test_delete_role_associations_for_flink_operator( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + flink_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "flink_operator", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(flink_operator_service_accounts)): + request_idx += 1 + self.assert_list_call_matches( + result.aws_requests[request_idx], + flink_operator_service_accounts[i], + cluster_name, + namespace, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for flink operator with operator namespaces + # Expected results: Operation is performed by client + # to delete pod identity associations with flink operator service accounts in correct namespace + def test_delete_role_associations_for_flink_operator_namespace( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + flink_operator_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(3): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "flink_operator", + "--operator-namespace", + "flink-operator", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(flink_operator_service_accounts)): + request_idx += 1 + ns = "flink-operator" if i == 0 else namespace + self.assert_list_call_matches( + result.aws_requests[request_idx], + flink_operator_service_accounts[i], + cluster_name, + ns, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for livy + # Expected results: Operation is performed by client + # to delete pod identity associations with livy service accounts + def test_delete_role_associations_for_livy( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + livy_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(2): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "livy", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(livy_service_accounts)): + request_idx += 1 + self.assert_list_call_matches( + result.aws_requests[request_idx], + livy_service_accounts[i], + cluster_name, + namespace, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for livy with controller namespace + # Expected results: Operation is performed by client + # to delete pod identity associations with livy service accounts in correct namespace + def test_delete_role_associations_for_livy_namespace( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + livy_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + for i in range(2): + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[i]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[i]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--type", + "livy", + "--operator-namespace", + "livy", + ] + ) + + request_idx = 0 + assert result.aws_requests[request_idx].service_name == "eks" + assert ( + result.aws_requests[request_idx].operation_name == "DescribeCluster" + ) + for i in range(len(livy_service_accounts)): + request_idx += 1 + ns = "livy" if i == 0 else namespace + self.assert_list_call_matches( + result.aws_requests[request_idx], + livy_service_accounts[i], + cluster_name, + ns, + ) + request_idx += 1 + self.assert_delete_call_matches( + result.aws_requests[request_idx], cluster_name, str(i + 1) + ) + + # Use case: Expect to return delete pod identity association results for customer input service accounts + # Expected results: Operation is performed by client + # to delete pod identity associations with customer input service accounts + def test_delete_role_associations_for_customer_service_account( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + list_pod_identity_association_response, + delete_pod_identity_association_response, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response( + HTTPResponse(body=list_pod_identity_association_response[0]) + ) + cli_runner.add_response( + HTTPResponse(body=delete_pod_identity_association_response[0]) + ) + + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + "--service-account-name", + "test_sa", + ] + ) + + assert result.aws_requests[0].service_name == "eks" + assert result.aws_requests[0].operation_name == "DescribeCluster" + self.assert_list_call_matches( + result.aws_requests[1], "test_sa", cluster_name, namespace + ) + self.assert_delete_call_matches( + result.aws_requests[2], cluster_name, "1" + ) + + # Use case: Expect to do nothing on deletion when resource not found but print warning message + # Expected results: Association deletions are skipped + def test_create_role_associations_already_exists_for_start_job_run( + self, + cli_runner, + cluster_name, + namespace, + role_name, + describe_cluster_response, + start_job_run_service_accounts, + ): + + cli_runner.add_response(HTTPResponse(body=describe_cluster_response)) + cli_runner.add_response(HTTPResponse(body=[])) + cli_runner.add_response(HTTPResponse(body=[])) + cli_runner.add_response(HTTPResponse(body=[])) + result = cli_runner.run( + [ + "emr-containers", + "delete-role-associations", + "--cluster-name", + cluster_name, + "--namespace", + namespace, + "--role-name", + role_name, + ] + ) + + expected_out = "" + for service_account in start_job_run_service_accounts: + expected_out += ( + f"Skipping deletion as no pod identity association found for service account {service_account} " + + f"and role {role_name} in namespace {namespace}\n" + ) + assert result.stderr == expected_out