diff --git a/eeauditor/auditors/AWS_KMS_Auditor.py b/eeauditor/auditors/AWS_KMS_Auditor.py deleted file mode 100644 index a705838a..00000000 --- a/eeauditor/auditors/AWS_KMS_Auditor.py +++ /dev/null @@ -1,245 +0,0 @@ -# This file is part of ElectricEye. - -# ElectricEye is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# ElectricEye is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License along with ElectricEye. -# If not, see https://github.com/jonrau1/ElectricEye/blob/master/LICENSE. - -import boto3 -import datetime -import json -import os -from auditors.Auditor import Auditor - -# import boto3 clients -sts = boto3.client("sts") -kms = boto3.client("kms") - - -class KMSKeyRotationCheck(Auditor): - def execute(self): - awsAccountId = sts.get_caller_identity()["Account"] - awsRegion = os.environ["AWS_REGION"] - keys = kms.list_keys() - my_keys = keys["Keys"] - iso8601Time = datetime.datetime.now(datetime.timezone.utc).isoformat() - for key in my_keys: - keyid = key["KeyId"] - keyarn = key["KeyArn"] - key_rotation = kms.get_key_rotation_status(KeyId=keyid) - if key_rotation["KeyRotationEnabled"] == True: - finding = { - "SchemaVersion": "2018-10-08", - "Id": keyarn + "/kms-key-rotation-check", - "ProductArn": "arn:aws:securityhub:" - + awsRegion - + ":" - + awsAccountId - + ":product/" - + awsAccountId - + "/default", - "GeneratorId": keyarn, - "AwsAccountId": awsAccountId, - "Types": [ - "Software and Configuration Checks/AWS Security Best Practices", - ], - "FirstObservedAt": iso8601Time, - "CreatedAt": iso8601Time, - "UpdatedAt": iso8601Time, - "Severity": {"Label": "INFORMATIONAL"}, - "Confidence": 99, - "Title": "[KMS.1] KMS keys should have key rotation enabled", - "Description": "KMS Key " - + keyid - + " does have key rotation enabled.", - "Remediation": { - "Recommendation": { - "Text": "For more information on KMS key rotation refer to the AWS KMS Developer Guide on Rotating Keys", - "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html", - } - }, - "ProductFields": {"Product Name": "ElectricEye"}, - "Resources": [ - { - "Type": "AwsKmsKey", - "Id": keyarn, - "Partition": "aws", - "Region": awsRegion, - "Details": {"AwsKmsKey": {"KeyId": keyid}}, - } - ], - "Compliance": {"Status": "PASSED"}, - "Workflow": {"Status": "RESOLVED"}, - "RecordState": "ARCHIVED", - } - yield finding - else: - finding = { - "SchemaVersion": "2018-10-08", - "Id": keyarn + "/kms-key-rotation-check", - "ProductArn": "arn:aws:securityhub:" - + awsRegion - + ":" - + awsAccountId - + ":product/" - + awsAccountId - + "/default", - "GeneratorId": keyarn, - "AwsAccountId": awsAccountId, - "Types": [ - "Software and Configuration Checks/AWS Security Best Practices" - ], - "FirstObservedAt": iso8601Time, - "CreatedAt": iso8601Time, - "UpdatedAt": iso8601Time, - "Severity": {"Label": "MEDIUM"}, - "Confidence": 99, - "Title": "[KMS.1] KMS keys should have key rotation enabled", - "Description": "KMS key " - + keyid - + " does not have key rotation enabled.", - "Remediation": { - "Recommendation": { - "Text": "For more information on KMS key rotation refer to the AWS KMS Developer Guide on Rotating Keys", - "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html", - } - }, - "ProductFields": {"Product Name": "ElectricEye"}, - "Resources": [ - { - "Type": "AwsKmsKey", - "Id": keyarn, - "Partition": "aws", - "Region": awsRegion, - "Details": {"AwsKmsKey": {"KeyId": keyid}}, - } - ], - "Compliance": {"Status": "FAILED"}, - "Workflow": {"Status": "NEW"}, - "RecordState": "ACTIVE", - } - yield finding - - -class KMSKeyExposedCheck(Auditor): - def execute(self): - awsAccountId = sts.get_caller_identity()["Account"] - awsRegion = os.environ["AWS_REGION"] - response = kms.list_aliases() - aliasList = response["Aliases"] - iso8601Time = datetime.datetime.now(datetime.timezone.utc).isoformat() - for alias in aliasList: - if "TargetKeyId" in alias: - aliasArn = alias["AliasArn"] - keyid = alias["TargetKeyId"] - policyString = kms.get_key_policy(KeyId=keyid, PolicyName="default") - fail = False - policy_json = policyString["Policy"] - policy = json.loads(policy_json) - for sid in policy["Statement"]: - access = sid["Principal"].get("AWS", None) - if access != "*" or (access == "*" and "Condition" in sid): - continue - else: - fail = True - break - if not fail: - finding = { - "SchemaVersion": "2018-10-08", - "Id": aliasArn + "/kms-key-exposed-check", - "ProductArn": "arn:aws:securityhub:" - + awsRegion - + ":" - + awsAccountId - + ":product/" - + awsAccountId - + "/default", - "GeneratorId": aliasArn, - "AwsAccountId": awsAccountId, - "Types": [ - "Software and Configuration Checks/AWS Security Best Practices", - "Effects/Data Exposure", - ], - "FirstObservedAt": iso8601Time, - "CreatedAt": iso8601Time, - "UpdatedAt": iso8601Time, - "Severity": {"Label": "INFORMATIONAL"}, - "Confidence": 75, # The Condition may not effectively limit access - "Title": "[KMS.2] KMS keys should not have public access", - "Description": "KMS key " - + keyid - + " does not have public access or limited by a Condition. Refer to the remediation instructions to review kms access policy", - "Remediation": { - "Recommendation": { - "Text": "For more information on AWS KMS key policies refer to Using key policies in AWS KMS section of the AWS KMS Developer Guide.", - "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html", - } - }, - "ProductFields": {"Product Name": "ElectricEye"}, - "Resources": [ - { - "Type": "AwsKmsAlias", - "Id": aliasArn, - "Partition": "aws", - "Region": awsRegion, - } - ], - "Compliance": {"Status": "PASSED"}, - "Workflow": {"Status": "RESOLVED"}, - "RecordState": "ARCHIVED", - } - yield finding - else: - finding = { - "SchemaVersion": "2018-10-08", - "Id": aliasArn + "/kms-key-exposed-check", - "ProductArn": "arn:aws:securityhub:" - + awsRegion - + ":" - + awsAccountId - + ":product/" - + awsAccountId - + "/default", - "GeneratorId": aliasArn, - "AwsAccountId": awsAccountId, - "Types": [ - "Software and Configuration Checks/AWS Security Best Practices", - "Effects/Data Exposure", - ], - "FirstObservedAt": iso8601Time, - "CreatedAt": iso8601Time, - "UpdatedAt": iso8601Time, - "Severity": {"Label": "HIGH"}, - "Confidence": 99, - "Title": "[KMS.2] KMS keys should not have public access", - "Description": "KMS key " - + keyid - + " has public access. Refer to the remediation instructions to review kms access policy", - "Remediation": { - "Recommendation": { - "Text": "For more information on AWS KMS key policies refer to Using key policies in AWS KMS section of the AWS KMS Developer Guide.", - "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html", - } - }, - "ProductFields": {"Product Name": "ElectricEye"}, - "Resources": [ - { - "Type": "AwsKmsAlias", - "Id": aliasArn, - "Partition": "aws", - "Region": awsRegion, - } - ], - "Compliance": {"Status": "FAILED"}, - "Workflow": {"Status": "NEW"}, - "RecordState": "ACTIVE", - } - yield finding diff --git a/eeauditor/auditors/aws/AWS_KMS_Auditor.py b/eeauditor/auditors/aws/AWS_KMS_Auditor.py new file mode 100644 index 00000000..892ad1bd --- /dev/null +++ b/eeauditor/auditors/aws/AWS_KMS_Auditor.py @@ -0,0 +1,248 @@ +# This file is part of ElectricEye. + +# ElectricEye is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# ElectricEye is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License along with ElectricEye. +# If not, see https://github.com/jonrau1/ElectricEye/blob/master/LICENSE. + +import boto3 +import datetime +import json +import os +from check_register import CheckRegister + +registry = CheckRegister() +kms = boto3.client("kms") + + +def list_keys(cache): + response = cache.get("list_keys") + if response: + return response + cache["list_keys"] = kms.list_keys() + return cache["list_keys"] + + +def list_aliases(cache): + response = cache.get("list_aliases") + if response: + return response + cache["list_aliases"] = kms.list_aliases() + return cache["list_aliases"] + + +@registry.register_check("kms") +def kms_key_rotation_check(cache: dict, awsAccountId: str, awsRegion: str) -> dict: + keys = list_keys(cache=cache) + my_keys = keys["Keys"] + iso8601Time = datetime.datetime.now(datetime.timezone.utc).isoformat() + for key in my_keys: + keyid = key["KeyId"] + keyarn = key["KeyArn"] + key_rotation = kms.get_key_rotation_status(KeyId=keyid) + if key_rotation["KeyRotationEnabled"] == True: + finding = { + "SchemaVersion": "2018-10-08", + "Id": keyarn + "/kms-key-rotation-check", + "ProductArn": "arn:aws:securityhub:" + + awsRegion + + ":" + + awsAccountId + + ":product/" + + awsAccountId + + "/default", + "GeneratorId": keyarn, + "AwsAccountId": awsAccountId, + "Types": ["Software and Configuration Checks/AWS Security Best Practices",], + "FirstObservedAt": iso8601Time, + "CreatedAt": iso8601Time, + "UpdatedAt": iso8601Time, + "Severity": {"Label": "INFORMATIONAL"}, + "Confidence": 99, + "Title": "[KMS.1] KMS keys should have key rotation enabled", + "Description": "KMS Key " + keyid + " does have key rotation enabled.", + "Remediation": { + "Recommendation": { + "Text": "For more information on KMS key rotation refer to the AWS KMS Developer Guide on Rotating Keys", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html", + } + }, + "ProductFields": {"Product Name": "ElectricEye"}, + "Resources": [ + { + "Type": "AwsKmsKey", + "Id": keyarn, + "Partition": "aws", + "Region": awsRegion, + "Details": {"AwsKmsKey": {"KeyId": keyid}}, + } + ], + "Compliance": {"Status": "PASSED"}, + "Workflow": {"Status": "RESOLVED"}, + "RecordState": "ARCHIVED", + } + yield finding + else: + finding = { + "SchemaVersion": "2018-10-08", + "Id": keyarn + "/kms-key-rotation-check", + "ProductArn": "arn:aws:securityhub:" + + awsRegion + + ":" + + awsAccountId + + ":product/" + + awsAccountId + + "/default", + "GeneratorId": keyarn, + "AwsAccountId": awsAccountId, + "Types": ["Software and Configuration Checks/AWS Security Best Practices"], + "FirstObservedAt": iso8601Time, + "CreatedAt": iso8601Time, + "UpdatedAt": iso8601Time, + "Severity": {"Label": "MEDIUM"}, + "Confidence": 99, + "Title": "[KMS.1] KMS keys should have key rotation enabled", + "Description": "KMS key " + keyid + " does not have key rotation enabled.", + "Remediation": { + "Recommendation": { + "Text": "For more information on KMS key rotation refer to the AWS KMS Developer Guide on Rotating Keys", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html", + } + }, + "ProductFields": {"Product Name": "ElectricEye"}, + "Resources": [ + { + "Type": "AwsKmsKey", + "Id": keyarn, + "Partition": "aws", + "Region": awsRegion, + "Details": {"AwsKmsKey": {"KeyId": keyid}}, + } + ], + "Compliance": {"Status": "FAILED"}, + "Workflow": {"Status": "NEW"}, + "RecordState": "ACTIVE", + } + yield finding + + +@registry.register_check("kms") +def kms_key_exposed_check(cache: dict, awsAccountId: str, awsRegion: str) -> dict: + response = list_aliases(cache=cache) + aliasList = response["Aliases"] + iso8601Time = datetime.datetime.now(datetime.timezone.utc).isoformat() + for alias in aliasList: + if "TargetKeyId" in alias: + aliasArn = alias["AliasArn"] + keyid = alias["TargetKeyId"] + policyString = kms.get_key_policy(KeyId=keyid, PolicyName="default") + fail = False + policy_json = policyString["Policy"] + policy = json.loads(policy_json) + for sid in policy["Statement"]: + access = sid["Principal"].get("AWS", None) + if access != "*" or (access == "*" and "Condition" in sid): + continue + else: + fail = True + break + if not fail: + finding = { + "SchemaVersion": "2018-10-08", + "Id": aliasArn + "/kms-key-exposed-check", + "ProductArn": "arn:aws:securityhub:" + + awsRegion + + ":" + + awsAccountId + + ":product/" + + awsAccountId + + "/default", + "GeneratorId": aliasArn, + "AwsAccountId": awsAccountId, + "Types": [ + "Software and Configuration Checks/AWS Security Best Practices", + "Effects/Data Exposure", + ], + "FirstObservedAt": iso8601Time, + "CreatedAt": iso8601Time, + "UpdatedAt": iso8601Time, + "Severity": {"Label": "INFORMATIONAL"}, + "Confidence": 75, # The Condition may not effectively limit access + "Title": "[KMS.2] KMS keys should not have public access", + "Description": "KMS key " + + keyid + + " does not have public access or limited by a Condition. Refer to the remediation instructions to review kms access policy", + "Remediation": { + "Recommendation": { + "Text": "For more information on AWS KMS key policies refer to Using key policies in AWS KMS section of the AWS KMS Developer Guide.", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html", + } + }, + "ProductFields": {"Product Name": "ElectricEye"}, + "Resources": [ + { + "Type": "AwsKmsAlias", + "Id": aliasArn, + "Partition": "aws", + "Region": awsRegion, + } + ], + "Compliance": {"Status": "PASSED"}, + "Workflow": {"Status": "RESOLVED"}, + "RecordState": "ARCHIVED", + } + yield finding + else: + finding = { + "SchemaVersion": "2018-10-08", + "Id": aliasArn + "/kms-key-exposed-check", + "ProductArn": "arn:aws:securityhub:" + + awsRegion + + ":" + + awsAccountId + + ":product/" + + awsAccountId + + "/default", + "GeneratorId": aliasArn, + "AwsAccountId": awsAccountId, + "Types": [ + "Software and Configuration Checks/AWS Security Best Practices", + "Effects/Data Exposure", + ], + "FirstObservedAt": iso8601Time, + "CreatedAt": iso8601Time, + "UpdatedAt": iso8601Time, + "Severity": {"Label": "HIGH"}, + "Confidence": 99, + "Title": "[KMS.2] KMS keys should not have public access", + "Description": "KMS key " + + keyid + + " has public access. Refer to the remediation instructions to review kms access policy", + "Remediation": { + "Recommendation": { + "Text": "For more information on AWS KMS key policies refer to Using key policies in AWS KMS section of the AWS KMS Developer Guide.", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html", + } + }, + "ProductFields": {"Product Name": "ElectricEye"}, + "Resources": [ + { + "Type": "AwsKmsAlias", + "Id": aliasArn, + "Partition": "aws", + "Region": awsRegion, + } + ], + "Compliance": {"Status": "FAILED"}, + "Workflow": {"Status": "NEW"}, + "RecordState": "ACTIVE", + } + yield finding diff --git a/tests/test_AWS_KMS_Auditor.py b/eeauditor/tests/test_AWS_KMS_Auditor.py similarity index 69% rename from tests/test_AWS_KMS_Auditor.py rename to eeauditor/tests/test_AWS_KMS_Auditor.py index 8963519e..2f6c1dd3 100644 --- a/tests/test_AWS_KMS_Auditor.py +++ b/eeauditor/tests/test_AWS_KMS_Auditor.py @@ -2,24 +2,16 @@ import json import os import pytest + from botocore.stub import Stubber, ANY -from auditors.AWS_KMS_Auditor import ( - KMSKeyRotationCheck, - KMSKeyExposedCheck, - sts, + +from . import context +from auditors.aws.AWS_KMS_Auditor import ( + kms_key_exposed_check, + kms_key_rotation_check, kms, ) -# not available in local testing without ECS -os.environ["AWS_REGION"] = "us-east-1" -# for local testing, don't assume default profile exists -os.environ["AWS_DEFAULT_REGION"] = "us-east-1" - -sts_response = { - "Account": "012345678901", - "Arn": "arn:aws:iam::012345678901:user/user", -} - list_aliases_response = { "Aliases": [ { @@ -49,25 +41,15 @@ "Keys": [ { "KeyId": "273e5d8e-4746-4ba9-be3a-4dce36783814", - "KeyArn": "arn:aws:kms:us-east-1:012345678901:key/273e5d8e-4746-4ba9-be3a-4dce36783814" + "KeyArn": "arn:aws:kms:us-east-1:012345678901:key/273e5d8e-4746-4ba9-be3a-4dce36783814", } ] } -get_key_rotation_status_response = { - "KeyRotationEnabled": True -} +get_key_rotation_status_response = {"KeyRotationEnabled": True} -get_key_rotation_status_response1 = { - "KeyRotationEnabled": False -} +get_key_rotation_status_response1 = {"KeyRotationEnabled": False} -@pytest.fixture(scope="function") -def sts_stubber(): - sts_stubber = Stubber(sts) - sts_stubber.activate() - yield sts_stubber - sts_stubber.deactivate() @pytest.fixture(scope="function") def kms_stubber(): @@ -76,12 +58,11 @@ def kms_stubber(): yield kms_stubber kms_stubber.deactivate() -def test_key_rotation_enabled(sts_stubber, kms_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_key_rotation_enabled(kms_stubber): kms_stubber.add_response("list_keys", list_keys_response) kms_stubber.add_response("get_key_rotation_status", get_key_rotation_status_response) - check = KMSKeyRotationCheck() - results = check.execute() + results = kms_key_rotation_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "273e5d8e-4746-4ba9-be3a-4dce36783814" in result["Id"]: print(result["Id"]) @@ -90,12 +71,11 @@ def test_key_rotation_enabled(sts_stubber, kms_stubber): assert False kms_stubber.assert_no_pending_responses() -def test_key_rotation_not_enabled(sts_stubber, kms_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_key_rotation_not_enabled(kms_stubber): kms_stubber.add_response("list_keys", list_keys_response) kms_stubber.add_response("get_key_rotation_status", get_key_rotation_status_response1) - check = KMSKeyRotationCheck() - results = check.execute() + results = kms_key_rotation_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "273e5d8e-4746-4ba9-be3a-4dce36783814" in result["Id"]: print(result["Id"]) @@ -104,12 +84,11 @@ def test_key_rotation_not_enabled(sts_stubber, kms_stubber): assert False kms_stubber.assert_no_pending_responses() -def test_has_public_key(kms_stubber, sts_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_has_public_key(kms_stubber): kms_stubber.add_response("list_aliases", list_aliases_response) kms_stubber.add_response("get_key_policy", get_key_policy_public_response) - check = KMSKeyExposedCheck() - results = check.execute() + results = kms_key_exposed_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "s3" in result["Id"]: assert result["RecordState"] == "ACTIVE" @@ -117,12 +96,11 @@ def test_has_public_key(kms_stubber, sts_stubber): assert False kms_stubber.assert_no_pending_responses() -def test_no_public_key(kms_stubber, sts_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_no_public_key(kms_stubber): kms_stubber.add_response("list_aliases", list_aliases_response) kms_stubber.add_response("get_key_policy", get_key_policy_not_public_response) - check = KMSKeyExposedCheck() - results = check.execute() + results = kms_key_exposed_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "s3" in result["Id"]: print(result["Id"]) @@ -131,12 +109,11 @@ def test_no_public_key(kms_stubber, sts_stubber): assert False kms_stubber.assert_no_pending_responses() -def test_has_condition(kms_stubber, sts_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_has_condition(kms_stubber): kms_stubber.add_response("list_aliases", list_aliases_response) kms_stubber.add_response("get_key_policy", get_key_policy_has_condition_response) - check = KMSKeyExposedCheck() - results = check.execute() + results = kms_key_exposed_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "s3" in result["Id"]: assert result["RecordState"] == "ARCHIVED" @@ -144,12 +121,11 @@ def test_has_condition(kms_stubber, sts_stubber): assert False kms_stubber.assert_no_pending_responses() -def test_no_AWS(kms_stubber, sts_stubber): - sts_stubber.add_response("get_caller_identity", sts_response) + +def test_no_AWS(kms_stubber): kms_stubber.add_response("list_aliases", list_aliases_response) kms_stubber.add_response("get_key_policy", get_key_policy_no_AWS_response) - check = KMSKeyExposedCheck() - results = check.execute() + results = kms_key_exposed_check(cache={}, awsAccountId="012345678901", awsRegion="us-east-1") for result in results: if "s3" in result["Id"]: assert result["RecordState"] == "ARCHIVED"