Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] [customization] Create/delete role associations for EMR on EKS #9254

Open
wants to merge 1 commit into
base: v2
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/api-change-emrcontainers-7930.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "api-change",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it count as an api change when we're adding commands via customization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm yeah it looks like new customizations are introduced as features. I'll update.

"category": "``emr-containers``",
"description": "Add custom ``create-role-associations`` and ``delete-role-associations`` commands to create/delete role associations for EMR service accounts and provided IAM role."
}
9 changes: 9 additions & 0 deletions awscli/customizations/emrcontainers/__init__.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,10 @@

from awscli.customizations.emrcontainers.update_role_trust_policy \
import UpdateRoleTrustPolicyCommand
from awscli.customizations.emrcontainers.create_role_associations \
import CreateRoleAssociationsCommand
from awscli.customizations.emrcontainers.delete_role_associations \
import DeleteRoleAssociationsCommand


def initialize(cli):
@@ -29,3 +33,8 @@ def inject_commands(command_table, session, **kwargs):
"""
command_table['update-role-trust-policy'] = UpdateRoleTrustPolicyCommand(
session)
command_table['create-role-associations'] = CreateRoleAssociationsCommand(
session)
command_table['delete-role-associations'] = DeleteRoleAssociationsCommand(
session)

10 changes: 10 additions & 0 deletions awscli/customizations/emrcontainers/constants.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@
# Declare all the constants used by Lifecycle in this file

# Lifecycle role names
from enum import Enum

TRUST_POLICY_STATEMENT_FORMAT = '{ \
"Effect": "Allow", \
"Principal": { \
@@ -35,3 +37,11 @@
"were made!"

TRUST_POLICY_UPDATE_SUCCESSFUL = "Successfully updated trust policy of role %s"

SERVICE_ACCOUNT_NAMING = "emr-containers-sa-%(FRAMEWORK)s-%(COMPONENT)s-%(AWS_ACCOUNT_ID)s-%(BASE36_ENCODED_ROLE_NAME)s"

class ServiceAccount(Enum):
SPARK_OPERATOR_SERVICE_ACCOUNT = "emr-containers-sa-spark-operator"
FLINK_OPERATOR_SERVICE_ACCOUNT = "emr-containers-sa-flink-operator"
LIVY_SERVICE_ACCOUNT = "emr-containers-sa-livy"
LIVY_SPARK_SERVICE_ACCOUNT = "emr-container-sa-spark-livy"
298 changes: 298 additions & 0 deletions awscli/customizations/emrcontainers/create_role_associations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2024 -> 2025

#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import logging, json, sys
import botocore

from awscli.customizations.commands import BasicCommand
from awscli.customizations.emrcontainers.constants import (
SERVICE_ACCOUNT_NAMING,
ServiceAccount,
)
from awscli.customizations.emrcontainers.base36 import Base36
from awscli.customizations.emrcontainers.eks import EKS
from awscli.customizations.utils import uni_print, get_policy_arn_suffix
from awscli.customizations.emrcontainers.utils import get_region

LOG = logging.getLogger(__name__)


class CreateRoleAssociationsCommand(BasicCommand):
NAME = "create-role-associations"

DESCRIPTION = BasicCommand.FROM_FILE(
"emr-containers", "create-role-associations", "_description.rst"
)

ARG_TABLE = [
{
"name": "cluster-name",
"help_text": (
"Specify the name of the Amazon EKS cluster with "
"which the IAM Role would be associated."
),
"required": True,
},
{
"name": "namespace",
"help_text": (
"Specify the job/application namespace from the Amazon EKS cluster "
"with which the IAM Role would be associated."
),
"required": True,
},
{
"name": "role-name",
"help_text": (
"Specify the IAM Role name that you want to associate "
"with Amazon EMR on EKS."
),
"required": True,
},
{
"name": "type",
"help_text": (
"Specify the EMR on EKS submission model and choose service accounts that you want to associate "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EMR -> Amazon EMR

"with Amazon EMR on EKS. By default is start_job_run. Supported Types: start_job_run, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking nit:
By default -> The default
Types -> types

"interactive_endpoint, spark_operator, flink_operator, livy."
),
"required": False,
"choices": [
"start_job_run",
"interactive_endpoint",
"spark_operator",
"flink_operator",
"livy",
],
},
{
"name": "operator-namespace",
"help_text": (
"Specify the namespace that you want to associate the operator service account "
"with the IAM role. Default to the job/application namespace specified. Note: "
"If jobs are running in a different namespace than the operator installation namespace, "
"this parameter needs to be set as where the operator is running on."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking nit:
where -> the namespace that

),
"required": False,
},
{
"name": "service-account-name",
"help_text": (
"Specify the service account name that you want to associate "
"with the IAM role. Default to Amazon EMR on EKS service accounts."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code implies that if a service account name is unspecified then it falls back to relying on the value of type, so I'm confused to what "Default to Amazon EMR on EKS service accounts." means here. If it's meant to imply it falls back to type, I think we could be more explicit on how these two args are dependent / mutually exclusive.

Similar comment for delete-role-associations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command is mainly for simplifying customer experience to create role associations with EMR service accounts under the hood. So that customer don't need to figure out EMR service accounts pattern and perform manual actions. This extra service-account-name is optional if they would like to do it manually or associate their roles to EKS with specific service account. But the original idea for this command is to create associations with EMR service accounts based on submission models. (Spark/Spark Operator/..)

Service account name and type are both optional and yes mutually exclusive, once service account name is set, we will directly use the name for the association and will not generate the EMR ones based on the type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. My suggestion in this comment is to clarify this for the user.

I think it's confusing to say there's a default value since if service-account-name is unspecified we don't utilize it. Typically default implies if the argument is unspecified, then a default value for the argument is used. This is not the case here, we instead fall back to using a different argument (type).

My (non-required) suggestion is to replace the "default" wording with specifying that this overrides the type argument if it's supplied.

Feel free to resolve this conversation if you feel this would not be confusing to your customers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable, we can change it to
"(Optional) Specify the service account name that you want to associate "
"with the IAM role. If this is not set, Amazon EMR on EKS service accounts will be used for association."

),
"required": False,
},
]

def _run_main(self, parsed_args, parsed_globals):
"""Call to run the commands"""

self._cluster_name = parsed_args.cluster_name
self._namespace = parsed_args.namespace
self._role_name = parsed_args.role_name
self._type = parsed_args.type or "start_job_run"
self._operator_namespace = parsed_args.operator_namespace
self._service_account_name = parsed_args.service_account_name
self._region = get_region(self._session, parsed_globals)

result = self._create_role_associations(parsed_globals)
if result:
uni_print(json.dumps(result, indent=4))

return 0

def _create_role_associations(self, parsed_globals):
"""Method to create role associations if not done already"""
eks_client = EKS(
self._session.create_client(
"eks",
region_name=self._region,
verify=parsed_globals.verify_ssl,
)
)
account_id = eks_client.get_account_id(self._cluster_name)
role_arn = f"arn:{get_policy_arn_suffix(self._region)}:iam::{account_id}:role/{self._role_name}"

results = []
# If service account provided, create association with provided service account and role
if self._service_account_name:
service_account_namespace_mapping = [
(self._service_account_name, self._namespace)
]
else:
# By default, create associations with EMR on EKS service accounts
base36 = Base36()

base36_encoded_role_name = base36.encode(self._role_name)
LOG.debug(f"Base36 encoded role name: {base36_encoded_role_name}")
service_account_namespace_mapping = (
self._get_emr_service_account_namespace_mapping(
account_id, base36_encoded_role_name
)
)

for (
service_account_name,
namespace,
) in service_account_namespace_mapping:
LOG.debug(
f"Creating pod identity association for service account {service_account_name} and "
+ f"role {self._role_name} in namespace {namespace}"
)
try:
result = eks_client.create_pod_identity_association(
self._cluster_name,
namespace,
role_arn,
service_account_name,
)
results.append(
result["association"] if "association" in result else result
aemous marked this conversation as resolved.
Show resolved Hide resolved
)
except botocore.exceptions.ClientError as error:

# Raise the error if EKS throws exceptions other than ResourceInUseException
if error.response["Error"]["Code"] != "ResourceInUseException":
for result in results:
uni_print(
f"Rolling back association for service account {service_account_name} "
+ f"and role {self._role_name} in namespace {namespace} as encountered an error",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • unnecessary concatenation +
  • add \n at end of error message

out_file=sys.stderr,
)
eks_client.delete_pod_identity_association(
hssyoo marked this conversation as resolved.
Show resolved Hide resolved
cluster_name=result["clusterName"],
association_id=result["associationId"],
)
raise Exception(
f"Failed to create pod identity association for service account {service_account_name}, "
+ f"role {self._role_name} in namespace {namespace}: {error.response['Error']['Message']}"
) from error
uni_print(
f"Skipping pod identity association creation because pod identity association already exists for "
+ f"service account {service_account_name}and role {self._role_name} in namespace {namespace}: {error.response['Error']['Message']}\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • unnecessary concatenation operator +
  • add space before "and"

out_file=sys.stderr,
)
return results

def _get_emr_service_account_namespace_mapping(
self, account_id, base36_encoded_role_name
):
return getattr(self, f"_get_{self._type}_mapping")(
account_id, base36_encoded_role_name
)

def _get_start_job_run_mapping(self, account_id, base36_encoded_role_name):
emr_spark_components = ["client", "driver", "executor"]
return [
(
SERVICE_ACCOUNT_NAMING
% {
"FRAMEWORK": "spark",
"COMPONENT": component,
"AWS_ACCOUNT_ID": account_id,
"BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
},
self._namespace,
)
for component in emr_spark_components
]

def _get_interactive_endpoint_mapping(
self, account_id, base36_encoded_role_name
):
emr_spark_endpoint_components = ["jeg", "jeg-kernel", "session"]
return [
(
SERVICE_ACCOUNT_NAMING
% {
"FRAMEWORK": "spark",
"COMPONENT": component,
"AWS_ACCOUNT_ID": account_id,
"BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
},
self._namespace,
)
for component in emr_spark_endpoint_components
]

def _get_spark_operator_mapping(self, account_id, base36_encoded_role_name):
emr_spark_operator_components = ["driver", "executor"]
service_accounts = [
(
ServiceAccount.SPARK_OPERATOR_SERVICE_ACCOUNT.value,
(
self._operator_namespace
if self._operator_namespace
else self._namespace
),
)
]
service_accounts.extend(
[
(
SERVICE_ACCOUNT_NAMING
% {
"FRAMEWORK": "spark",
"COMPONENT": component,
"AWS_ACCOUNT_ID": account_id,
"BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
},
self._namespace,
)
for component in emr_spark_operator_components
]
)
return service_accounts

def _get_flink_operator_mapping(self, account_id, base36_encoded_role_name):
emr_flink_operator_components = ["jobmanager", "taskmanager"]
service_accounts = [
(
ServiceAccount.FLINK_OPERATOR_SERVICE_ACCOUNT.value,
(
self._operator_namespace
if self._operator_namespace
else self._namespace
),
)
]
service_accounts.extend(
[
(
SERVICE_ACCOUNT_NAMING
% {
"FRAMEWORK": "flink",
"COMPONENT": component,
"AWS_ACCOUNT_ID": account_id,
"BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
},
self._namespace,
)
for component in emr_flink_operator_components
]
)
return service_accounts

def _get_livy_mapping(self, account_id, base36_encoded_role_name):
return [
(
ServiceAccount.LIVY_SERVICE_ACCOUNT.value,
(
self._operator_namespace
if self._operator_namespace
else self._namespace
),
),
(ServiceAccount.LIVY_SPARK_SERVICE_ACCOUNT.value, self._namespace),
]
Loading