diff --git a/setup.cfg b/setup.cfg index 76a0c9580..50a66d4d4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,7 @@ install_requires = pytz requests >= 1.0.0 six + iso3166 [options.packages.find] diff --git a/src/saml2/config.py b/src/saml2/config.py index 274e3960f..63353dc4a 100644 --- a/src/saml2/config.py +++ b/src/saml2/config.py @@ -7,6 +7,10 @@ import os import re import sys +from functools import partial +import re +from urllib import parse +from iso3166 import countries import six @@ -21,6 +25,7 @@ from saml2.mdstore import MetadataStore from saml2.saml import NAME_FORMAT_URI from saml2.virtual_org import VirtualOrg +from saml2.utility import not_empty logger = logging.getLogger(__name__) @@ -97,6 +102,9 @@ "sp_type", "sp_type_in_metadata", "requested_attributes", + "node_country", + "application_identifier", + "protocol_version" ] AA_IDP_ARGS = [ @@ -118,6 +126,9 @@ "domain", "name_qualifier", "edu_person_targeted_id", + "node_country", + "application_identifier", + "protocol_version", ] PDP_ARGS = ["endpoints", "name_form", "name_id_format"] @@ -176,6 +187,13 @@ "attribute_consuming_service": _RPA } +EIDAS_NOTIFIED_LOA_PREFIX = "http://eidas.europa.eu/LoA/" + +EIDAS_NOTIFIED_LOA = [ + "{prefix}{x}".format(prefix=EIDAS_NOTIFIED_LOA_PREFIX, x=x) + for x in ["low", "substantial", "high"] +] + class ConfigurationError(SAMLError): pass @@ -540,6 +558,9 @@ def service_per_endpoint(self, context=None): res[endp] = (service, binding) return res + def validate(self): + pass + class SPConfig(Config): def_context = "sp" @@ -569,6 +590,137 @@ def ecp_endpoint(self, ipaddress): return None +class eIDASConfig(Config): + def get_endpoint_element(self, element): + pass + + def get_protocol_version(self): + pass + + def get_application_identifier(self): + pass + + def get_node_country(self): + pass + + @staticmethod + def validate_node_country_format(node_country): + try: + return countries.get(node_country).alpha2 == node_country + except KeyError: + return False + + @staticmethod + def validate_application_identifier_format(application_identifier): + pattern_match = re.search(r"([a-zA-Z0-9])+:([a-zA-Z0-9():_\-])+:([0-9])+" + r"(\.([0-9])+){1,2}", application_identifier) + if not application_identifier or pattern_match: + return True + return False + + @staticmethod + def get_type_contact_person(contacts, ctype): + return [contact for contact in contacts + if contact.get("contact_type") == ctype] + + @staticmethod + def contact_has_email_address(contact): + return not_empty(contact.get("email_address")) + + @property + def warning_validators(self): + return { + "single_logout_service SHOULD NOT be declared": + self.get_endpoint_element("single_logout_service") is None, + "artifact_resolution_service SHOULD NOT be declared": + self.get_endpoint_element("artifact_resolution_service") is None, + "manage_name_id_service SHOULD NOT be declared": + self.get_endpoint_element("manage_name_id_service") is None, + "application_identifier SHOULD be declared": + not_empty(self.get_application_identifier()), + "protocol_version SHOULD be declared": + not_empty(self.get_protocol_version()), + "minimal organization info (name/dname/url) SHOULD be declared": + not_empty(self.organization), + "contact_person with contact_type 'technical' and at least one " + "email_address SHOULD be declared": + any(filter(self.contact_has_email_address, + self.get_type_contact_person(self.contact_person, + ctype="technical"))), + "contact_person with contact_type 'support' and at least one " + "email_address SHOULD be declared": + any(filter(self.contact_has_email_address, + self.get_type_contact_person(self.contact_person, + ctype="support"))) + } + + @property + def error_validators(self): + return { + "KeyDescriptor MUST be declared": + not_empty(self.cert_file or self.encryption_keypairs), + "node_country MUST be declared in ISO 3166-1 alpha-2 format": + self.validate_node_country_format(self.get_node_country()), + "application_identifier MUST be in the form ::.[.]": + self.validate_application_identifier_format( + self.get_application_identifier()), + "entityid MUST be an HTTPS URL pointing to the location of its published " + "metadata": + parse.urlparse(self.entityid).scheme == "https" + } + + def validate(self): + if not all(self.warning_validators.values()): + logger.warning( + "Configuration validation warnings occurred: {}".format( + [msg for msg, check in self.warning_validators.items() + if check is not True] + ) + ) + + if not all(self.error_validators.values()): + error = "Configuration validation errors occurred {}:".format( + [msg for msg, check in self.error_validators.items() + if check is not True]) + logger.error(error) + raise ConfigValidationError(error) + + +class eIDASSPConfig(SPConfig, eIDASConfig): + def get_endpoint_element(self, element): + return getattr(self, "_sp_endpoints", {}).get(element, None) + + def get_application_identifier(self): + return getattr(self, "_sp_application_identifier", None) + + def get_protocol_version(self): + return getattr(self, "_sp_protocol_version", None) + + def get_node_country(self): + return getattr(self, "_sp_node_country", None) + + @property + def warning_validators(self): + sp_warning_validators = { + "hide_assertion_consumer_service SHOULD be set to True": + getattr(self, "_sp_hide_assertion_consumer_service", None) is True + } + return {**super().warning_validators, **sp_warning_validators} + + @property + def error_validators(self): + sp_error_validators = { + "authn_requests_signed MUST be set to True": + getattr(self, "_sp_authn_requests_signed", None) is True, + "sp_type MUST be set to 'public' or 'private'": + getattr(self, "_sp_sp_type", None) in ("public", "private"), + "allow_unsolicited MUST be set to False": + getattr(self, "_sp_allow_unsolicited", None) is False + } + return {**super().error_validators, **sp_error_validators} + + class IdPConfig(Config): def_context = "idp" @@ -576,6 +728,57 @@ def __init__(self): Config.__init__(self) +class eIDASIdPConfig(IdPConfig, eIDASConfig): + def get_endpoint_element(self, element): + return getattr(self, "_idp_endpoints", {}).get(element, None) + + def get_application_identifier(self): + return getattr(self, "_idp_application_identifier", None) + + def get_protocol_version(self): + return getattr(self, "_idp_protocol_version", None) + + def get_node_country(self): + return getattr(self, "_idp_node_country", None) + + def verify_non_notified_loa(self): + return not any( + x.startswith(EIDAS_NOTIFIED_LOA_PREFIX) and x not in EIDAS_NOTIFIED_LOA + for x in getattr(self, "assurance_certification", [])) + + def verify_notified_loa(self): + return any( + x in EIDAS_NOTIFIED_LOA + for x in getattr(self, "assurance_certification", []) + ) + + @property + def warning_validators(self): + idp_warning_validators = {} + return {**super().warning_validators, **idp_warning_validators} + + @property + def error_validators(self): + idp_error_validators = { + "want_authn_requests_signed MUST be set to True": + getattr(self, "_idp_want_authn_requests_signed", None) is True, + "provided_attributes MUST be set to denote the supported attributes by " + "the IdP": + not_empty(getattr(self, "_idp_provided_attributes", None)), + "assurance_certification for non-notified eIDs MUST NOT use an " + "{} prefix".format(EIDAS_NOTIFIED_LOA_PREFIX): + self.verify_non_notified_loa(), + "assurance_certification for notified eID MUST be (at least) one of [{}]" + .format(", ".join(EIDAS_NOTIFIED_LOA)): + self.verify_notified_loa(), + "sign_response MUST be set to True": + getattr(self, "_idp_sign_response", None) is True, + "encrypt_assertion MUST be set to True": + getattr(self, "_idp_encrypt_assertion", None) is True, + } + return {**super().error_validators, **idp_error_validators} + + def config_factory(_type, config): """ @@ -603,3 +806,7 @@ def config_factory(_type, config): conf.context = _type return conf + + +class ConfigValidationError(Exception): + pass diff --git a/src/saml2/extension/node_country.py b/src/saml2/extension/node_country.py new file mode 100644 index 000000000..e9368fd2c --- /dev/null +++ b/src/saml2/extension/node_country.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# +# Generated Thu Dec 12 18:16:51 2019 by parse_xsd.py version 0.5. +# + +import saml2 +from saml2 import SamlBase + + +NAMESPACE = 'http://eidas.europa.eu/saml-extensions' +class NodeCountryType_(SamlBase): + """The http://eidas.europa.eu/saml-extensions:NodeCountryType element """ + + c_tag = 'NodeCountryType' + c_namespace = NAMESPACE + c_children = SamlBase.c_children.copy() + c_attributes = SamlBase.c_attributes.copy() + c_child_order = SamlBase.c_child_order[:] + c_cardinality = SamlBase.c_cardinality.copy() + +def node_country_type__from_string(xml_string): + return saml2.create_class_from_xml_string(NodeCountryType_, xml_string) + + +class NodeCountry(NodeCountryType_): + """The http://eidas.europa.eu/saml-extensions:NodeCountry element """ + + c_tag = 'NodeCountry' + c_namespace = NAMESPACE + c_children = NodeCountryType_.c_children.copy() + c_attributes = NodeCountryType_.c_attributes.copy() + c_child_order = NodeCountryType_.c_child_order[:] + c_cardinality = NodeCountryType_.c_cardinality.copy() + +def node_country_from_string(xml_string): + return saml2.create_class_from_xml_string(NodeCountry, xml_string) + + +ELEMENT_FROM_STRING = { + NodeCountry.c_tag: node_country_from_string, + NodeCountryType_.c_tag: node_country_type__from_string, +} + +ELEMENT_BY_TAG = { + 'NodeCountry': NodeCountry, + 'NodeCountryType': NodeCountryType_, +} +def factory(tag, **kwargs): + return ELEMENT_BY_TAG[tag](**kwargs) diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index 5c465032b..ddac542bb 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -10,6 +10,7 @@ from saml2.extension import shibmd from saml2.extension import mdattr from saml2.extension import sp_type +from saml2.extension import node_country from saml2.saml import NAME_FORMAT_URI from saml2.saml import AttributeValue from saml2.saml import Attribute @@ -20,6 +21,7 @@ from saml2 import BINDING_SOAP from saml2 import samlp from saml2 import class_name +from saml2.utility import make_list from saml2 import xmldsig as ds import six @@ -592,6 +594,14 @@ def do_idpsso_descriptor(conf, cert=None, enc_cert=None): except KeyError: setattr(idpsso, key, DEFAULTS[key]) + attributes = [ + Attribute(name=attribute.get("name", None), + name_format=attribute.get("name_format", None), + friendly_name=attribute.get("friendly_name", None)) + for attribute in conf.getattr("provided_attributes", "idp") + ] + idpsso.attribute = attributes + return idpsso @@ -770,6 +780,33 @@ def entity_descriptor(confd): entd.authn_authority_descriptor = do_aq_descriptor(confd, mycert, enc_cert) + conf_node_country = confd.getattr('node_country', confd.context) + if conf_node_country: + if not entd.extensions: + entd.extensions = md.Extensions() + item = node_country.NodeCountry(text=conf_node_country) + entd.extensions.add_extension_element(item) + + app_identifer = confd.getattr("application_identifier", confd.context) + if app_identifer: + entd.extensions = entd.extensions or md.Extensions() + ava = AttributeValue(text=app_identifer) + attr = Attribute( + attribute_value=ava, + name="http://eidas.europa.eu/entity-attributes/application-identifier" + ) + _add_attr_to_entity_attributes(entd.extensions, attr) + + protocol_version = confd.getattr("protocol_version", confd.context) + if protocol_version: + entd.extensions = entd.extensions or md.Extensions() + ava = [AttributeValue(text=str(c)) for c in make_list(protocol_version)] + attr = Attribute( + attribute_value=ava, + name="http://eidas.europa.eu/entity-attributes/protocol-version" + ) + _add_attr_to_entity_attributes(entd.extensions, attr) + return entd diff --git a/src/saml2/utility/__init__.py b/src/saml2/utility/__init__.py new file mode 100644 index 000000000..7fe70232c --- /dev/null +++ b/src/saml2/utility/__init__.py @@ -0,0 +1,19 @@ +def make_type(mtype, *args): + t_args = [] + similar_type = tuple if mtype is list else list + for x in args: + t_args.extend([x] if not isinstance(x, (list, tuple)) else similar_type(x)) + return mtype(t_args) + + +def make_list(*args): + return make_type(list, *args) + + +def not_empty(element): + if isinstance(element, bool): + return True + + if element: + return True + return False diff --git a/tests/eidas/eidas_idp_conf.py b/tests/eidas/eidas_idp_conf.py new file mode 100644 index 000000000..515501b99 --- /dev/null +++ b/tests/eidas/eidas_idp_conf.py @@ -0,0 +1,96 @@ +from saml2 import BINDING_SOAP +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST +from saml2.saml import NAMEID_FORMAT_PERSISTENT, NAME_FORMAT_BASIC +from saml2.saml import NAME_FORMAT_URI + +from pathutils import full_path +from pathutils import xmlsec_path + +BASE = "http://localhost:8088" + +CONFIG = { + "assurance_certification": ["http://eidas.europa.eu/LoA/high", + "http://eidas.europa.eu/LoA/low"], + "entityid": "https://example.org", + "name": "Rolands IdP", + "service": { + "idp": { + "endpoints": { + "single_sign_on_service": [ + ("%s/sso" % BASE, BINDING_HTTP_REDIRECT)], + }, + "policy": { + "default": { + "lifetime": {"minutes": 15}, + "attribute_restrictions": None, # means all I have + "name_form": NAME_FORMAT_URI, + }, + "urn:mace:example.com:saml:roland:sp": { + "lifetime": {"minutes": 5}, + "nameid_format": NAMEID_FORMAT_PERSISTENT, + }, + "https://example.com/sp": { + "lifetime": {"minutes": 5}, + "nameid_format": NAMEID_FORMAT_PERSISTENT, + "name_form": NAME_FORMAT_BASIC + } + }, + "subject_data": full_path("subject_data.db"), + "node_country": "GR", + "application_identifier": "CEF:eIDAS-ref:2.0", + "protocol_version": [1.1, 2.2], + "want_authn_requests_signed": True, + "provided_attributes": [ + { + "name": "http://eidas.europa.eu/attributes/naturalperson/PersonIdentifier", + "friendly_name": "PersonIdentifier", + "name_format": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + }, + { + "name": "http://eidas.europa.eu/attributes/naturalperson/CurrentFamilyName", + "friendly_name": "FamilyName", + }, + { + "name": "http://eidas.europa.eu/attributes/naturalperson/CurrentGivenName", + "name_format": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + } + ], + "sign_response": True, + "encrypt_assertion": True + }, + }, + "debug": 1, + "key_file": full_path("test.key"), + "cert_file": full_path("test.pem"), + "xmlsec_binary": xmlsec_path, + "metadata": [{ + "class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("metadata_sp_1.xml"), ), + (full_path("metadata_sp_2.xml"), ), + (full_path("vo_metadata.xml"), )], + }], + "attribute_map_dir": full_path("attributemaps"), + "organization": { + "name": ("AB Exempel", "se"), + "display_name": ("AB Exempel", "se"), + "url": "http://www.example.org", + }, + "contact_person": [ + { + "given_name": "Roland", + "sur_name": "Hedberg", + "telephone_number": "+46 70 100 0000", + "email_address": ["tech@eample.com", + "tech@example.org"], + "contact_type": "technical" + }, + { + "given_name": "Roland", + "sur_name": "Hedberg", + "telephone_number": "+46 70 100 0000", + "email_address": ["tech@eample.com", + "tech@example.org"], + "contact_type": "support"} + ], +} diff --git a/tests/eidas/eidas_sp_conf.py b/tests/eidas/eidas_sp_conf.py new file mode 100644 index 000000000..5b9cdde93 --- /dev/null +++ b/tests/eidas/eidas_sp_conf.py @@ -0,0 +1,92 @@ +from pathutils import full_path +from pathutils import xmlsec_path + +CONFIG = { + "entityid": "https://example.org", + "name": "urn:mace:example.com:saml:roland:sp", + "description": "My own SP", + "service": { + "sp": { + "endpoints": { + "assertion_consumer_service": [ + "http://lingon.catalogix.se:8087/"], + }, + "required_attributes": ["surName", "givenName", "mail"], + "optional_attributes": ["title"], + "idp": ["urn:mace:example.com:saml:roland:idp"], + "requested_attributes": [ + { + "name": "http://eidas.europa.eu/attributes/naturalperson/DateOfBirth", + "required": False, + }, + { + "friendly_name": "PersonIdentifier", + "required": True, + }, + { + "friendly_name": "PlaceOfBirth", + }, + ], + "sp_type": "public", + "sp_type_in_metadata": False, + "force_authn": True, + "authn_requests_signed": True, + "node_country": "GR", + "application_identifier": "CEF:eIDAS-ref:2.0", + "protocol_version": [1.1, 2.2], + "hide_assertion_consumer_service": True, + "allow_unsolicited": False + } + }, + "debug": 1, + "key_file": full_path("test.key"), + "cert_file": full_path("test.pem"), + "encryption_keypairs": [{"key_file": full_path("test_1.key"), "cert_file": full_path("test_1.crt")}, + {"key_file": full_path("test_2.key"), "cert_file": full_path("test_2.crt")}], + "ca_certs": full_path("cacerts.txt"), + "xmlsec_binary": xmlsec_path, + "metadata": [{ + "class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("idp.xml"), ), (full_path("vo_metadata.xml"), )], + }], + "virtual_organization": { + "urn:mace:example.com:it:tek": { + "nameid_format": "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID", + "common_identifier": "umuselin", + } + }, + "subject_data": "subject_data.db", + "accepted_time_diff": 60, + "attribute_map_dir": full_path("attributemaps"), + "valid_for": 6, + "organization": { + "name": ("AB Exempel", "se"), + "display_name": ("AB Exempel", "se"), + "url": "http://www.example.org", + }, + "contact_person": [ + { + "given_name": "Roland", + "sur_name": "Hedberg", + "telephone_number": "+46 70 100 0000", + "email_address": ["tech@eample.com", + "tech@example.org"], + "contact_type": "technical" + }, + { + "given_name": "Roland", + "sur_name": "Hedberg", + "telephone_number": "+46 70 100 0000", + "email_address": ["tech@eample.com", + "tech@example.org"], + "contact_type": "support"} + ], + "logger": { + "rotating": { + "filename": full_path("sp.log"), + "maxBytes": 100000, + "backupCount": 5, + }, + "loglevel": "info", + } +} diff --git a/tests/eidas/sp2.xml b/tests/eidas/sp2.xml new file mode 100644 index 000000000..777f40e99 --- /dev/null +++ b/tests/eidas/sp2.xml @@ -0,0 +1,40 @@ +GRCEF:eIDAS-ref:2.01.12.2MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB +gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy +3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN +efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G +A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs +iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt +U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw +mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6 +h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5 +U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6 +mrPzGzk3ECbupFnqyREH3+ZPSdk= +MIICITCCAYoCAQEwDQYJKoZIhvcNAQELBQAwWDELMAkGA1UEBhMCenoxCzAJBgNV +BAgMAnp6MQ0wCwYDVQQHDAR6enp6MQ4wDAYDVQQKDAVaenp6ejEOMAwGA1UECwwF +Wnp6enoxDTALBgNVBAMMBHRlc3QwIBcNMTkwNDEyMTk1MDM0WhgPMzAxODA4MTMx +OTUwMzRaMFgxCzAJBgNVBAYTAnp6MQswCQYDVQQIDAJ6ejENMAsGA1UEBwwEenp6 +ejEOMAwGA1UECgwFWnp6enoxDjAMBgNVBAsMBVp6enp6MQ0wCwYDVQQDDAR0ZXN0 +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDHcj80WU/XBsd9FlyQmfjPUdfm +edhCFDd6TEQmZNNqP/UG+VkGa+BXjRIHMfic/WxPTbGhCjv68ci0UDNomUXagFex +LGNpkwa7+CRVtoc/1xgq+ySE6M4nhcCutScoxNvWNn5eSQ66i3U0sTv91MgsXxqE +dTaiZg0BIufEc3dueQIDAQABMA0GCSqGSIb3DQEBCwUAA4GBAGUV5B+USHvaRa8k +gCNJSuNpo6ARlv0ekrk8bbdNRBiEUdCMyoGJFfuM9K0zybX6Vr25wai3nvaog294 +Vx/jWjX2g5SDbjItH6VGy6C9GCGf1A07VxFRCfJn5tA9HuJjPKiE+g/BmrV5N4Ce +alzFxPHWYkNOzoRU8qI7OqUai1kL +MIICITCCAYoCAQEwDQYJKoZIhvcNAQELBQAwWDELMAkGA1UEBhMCenoxCzAJBgNV +BAgMAnp6MQ0wCwYDVQQHDAR6enp6MQ4wDAYDVQQKDAVaenp6ejEOMAwGA1UECwwF +Wnp6enoxDTALBgNVBAMMBHRlc3QwIBcNMTkwNDEyMTk1MDM0WhgPMzAxODA4MTMx +OTUwMzRaMFgxCzAJBgNVBAYTAnp6MQswCQYDVQQIDAJ6ejENMAsGA1UEBwwEenp6 +ejEOMAwGA1UECgwFWnp6enoxDjAMBgNVBAsMBVp6enp6MQ0wCwYDVQQDDAR0ZXN0 +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDjW0kJM+4baWKtvO24ZsGXNvNK +KkwTMz7OW5Z6BRqhSOq2WA0c5NCpMk6rD8Z2OTFEolPojEjf8dVyd/Ds/hrjFKQv +8wQgbdXLN51YTIsgd6h+hBJO+vzhl0PT4aT7M0JKo5ALtS6qk4tsworW2BnwyvsG +SAinwfeWt4t/b1J3kwIDAQABMA0GCSqGSIb3DQEBCwUAA4GBAFtj7WArQQBugmh/ +KQjjlfTQ5A052QeXfgTyO9vv1S6MRIi7qgiaEv49cGXnJv/TWbySkMKObPMUApjg +6z8PqcxuShew5FCTkNvwhABFPiyu0fUj3e2FEPHfsBu76jz4ugtmhUqjqhzwFY9c +tnWRkkl6J0AjM3LnHOSgjNIclDZG +urn:mace:example.com:saml:roland:spMy own SPAB ExempelAB Exempelhttp://www.example.orgRolandHedbergtech@eample.comtech@example.org+46 70 100 0000RolandHedbergtech@eample.comtech@example.org+46 70 100 0000 diff --git a/tests/eidas/test_idp.py b/tests/eidas/test_idp.py new file mode 100644 index 000000000..a568cb0ad --- /dev/null +++ b/tests/eidas/test_idp.py @@ -0,0 +1,326 @@ +import pytest +import copy +from saml2 import BINDING_HTTP_POST +from saml2 import metadata +from saml2.utility import make_list +from saml2.client import Saml2Client +from saml2.server import Server +from saml2.config import eIDASSPConfig, eIDASIdPConfig, ConfigValidationError +from eidas.eidas_idp_conf import CONFIG + + +class TestIdP: + def setup_class(self): + self.server = Server("eidas_idp_conf") + + self.conf = eIDASIdPConfig() + self.conf.load_file("eidas_idp_conf") + + sp_conf = eIDASSPConfig() + sp_conf.load_file("eidas_sp_conf") + + self.client = Saml2Client(sp_conf) + + def teardown_class(self): + self.server.close() + + @pytest.fixture(scope="function") + def config(self): + return copy.deepcopy(CONFIG) + + def test_node_country_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + assert any(filter(lambda x: x.tag == "NodeCountry", + entd.extensions.extension_elements)) + + def test_application_identifier_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + app_identifier = [ + x for x in entity_attributes.children + if getattr(x, "name", "") == + "http://eidas.europa.eu/entity-attributes/application-identifier" + ] + assert len(app_identifier) == 1 + assert self.conf._idp_application_identifier \ + == next(x.attribute_value.text for x in app_identifier) + + def test_multiple_protocol_version_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + protocol_version = next( + x for x in entity_attributes.children + if getattr(x, "name", "") == + "http://eidas.europa.eu/entity-attributes/protocol-version" + ) + assert len(protocol_version.attribute_value) == 2 + assert set(str(x) for x in self.conf._idp_protocol_version) \ + == set([x.text for x in protocol_version.attribute_value]) + + def test_protocol_version_in_metadata(self, config): + config["service"]["idp"]["protocol_version"] = 1.2 + + conf = eIDASIdPConfig() + conf.load(config) + + entd = metadata.entity_descriptor(conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + protocol_version = next( + x for x in entity_attributes.children + if getattr(x, "name", "") == + "http://eidas.europa.eu/entity-attributes/protocol-version" + ) + assert len(protocol_version.attribute_value) == 1 + assert {str(conf._idp_protocol_version)} \ + == set([x.text for x in protocol_version.attribute_value]) + + def test_supported_attributes(self, config): + entd = metadata.entity_descriptor(self.conf) + attributes_published = [ + set( + filter(lambda x: x is not None, + [attribute.name, attribute.name_format, attribute.friendly_name] + ) + ) + for attribute in entd.idpsso_descriptor.attribute + ] + attributes_stated = [set(x.values()) for x + in self.conf._idp_provided_attributes] + assert all(filter(lambda x: x in attributes_published, attributes_stated)) + + def test_loa_attribute_exposed(self, config): + entd = metadata.entity_descriptor(self.conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + loa_attribute = next( + (x for x in entity_attributes.children + if getattr(x, "attributes", {}).get("Name") == + "urn:oasis:names:tc:SAML:attribute:assurance-certification"), None + ) + assert loa_attribute is not None + assert loa_attribute.attributes.get("NameFormat", "") == \ + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + loa_values = {x.text for x in loa_attribute.children} + + assert loa_values == set(config["assurance_certification"]) + + +class TestIdPConfig: + @staticmethod + def config_validate(config): + conf = eIDASIdPConfig() + conf.load(config) + conf.validate() + + @staticmethod + def assert_validation_error(config): + conf = eIDASIdPConfig() + conf.load(config) + with pytest.raises(ConfigValidationError): + conf.validate() + + @pytest.fixture(scope="function") + def technical_contacts(self, config): + return [ + x for x in config["contact_person"] + if x["contact_type"] == "technical" + ] + + @pytest.fixture(scope="function") + def support_contacts(self, config): + return [ + x for x in config["contact_person"] + if x["contact_type"] == "support" + ] + + @pytest.fixture(scope="function") + def raise_error_on_warning(self, monkeypatch): + def r(*args, **kwargs): + raise ConfigValidationError() + monkeypatch.setattr("saml2.config.logger.warning", r) + + @pytest.fixture(scope="function") + def config(self): + return copy.deepcopy(CONFIG) + + def test_singlelogout_declared(self, config, raise_error_on_warning): + config["service"]["idp"]["endpoints"]["single_logout_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_artifact_resolution_declared(self, config, raise_error_on_warning): + config["service"]["idp"]["endpoints"]["artifact_resolution_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_manage_nameid_service_declared(self, config, raise_error_on_warning): + config["service"]["idp"]["endpoints"]["manage_name_id_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_no_keydescriptor(self, config): + del config["cert_file"] + self.assert_validation_error(config) + + def test_no_nodecountry(self, config): + del config["service"]["idp"]["node_country"] + self.assert_validation_error(config) + + def test_nodecountry_wrong_format(self, config): + config["service"]["idp"]["node_country"] = "gr" + self.assert_validation_error(config) + + def test_no_application_identifier_warning(self, config, raise_error_on_warning): + del config["service"]["idp"]["application_identifier"] + + self.assert_validation_error(config) + + def test_empty_application_identifier_warning(self, config, raise_error_on_warning): + config["service"]["idp"]["application_identifier"] = "" + + self.assert_validation_error(config) + + def test_application_identifier_wrong_format(self, config): + config["service"]["idp"]["application_identifier"] = "TEST:Node.1" + + self.assert_validation_error(config) + + def test_config_ok(self, config, raise_error_on_warning): + self.config_validate(config) + + def test_no_protocol_version_warning(self, config, raise_error_on_warning): + del config["service"]["idp"]["protocol_version"] + + self.assert_validation_error(config) + + def test_empty_protocol_version_warning(self, config, raise_error_on_warning): + config["service"]["idp"]["protocol_version"] = "" + + self.assert_validation_error(config) + + def test_no_organization_info_warning(self, config, raise_error_on_warning): + del config["organization"] + + self.assert_validation_error(config) + + def test_empty_organization_info_warning(self, config, raise_error_on_warning): + config["organization"] = {} + + self.assert_validation_error(config) + + def test_no_technical_contact_person(self, + config, + technical_contacts, + raise_error_on_warning): + for contact in technical_contacts: + contact["contact_type"] = "other" + + self.assert_validation_error(config) + + def test_technical_contact_person_no_email(self, + config, + technical_contacts, + raise_error_on_warning): + + for contact in technical_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_technical_contact_person_empty_email(self, + config, + technical_contacts, + raise_error_on_warning): + + for contact in technical_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_no_support_contact_person(self, + config, + support_contacts, + raise_error_on_warning): + for contact in support_contacts: + contact["contact_type"] = "other" + + self.assert_validation_error(config) + + def test_support_contact_person_no_email(self, + config, + support_contacts, + raise_error_on_warning): + + for contact in support_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_support_contact_person_empty_email(self, + config, + support_contacts, + raise_error_on_warning): + + for contact in support_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_entityid_no_https(self, config): + config["entityid"] = "urn:mace:example.com:saml:roland:idp" + + self.assert_validation_error(config) + + def test_want_authn_requests_signed_unset(self, config): + del config["service"]["idp"]["want_authn_requests_signed"] + + self.assert_validation_error(config) + + def test_want_authn_requests_signed_false(self, config): + config["service"]["idp"]["want_authn_requests_signed"] = False + + self.assert_validation_error(config) + + def test_provided_attributes_unset(self, config): + del config["service"]["idp"]["provided_attributes"] + + self.assert_validation_error(config) + + def test_assurance_certification_unset(self, config): + del config["assurance_certification"] + + self.assert_validation_error(config) + + def test_non_notified_loa_invalid_prefix(self, config): + config["assurance_certification"] = \ + ["http://eidas.europa.eu/LoA/something-else"] + + self.assert_validation_error(config) + + def test_no_notified_loa(self, config): + config["assurance_certification"] = ["http://example.com/something-else"] + + self.assert_validation_error(config) + + def test_sign_response_unset(self, config): + del config["service"]["idp"]["sign_response"] + + self.assert_validation_error(config) + + def test_sign_response_false(self, config): + config["service"]["idp"]["sign_response"] = False + + self.assert_validation_error(config) + + def test_encrypt_assertion_unset(self, config): + del config["service"]["idp"]["encrypt_assertion"] + + self.assert_validation_error(config) + + def test_encrypt_assertion_false(self, config): + config["service"]["idp"]["encrypt_assertion"] = False + + self.assert_validation_error(config) diff --git a/tests/eidas/test_sp.py b/tests/eidas/test_sp.py new file mode 100644 index 000000000..3331e23c1 --- /dev/null +++ b/tests/eidas/test_sp.py @@ -0,0 +1,314 @@ +import pytest +import copy +from saml2 import BINDING_HTTP_POST +from saml2 import metadata +from saml2 import samlp +from saml2.client import Saml2Client +from saml2.server import Server +from saml2.config import eIDASSPConfig, ConfigValidationError +from eidas.eidas_sp_conf import CONFIG + + +class TestSP: + def setup_class(self): + self.server = Server("eidas_idp_conf") + + self.conf = eIDASSPConfig() + self.conf.load_file("eidas_sp_conf") + + self.client = Saml2Client(self.conf) + + def teardown_class(self): + self.server.close() + + @pytest.fixture(scope="function") + def config(self): + return copy.deepcopy(CONFIG) + + def test_authn_request_force_authn(self): + req_str = "{0}".format(self.client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[-1]) + req = samlp.authn_request_from_string(req_str) + assert req.force_authn == "true" + + def test_sp_type_only_in_request(self): + entd = metadata.entity_descriptor(self.conf) + req_str = "{0}".format(self.client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[-1]) + req = samlp.authn_request_from_string(req_str) + sp_type_elements = filter(lambda x: x.tag == "SPType", + req.extensions.extension_elements) + assert any(filter(lambda x: x.text == "public", sp_type_elements)) + assert not any(filter(lambda x: x.tag == "SPType", + entd.extensions.extension_elements)) + + def test_sp_type_in_metadata(self, config): + config["service"]["sp"]["sp_type_in_metadata"] = True + sconf = eIDASSPConfig() + sconf.load(config) + custom_client = Saml2Client(sconf) + + req_str = "{0}".format(custom_client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[-1]) + req = samlp.authn_request_from_string(req_str) + sp_type_elements = filter(lambda x: x.tag == "SPType", + req.extensions.extension_elements) + assert not any(filter(lambda x: x.text == "public", sp_type_elements)) + + entd = metadata.entity_descriptor(sconf) + assert any(filter(lambda x: x.tag == "SPType", + entd.extensions.extension_elements)) + + def test_node_country_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + assert any(filter(lambda x: x.tag == "NodeCountry", + entd.extensions.extension_elements)) + + def test_application_identifier_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + app_identifier = [ + x for x in entity_attributes.children + if getattr(x, "attributes", {}).get("Name") == + "http://eidas.europa.eu/entity-attributes/application-identifier" + ] + assert len(app_identifier) == 1 + assert self.conf._sp_application_identifier \ + == next(x.text for y in app_identifier for x in y.children) + + def test_multiple_protocol_version_in_metadata(self): + entd = metadata.entity_descriptor(self.conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + protocol_version = next( + x for x in entity_attributes.children + if getattr(x, "name", "") == + "http://eidas.europa.eu/entity-attributes/protocol-version" + ) + assert len(protocol_version.attribute_value) == 2 + assert set(str(x) for x in self.conf._sp_protocol_version) \ + == set([x.text for x in protocol_version.attribute_value]) + + def test_protocol_version_in_metadata(self, config): + config["service"]["sp"]["protocol_version"] = 1.2 + + conf = eIDASSPConfig() + conf.load(config) + + entd = metadata.entity_descriptor(conf) + entity_attributes = next(filter(lambda x: x.tag == "EntityAttributes", + entd.extensions.extension_elements)) + protocol_version = next( + x for x in entity_attributes.children + if getattr(x, "name", "") == + "http://eidas.europa.eu/entity-attributes/protocol-version" + ) + assert len(protocol_version.attribute_value) == 1 + assert {str(conf._sp_protocol_version)} \ + == set([x.text for x in protocol_version.attribute_value]) + + +class TestSPConfig: + @staticmethod + def assert_validation_error(config): + conf = eIDASSPConfig() + conf.load(config) + with pytest.raises(ConfigValidationError): + conf.validate() + + @pytest.fixture(scope="function") + def technical_contacts(self, config): + return [ + x for x in config["contact_person"] + if x["contact_type"] == "technical" + ] + + @pytest.fixture(scope="function") + def support_contacts(self, config): + return [ + x for x in config["contact_person"] + if x["contact_type"] == "support" + ] + + @pytest.fixture(scope="function") + def raise_error_on_warning(self, monkeypatch): + def r(*args, **kwargs): + raise ConfigValidationError() + monkeypatch.setattr("saml2.config.logger.warning", r) + + @pytest.fixture(scope="function") + def config(self): + return copy.deepcopy(CONFIG) + + def test_singlelogout_declared(self, config, raise_error_on_warning): + config["service"]["sp"]["endpoints"]["single_logout_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_artifact_resolution_declared(self, config, raise_error_on_warning): + config["service"]["sp"]["endpoints"]["artifact_resolution_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_manage_nameid_service_declared(self, config, raise_error_on_warning): + config["service"]["sp"]["endpoints"]["manage_name_id_service"] = \ + [("https://example.com", BINDING_HTTP_POST)] + self.assert_validation_error(config) + + def test_no_keydescriptor(self, config): + del config["cert_file"] + del config["encryption_keypairs"] + self.assert_validation_error(config) + + def test_no_nodecountry(self, config): + del config["service"]["sp"]["node_country"] + self.assert_validation_error(config) + + def test_nodecountry_wrong_format(self, config): + config["service"]["sp"]["node_country"] = "gr" + self.assert_validation_error(config) + + def test_no_application_identifier_warning(self, config, raise_error_on_warning): + del config["service"]["sp"]["application_identifier"] + + self.assert_validation_error(config) + + def test_empty_application_identifier_warning(self, config, raise_error_on_warning): + config["service"]["sp"]["application_identifier"] = "" + + self.assert_validation_error(config) + + def test_application_identifier_wrong_format(self, config): + config["service"]["sp"]["application_identifier"] = "TEST:Node.1" + + self.assert_validation_error(config) + + def test_config_ok(self, config, raise_error_on_warning): + conf = eIDASSPConfig() + conf.load(config) + conf.validate() + + def test_no_protocol_version_warning(self, config, raise_error_on_warning): + del config["service"]["sp"]["protocol_version"] + + self.assert_validation_error(config) + + def test_empty_protocol_version_warning(self, config, raise_error_on_warning): + config["service"]["sp"]["protocol_version"] = "" + + self.assert_validation_error(config) + + def test_no_organization_info_warning(self, config, raise_error_on_warning): + del config["organization"] + + self.assert_validation_error(config) + + def test_empty_organization_info_warning(self, config, raise_error_on_warning): + config["organization"] = {} + + self.assert_validation_error(config) + + def test_no_technical_contact_person(self, + config, + technical_contacts, + raise_error_on_warning): + for contact in technical_contacts: + contact["contact_type"] = "other" + + self.assert_validation_error(config) + + def test_technical_contact_person_no_email(self, + config, + technical_contacts, + raise_error_on_warning): + + for contact in technical_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_technical_contact_person_empty_email(self, + config, + technical_contacts, + raise_error_on_warning): + + for contact in technical_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_no_support_contact_person(self, + config, + support_contacts, + raise_error_on_warning): + for contact in support_contacts: + contact["contact_type"] = "other" + + self.assert_validation_error(config) + + def test_support_contact_person_no_email(self, + config, + support_contacts, + raise_error_on_warning): + + for contact in support_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_support_contact_person_empty_email(self, + config, + support_contacts, + raise_error_on_warning): + + for contact in support_contacts: + del contact["email_address"] + + self.assert_validation_error(config) + + def test_entityid_no_https(self, config): + config["entityid"] = "urn:mace:example.com:saml:roland:idp" + + self.assert_validation_error(config) + + def test_authn_requests_signed_false(self, config): + config["service"]["sp"]["authn_requests_signed"] = False + + self.assert_validation_error(config) + + def test_authn_requests_signed_unassigned(self, config): + del config["service"]["sp"]["authn_requests_signed"] + + self.assert_validation_error(config) + + def test_sp_type_undeclared(self, config): + del config["service"]["sp"]["sp_type"] + + self.assert_validation_error(config) + + def test_sp_type_invalid_value(self, config): + config["service"]["sp"]["sp_type"] = "test value" + + self.assert_validation_error(config) + + def test_hide_assertion_consumer_service_false(self, config, raise_error_on_warning): + config["service"]["sp"]["hide_assertion_consumer_service"] = False + + self.assert_validation_error(config) + + def test_hide_assertion_consumer_service_unset(self, config, + raise_error_on_warning): + del config["service"]["sp"]["hide_assertion_consumer_service"] + + self.assert_validation_error(config) + + def test_allow_unsolicited_true(self, config): + config["service"]["sp"]["allow_unsolicited"] = True + + self.assert_validation_error(config) + + def test_allow_unsolicited_unse(self, config): + del config["service"]["sp"]["allow_unsolicited"] + + self.assert_validation_error(config) diff --git a/tools/data/node_country.xsd b/tools/data/node_country.xsd new file mode 100644 index 000000000..c2a4e3722 --- /dev/null +++ b/tools/data/node_country.xsd @@ -0,0 +1,15 @@ + + + + + + + + +