From 83edd9aeab9dd86bae7ddde0ad1e4c1f60f3d175 Mon Sep 17 00:00:00 2001 From: Matthew Elwell Date: Thu, 9 Jan 2025 14:41:32 +0000 Subject: [PATCH 1/2] Add call to migrate to v2 --- api/environments/dynamodb/migrator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/environments/dynamodb/migrator.py b/api/environments/dynamodb/migrator.py index d53763c20160..975b4c192921 100644 --- a/api/environments/dynamodb/migrator.py +++ b/api/environments/dynamodb/migrator.py @@ -6,6 +6,7 @@ from features.models import FeatureState from features.multivariate.models import MultivariateFeatureStateValue from projects.models import Project +from projects.tasks import migrate_project_environments_to_v2 from util.queryset import iterator_with_prefetch from .types import DynamoProjectMetadata, ProjectIdentityMigrationStatus @@ -81,4 +82,5 @@ def migrate(self): ) ) identity_wrapper.write_identities(iterator_with_prefetch(identities)) + migrate_project_environments_to_v2(project_id) self.project_metadata.finish_identity_migration() From e307efe568064c74569047d7a072c48437b3e515 Mon Sep 17 00:00:00 2001 From: Matthew Elwell Date: Fri, 17 Jan 2025 18:33:11 +0000 Subject: [PATCH 2/2] WIP --- api/environments/dynamodb/migrator.py | 46 ++++++++++++++++++- .../dynamodb/wrappers/identity_wrapper.py | 19 +++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/api/environments/dynamodb/migrator.py b/api/environments/dynamodb/migrator.py index 975b4c192921..76524539e951 100644 --- a/api/environments/dynamodb/migrator.py +++ b/api/environments/dynamodb/migrator.py @@ -1,4 +1,5 @@ from django.db.models import Prefetch +from flag_engine.identities.models import IdentityModel from edge_api.identities.events import send_migration_event from environments.identities.models import Identity @@ -7,9 +8,18 @@ from features.multivariate.models import MultivariateFeatureStateValue from projects.models import Project from projects.tasks import migrate_project_environments_to_v2 +from util.mappers import ( + map_engine_feature_state_to_identity_override, + map_feature_state_to_engine, +) from util.queryset import iterator_with_prefetch -from .types import DynamoProjectMetadata, ProjectIdentityMigrationStatus +from . import DynamoEnvironmentV2Wrapper +from .types import ( + DynamoProjectMetadata, + IdentityOverridesV2Changeset, + ProjectIdentityMigrationStatus, +) from .wrappers import ( DynamoEnvironmentAPIKeyWrapper, DynamoEnvironmentWrapper, @@ -52,10 +62,12 @@ def migrate(self): Project.objects.filter(id=project_id).update(enable_dynamo_db=True) environment_wrapper = DynamoEnvironmentWrapper() + environments_v2_wrapper = DynamoEnvironmentV2Wrapper() environments = Environment.objects.filter_for_document_builder( project_id=project_id ) environment_wrapper.write_environments(environments) + environments_v2_wrapper.write_environments(environments) api_key_wrapper = DynamoEnvironmentAPIKeyWrapper() api_keys = EnvironmentAPIKey.objects.filter(environment__project_id=project_id) @@ -81,6 +93,36 @@ def migrate(self): ), ) ) - identity_wrapper.write_identities(iterator_with_prefetch(identities)) + + # TODO: I'm not sure this approach will actually work or, even if it does, + # it's going to be ugly and end up serializing / deserializing unnecessarily. + # The logical solution here is to extend the DynamoIdentityWrapper so + # that we can handle both things, but that feels like quite a refactor. + + identity_documents_with_overrides = identity_wrapper.write_identities( + iterator_with_prefetch(identities), + return_filter=lambda i: i["identity_features"] + ) + + identity_models = [ + IdentityModel.model_validate(identity_document) + for identity_document in identity_documents_with_overrides + ] + + environments_v2_wrapper.update_identity_overrides( + changeset=IdentityOverridesV2Changeset( + to_delete=[], + to_put=[ + map_engine_feature_state_to_identity_override( + feature_state=map_feature_state_to_engine( + identity_override, + mv_fs_values=identity_override.multivariate_feature_state_values.all() + ), + identifier= + ) for identity_override in identity_overrides + ] + ) + ) + migrate_project_environments_to_v2(project_id) self.project_metadata.finish_identity_migration() diff --git a/api/environments/dynamodb/wrappers/identity_wrapper.py b/api/environments/dynamodb/wrappers/identity_wrapper.py index a9b605432bb2..c161e6593ece 100644 --- a/api/environments/dynamodb/wrappers/identity_wrapper.py +++ b/api/environments/dynamodb/wrappers/identity_wrapper.py @@ -43,7 +43,19 @@ def query_items(self, *args, **kwargs) -> "QueryOutputTableTypeDef": def put_item(self, identity_dict: dict): self.table.put_item(Item=identity_dict) - def write_identities(self, identities: Iterable["Identity"]): + def write_identities( + self, + identities: Iterable["Identity"], + return_filter: typing.Callable[[dict[str, typing.Any]], bool] = lambda _: False, + ) -> list[dict[str, typing.Any]]: + """ + Write the given ORM identities to the DynamoDB table. + + If return_filter is passed, return the identity documents + matching the given filter function. By default, no objects + are returned to limit memory usage. + """ + _return_objects = [] with self.table.batch_writer() as batch: for identity in identities: identity_document = map_identity_to_identity_document(identity) @@ -56,6 +68,11 @@ def write_identities(self, identities: Iterable["Identity"]): continue batch.put_item(Item=identity_document) + if return_filter(identity_document): + _return_objects.append(identity_document) + + return _return_objects + def get_item(self, composite_key: str) -> typing.Optional[dict]: return self.table.get_item(Key={"composite_key": composite_key}).get("Item")