From 82c82d9bd901bda89d8a5892bb7b0e1f037f39c3 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Wed, 27 Nov 2024 02:10:04 +0100 Subject: [PATCH 01/21] wip --- reflex/state.py | 292 ++++++++++++----------- tests/integration/test_client_storage.py | 18 +- tests/units/test_state_tree.py | 10 +- 3 files changed, 167 insertions(+), 153 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 55f29cf45f3..350cd70677f 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -938,7 +938,20 @@ def get_class_substate(cls, path: Sequence[str] | str) -> Type[BaseState]: for substate in cls.get_substates(): if path[0] == substate.get_name(): return substate.get_class_substate(path[1:]) - raise ValueError(f"Invalid path: {path}") + raise ValueError(f"Invalid path: {cls.get_full_name()=} {path=}") + + @classmethod + # @functools.lru_cache() + def get_all_substate_classes(cls) -> set[Type[BaseState]]: + """Get all substate classes of the state. + + Returns: + The set of all substate classes. + """ + substates = set(cls.get_substates()) + for substate in cls.get_substates(): + substates.update(substate.get_all_substate_classes()) + return substates @classmethod def get_class_var(cls, path: Sequence[str]) -> Any: @@ -1393,7 +1406,9 @@ def get_substate(self, path: Sequence[str]) -> BaseState: return self path = path[1:] if path[0] not in self.substates: - raise ValueError(f"Invalid path: {path}") + raise ValueError( + f"Invalid path: {path=} {self.get_full_name()=} {self.substates.keys()=}" + ) return self.substates[path[0]].get_substate(path[1:]) @classmethod @@ -1455,6 +1470,29 @@ def _get_parent_states(self) -> list[tuple[str, BaseState]]: parent_states_with_name.append((parent_state.get_full_name(), parent_state)) return parent_states_with_name + def _get_all_loaded_states(self) -> dict[str, BaseState]: + """Get all loaded states in the state tree. + + Returns: + A list of all loaded states in the state tree. + """ + root_state = self._get_root_state() + d = {root_state.get_full_name(): root_state} + d.update(root_state._get_loaded_substates()) + return d + + def _get_loaded_substates(self) -> dict[str, BaseState]: + """Get all loaded substates of this state. + + Returns: + A list of all loaded substates of this state. + """ + loaded_substates = {} + for substate in self.substates.values(): + loaded_substates[substate.get_full_name()] = substate + loaded_substates.update(substate._get_loaded_substates()) + return loaded_substates + def _get_root_state(self) -> BaseState: """Get the root state of the state tree. @@ -1861,6 +1899,7 @@ def _dirty_computed_vars( if include_backend or not self.computed_vars[cvar]._backend ) + # TODO: just return full name? cache? @classmethod def _potentially_dirty_substates(cls) -> set[Type[BaseState]]: """Determine substates which could be affected by dirty vars in this state. @@ -1882,6 +1921,22 @@ def _potentially_dirty_substates(cls) -> set[Type[BaseState]]: ) return fetch_substates + # TODO: just return full name? cache? + # this only needs to be computed once, and only for the root state? + @classmethod + def _recursive_potentially_dirty_substates(cls) -> set[Type[BaseState]]: + """Recursively determine substates which could be affected by dirty vars in this state. + + Returns: + Set of State classes that may need to be fetched to recalc computed vars. + """ + fetch_substates = cls._potentially_dirty_substates() + for substate_cls in cls.get_substates(): + fetch_substates.update( + substate_cls._recursive_potentially_dirty_substates() + ) + return fetch_substates + def get_delta(self) -> Delta: """Get the delta for the state. @@ -3190,77 +3245,6 @@ class StateManagerRedis(StateManager): b"evicted", } - async def _get_parent_state( - self, token: str, state: BaseState | None = None - ) -> BaseState | None: - """Get the parent state for the state requested in the token. - - Args: - token: The token to get the state for (_substate_key). - state: The state instance to get parent state for. - - Returns: - The parent state for the state requested by the token or None if there is no such parent. - """ - parent_state = None - client_token, state_path = _split_substate_key(token) - parent_state_name = state_path.rpartition(".")[0] - if parent_state_name: - cached_substates = None - if state is not None: - cached_substates = [state] - # Retrieve the parent state to populate event handlers onto this substate. - parent_state = await self.get_state( - token=_substate_key(client_token, parent_state_name), - top_level=False, - get_substates=False, - cached_substates=cached_substates, - ) - return parent_state - - async def _populate_substates( - self, - token: str, - state: BaseState, - all_substates: bool = False, - ): - """Fetch and link substates for the given state instance. - - There is no return value; the side-effect is that `state` will have `substates` populated, - and each substate will have its `parent_state` set to `state`. - - Args: - token: The token to get the state for. - state: The state instance to populate substates for. - all_substates: Whether to fetch all substates or just required substates. - """ - client_token, _ = _split_substate_key(token) - - if all_substates: - # All substates are requested. - fetch_substates = state.get_substates() - else: - # Only _potentially_dirty_substates need to be fetched to recalc computed vars. - fetch_substates = state._potentially_dirty_substates() - - tasks = {} - # Retrieve the necessary substates from redis. - for substate_cls in fetch_substates: - if substate_cls.get_name() in state.substates: - continue - substate_name = substate_cls.get_name() - tasks[substate_name] = asyncio.create_task( - self.get_state( - token=_substate_key(client_token, substate_cls), - top_level=False, - get_substates=all_substates, - parent_state=state, - ) - ) - - for substate_name, substate_task in tasks.items(): - state.substates[substate_name] = await substate_task - @override async def get_state( self, @@ -3268,7 +3252,6 @@ async def get_state( top_level: bool = True, get_substates: bool = True, parent_state: BaseState | None = None, - cached_substates: list[BaseState] | None = None, ) -> BaseState: """Get the state for a token. @@ -3277,7 +3260,6 @@ async def get_state( top_level: If true, return an instance of the top-level state (self.state). get_substates: If true, also retrieve substates. parent_state: If provided, use this parent_state instead of getting it from redis. - cached_substates: If provided, attach these substates to the state. Returns: The state for the token. @@ -3285,8 +3267,8 @@ async def get_state( Raises: RuntimeError: when the state_cls is not specified in the token """ - # Split the actual token from the fully qualified substate name. - _, state_path = _split_substate_key(token) + # new impl from top to bottomA + client_token, state_path = _split_substate_key(token) if state_path: # Get the State class associated with the given path. state_cls = self.state.get_class_substate(state_path) @@ -3295,44 +3277,92 @@ async def get_state( "StateManagerRedis requires token to be specified in the form of {token}_{state_full_name}" ) - # The deserialized or newly created (sub)state instance. - state = None - - # Fetch the serialized substate from redis. - redis_state = await self.redis.get(token) - - if redis_state is not None: - # Deserialize the substate. - with contextlib.suppress(StateSchemaMismatchError): - state = BaseState._deserialize(data=redis_state) - if state is None: - # Key didn't exist or schema mismatch so create a new instance for this token. - state = state_cls( - init_substates=False, - _reflex_internal_init=True, + state_tokens = {state_path} + + # walk up the state path + walk_state_path = state_path + while "." in walk_state_path: + walk_state_path = walk_state_path.rpartition(".")[0] + state_tokens.add(walk_state_path) + + state_tokens.update( + { + substate.get_full_name() + for substate in self.state._recursive_potentially_dirty_substates() + } + ) + if get_substates: + state_tokens.update( + { + substate.get_full_name() + for substate in state_cls.get_all_substate_classes() + } ) - # Populate parent state if missing and requested. - if parent_state is None: - parent_state = await self._get_parent_state(token, state) - # Set up Bidirectional linkage between this state and its parent. + + loaded_states = {} if parent_state is not None: - parent_state.substates[state.get_name()] = state - state.parent_state = parent_state - # Avoid fetching substates multiple times. - if cached_substates: - for substate in cached_substates: - state.substates[substate.get_name()] = substate - if substate.parent_state is None: - substate.parent_state = state - # Populate substates if requested. - await self._populate_substates(token, state, all_substates=get_substates) - - # To retain compatibility with previous implementation, by default, we return - # the top-level state by chasing `parent_state` pointers up the tree. + loaded_states = parent_state._get_all_loaded_states() + # remove all states that are already loaded + state_tokens = state_tokens.difference(loaded_states.keys()) + + redis_states = await self.hmget(name=client_token, keys=list(state_tokens)) + redis_states.update(loaded_states) + root_state = redis_states[self.state.get_full_name()] + self.recursive_link_substates(state=root_state, substates=redis_states) + if top_level: - return state._get_root_state() + return root_state + + state = redis_states[state_path] return state + def recursive_link_substates( + self, + state: BaseState, + substates: dict[str, BaseState], + ): + """Recursively link substates to a state. + + Args: + state: The state to link substates to. + substates: The substates to link. + """ + for substate_cls in state.get_substates(): + if substate_cls.get_full_name() not in substates: + continue + substate = substates[substate_cls.get_full_name()] + state.substates[substate.get_name()] = substate + substate.parent_state = state + self.recursive_link_substates( + state=substate, + substates=substates, + ) + + async def hmget(self, name: str, keys: List[str]) -> dict[str, BaseState]: + """Get multiple values from a hash. + + Args: + name: The name of the hash. + keys: The keys to get. + + Returns: + The values. + """ + d = {} + for state in await self.redis.hmget(name=name, keys=keys): # type: ignore + key = keys.pop(0) + if state is not None: + with contextlib.suppress(StateSchemaMismatchError): + state = BaseState._deserialize(data=state) + if state is None: + state_cls = self.state.get_class_substate(key) + state = state_cls( + init_substates=False, + _reflex_internal_init=True, + ) + d[state.get_full_name()] = state + return d + @override async def set_state( self, @@ -3368,31 +3398,25 @@ async def set_state( f"Cannot `set_state` with mismatching token {token} and substate {state.get_full_name()}." ) - # Recursively set_state on all known substates. - tasks = [] - for substate in state.substates.values(): - tasks.append( - asyncio.create_task( - self.set_state( - token=_substate_key(client_token, substate), - state=substate, - lock_id=lock_id, - ) - ) - ) - # Persist only the given state (parents or substates are excluded by BaseState.__getstate__). - if state._get_was_touched(): - pickle_state = state._serialize() - if pickle_state: - await self.redis.set( - _substate_key(client_token, state), - pickle_state, - ex=self.token_expiration, - ) + redis_hashset = {} + + for state_name, substate in state._get_all_loaded_states().items(): + if not substate._get_was_touched(): + continue + pickle_state = substate._serialize() + if not pickle_state: + continue + redis_hashset[state_name] = pickle_state - # Wait for substates to be persisted. - for t in tasks: - await t + if not redis_hashset: + return + + await self.redis.hmset(name=client_token, mapping=redis_hashset) # type: ignore + await self.redis.hexpire( + client_token, + self.token_expiration, + *redis_hashset.keys(), + ) @override @contextlib.asynccontextmanager diff --git a/tests/integration/test_client_storage.py b/tests/integration/test_client_storage.py index 236d3e14e10..649b236a413 100644 --- a/tests/integration/test_client_storage.py +++ b/tests/integration/test_client_storage.py @@ -11,7 +11,6 @@ from selenium.webdriver.remote.webdriver import WebDriver from reflex.state import ( - State, StateManagerDisk, StateManagerMemory, StateManagerRedis, @@ -278,6 +277,7 @@ def set_sub_sub(var: str, value: str): set_sub_sub_state_button.click() token = poll_for_token() + assert token is not None # get a reference to all cookie and local storage elements c1 = driver.find_element(By.ID, "c1") @@ -613,16 +613,7 @@ def set_sub_sub(var: str, value: str): # Simulate state expiration if isinstance(client_side.state_manager, StateManagerRedis): - await client_side.state_manager.redis.delete( - _substate_key(token, State.get_full_name()) - ) - await client_side.state_manager.redis.delete(_substate_key(token, state_name)) - await client_side.state_manager.redis.delete( - _substate_key(token, sub_state_name) - ) - await client_side.state_manager.redis.delete( - _substate_key(token, sub_sub_state_name) - ) + await client_side.state_manager.redis.delete(token) elif isinstance(client_side.state_manager, (StateManagerMemory, StateManagerDisk)): del client_side.state_manager.states[token] if isinstance(client_side.state_manager, StateManagerDisk): @@ -679,9 +670,8 @@ async def poll_for_not_hydrated(): # Get the backend state and ensure the values are still set async def get_sub_state(): - root_state = await client_side.get_state( - _substate_key(token or "", sub_state_name) - ) + assert token is not None + root_state = await client_side.get_state(_substate_key(token, sub_state_name)) state = root_state.substates[client_side.get_state_name("_client_side_state")] sub_state = state.substates[ client_side.get_state_name("_client_side_sub_state") diff --git a/tests/units/test_state_tree.py b/tests/units/test_state_tree.py index ebdd877de2d..f1d100ff284 100644 --- a/tests/units/test_state_tree.py +++ b/tests/units/test_state_tree.py @@ -354,11 +354,11 @@ async def state_manager_redis( ], ) async def test_get_state_tree( - state_manager_redis, - token, - substate_cls, - exp_root_substates, - exp_root_dict_keys, + state_manager_redis: StateManagerRedis, + token: str, + substate_cls: type[BaseState], + exp_root_substates: list[str], + exp_root_dict_keys: list[str], ): """Test getting state trees and assert on which branches are retrieved. From cfdebb032d85a0febec30bf2dc0ff04cfb25a9c6 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sun, 1 Dec 2024 15:33:22 +0100 Subject: [PATCH 02/21] even less redis calls using pipelines --- reflex/state.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 350cd70677f..9b4aa1704fa 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3411,11 +3411,15 @@ async def set_state( if not redis_hashset: return - await self.redis.hmset(name=client_token, mapping=redis_hashset) # type: ignore - await self.redis.hexpire( - client_token, - self.token_expiration, - *redis_hashset.keys(), + pipe = self.redis.pipeline() + await ( + pipe.hmset(name=client_token, mapping=redis_hashset) + .hexpire( # type: ignore + client_token, + self.token_expiration, + *redis_hashset.keys(), + ) + .execute() ) @override From eb23c0e079e415127052473a564a42bf9919f067 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Tue, 3 Dec 2024 02:22:50 +0100 Subject: [PATCH 03/21] fix for state schema mismatch --- reflex/state.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 9b4aa1704fa..e073fbfee80 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3349,11 +3349,12 @@ async def hmget(self, name: str, keys: List[str]) -> dict[str, BaseState]: The values. """ d = {} - for state in await self.redis.hmget(name=name, keys=keys): # type: ignore + for redis_state in await self.redis.hmget(name=name, keys=keys): # type: ignore key = keys.pop(0) - if state is not None: + state = None + if redis_state is not None: with contextlib.suppress(StateSchemaMismatchError): - state = BaseState._deserialize(data=state) + state = BaseState._deserialize(data=redis_state) if state is None: state_cls = self.state.get_class_substate(key) state = state_cls( From 685736261896804497b24a498e70d1d4976bbfb6 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Tue, 3 Dec 2024 02:41:28 +0100 Subject: [PATCH 04/21] minor performance improvement for _potentially_dirty_substates + tests --- reflex/state.py | 22 +++++++--------------- tests/units/test_state.py | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index e073fbfee80..eef1f153b6c 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1899,36 +1899,33 @@ def _dirty_computed_vars( if include_backend or not self.computed_vars[cvar]._backend ) - # TODO: just return full name? cache? @classmethod - def _potentially_dirty_substates(cls) -> set[Type[BaseState]]: + def _potentially_dirty_substates(cls) -> set[str]: """Determine substates which could be affected by dirty vars in this state. Returns: - Set of State classes that may need to be fetched to recalc computed vars. + Set of State full names that may need to be fetched to recalc computed vars. """ # _always_dirty_substates need to be fetched to recalc computed vars. fetch_substates = set( - cls.get_class_substate((cls.get_name(), *substate_name.split("."))) + f"{cls.get_full_name()}.{substate_name}" for substate_name in cls._always_dirty_substates ) for dependent_substates in cls._substate_var_dependencies.values(): fetch_substates.update( set( - cls.get_class_substate((cls.get_name(), *substate_name.split("."))) + f"{cls.get_full_name()}.{substate_name}" for substate_name in dependent_substates ) ) return fetch_substates - # TODO: just return full name? cache? - # this only needs to be computed once, and only for the root state? @classmethod - def _recursive_potentially_dirty_substates(cls) -> set[Type[BaseState]]: + def _recursive_potentially_dirty_substates(cls) -> set[str]: """Recursively determine substates which could be affected by dirty vars in this state. Returns: - Set of State classes that may need to be fetched to recalc computed vars. + Set of full state names that may need to be fetched to recalc computed vars. """ fetch_substates = cls._potentially_dirty_substates() for substate_cls in cls.get_substates(): @@ -3285,12 +3282,7 @@ async def get_state( walk_state_path = walk_state_path.rpartition(".")[0] state_tokens.add(walk_state_path) - state_tokens.update( - { - substate.get_full_name() - for substate in self.state._recursive_potentially_dirty_substates() - } - ) + state_tokens.update(self.state._recursive_potentially_dirty_substates()) if get_substates: state_tokens.update( { diff --git a/tests/units/test_state.py b/tests/units/test_state.py index 8e61b8dae7f..04c03771536 100644 --- a/tests/units/test_state.py +++ b/tests/units/test_state.py @@ -3135,10 +3135,17 @@ class C1(State): def bar(self) -> str: return "" - assert RxState._potentially_dirty_substates() == {State} - assert State._potentially_dirty_substates() == {C1} + assert RxState._potentially_dirty_substates() == {State.get_full_name()} + assert State._potentially_dirty_substates() == {C1.get_full_name()} assert C1._potentially_dirty_substates() == set() + assert RxState._recursive_potentially_dirty_substates() == { + State.get_full_name(), + C1.get_full_name(), + } + assert State._recursive_potentially_dirty_substates() == {C1.get_full_name()} + assert C1._recursive_potentially_dirty_substates() == set() + def test_router_var_dep() -> None: """Test that router var dependencies are correctly tracked.""" @@ -3159,7 +3166,9 @@ def foo(self) -> str: State._init_var_dependency_dicts() assert foo._deps(objclass=RouterVarDepState) == {"router"} - assert RouterVarParentState._potentially_dirty_substates() == {RouterVarDepState} + assert RouterVarParentState._potentially_dirty_substates() == { + RouterVarDepState.get_full_name() + } assert RouterVarParentState._substate_var_dependencies == { "router": {RouterVarDepState.get_name()} } From f70b6a14ad65ff2124286e8663a4f38618e95211 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Tue, 3 Dec 2024 02:43:44 +0100 Subject: [PATCH 05/21] cleanup --- reflex/state.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reflex/state.py b/reflex/state.py index eef1f153b6c..039fedb029a 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -941,7 +941,6 @@ def get_class_substate(cls, path: Sequence[str] | str) -> Type[BaseState]: raise ValueError(f"Invalid path: {cls.get_full_name()=} {path=}") @classmethod - # @functools.lru_cache() def get_all_substate_classes(cls) -> set[Type[BaseState]]: """Get all substate classes of the state. From 346755a02d5ed3f6de7f1a7ef7cdcc2bf119c14d Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Mon, 9 Dec 2024 12:19:51 +0100 Subject: [PATCH 06/21] fix: redis hmset is deprecated in favor of hset --- reflex/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reflex/state.py b/reflex/state.py index 039fedb029a..5804bde82f7 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3405,7 +3405,7 @@ async def set_state( pipe = self.redis.pipeline() await ( - pipe.hmset(name=client_token, mapping=redis_hashset) + pipe.hset(name=client_token, mapping=redis_hashset) .hexpire( # type: ignore client_token, self.token_expiration, From 8d99b7662a7f1edf72150b31a7c0909ace70ea43 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 01:40:14 -0800 Subject: [PATCH 07/21] clean up redis initialization remove oxbow code now that REDIS_URL must be a real redis url --- reflex/utils/prerequisites.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/reflex/utils/prerequisites.py b/reflex/utils/prerequisites.py index cc56bdf8834..cd401d8b0a1 100644 --- a/reflex/utils/prerequisites.py +++ b/reflex/utils/prerequisites.py @@ -334,10 +334,9 @@ def get_redis() -> Redis | None: Returns: The asynchronous redis client. """ - if isinstance((redis_url_or_options := parse_redis_url()), str): - return Redis.from_url(redis_url_or_options) - elif isinstance(redis_url_or_options, dict): - return Redis(**redis_url_or_options) + redis_url = parse_redis_url() + if redis_url is not None: + return Redis.from_url(redis_url) return None @@ -347,14 +346,13 @@ def get_redis_sync() -> RedisSync | None: Returns: The synchronous redis client. """ - if isinstance((redis_url_or_options := parse_redis_url()), str): - return RedisSync.from_url(redis_url_or_options) - elif isinstance(redis_url_or_options, dict): - return RedisSync(**redis_url_or_options) + redis_url = parse_redis_url() + if redis_url is not None: + return RedisSync.from_url(redis_url) return None -def parse_redis_url() -> str | dict | None: +def parse_redis_url() -> str | None: """Parse the REDIS_URL in config if applicable. Returns: From 1379bc25103d4c2da667c547f7914bf9a13a4758 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 01:42:33 -0800 Subject: [PATCH 08/21] Set top-level hash expiration if HEXPIRE is not supported Continue to support redis < 7.2 Instead of updating individual substate expiration, in older versions of redis any update to any substate will cause the entire state expiration to be refreshed, which is better than not supporting older redis versions. --- reflex/state.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 9564d80370b..14b85d74446 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3228,6 +3228,9 @@ class StateManagerRedis(StateManager): # The maximum time to hold a lock (ms). lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration) + # If HEXPIRE is not supported, use EXPIRE instead. + _hexpire_not_supported: bool | None = pydantic.PrivateAttr(None) + # The keyspace subscription string when redis is waiting for lock to be released _redis_notify_keyspace_events: str = ( "K" # Enable keyspace notifications (target a particular key) @@ -3375,6 +3378,7 @@ async def set_state( Raises: LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID. RuntimeError: If the state instance doesn't match the state name in the token. + ResponseError: If the redis command fails. """ # Check that we're holding the lock. if ( @@ -3406,16 +3410,33 @@ async def set_state( if not redis_hashset: return + try: + await self._hset_pipeline(client_token, redis_hashset) + except ResponseError as re: + if "unknown command 'HEXPIRE'" not in str(re): + raise + # HEXPIRE not supported, try again with fallback expire. + self._hexpire_not_supported = True + await self._hset_pipeline(client_token, redis_hashset) + + async def _hset_pipeline(self, client_token: str, redis_hashset: dict[str, bytes]): + """Set multiple fields in a hash with expiration. + + Args: + client_token: The name of the hash. + redis_hashset: The keys and values to set. + """ pipe = self.redis.pipeline() - await ( - pipe.hset(name=client_token, mapping=redis_hashset) - .hexpire( # type: ignore + pipe.hset(name=client_token, mapping=redis_hashset) + if self._hexpire_not_supported: + pipe.expire(client_token, self.token_expiration) + else: + pipe.hexpire( client_token, self.token_expiration, *redis_hashset.keys(), ) - .execute() - ) + await pipe.execute() @override @contextlib.asynccontextmanager From bbddd409c0414e0acaa471873b4e2ab006666cd0 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 01:44:48 -0800 Subject: [PATCH 09/21] pyproject.toml: require redis >= 5.1.0 redis-py 5.1.0 adds support for the .hexpire() command --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b63fad3195b..6c5dc596449 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ psutil = ">=5.9.4,<7.0" pydantic = ">=1.10.2,<3.0" python-multipart = ">=0.0.5,<0.1" python-socketio = ">=5.7.0,<6.0" -redis = ">=4.3.5,<6.0" +redis = ">=5.1.0,<6.0" rich = ">=13.0.0,<14.0" sqlmodel = ">=0.0.14,<0.1" typer = ">=0.4.2,<1.0" From 1637ea35524851b3a4d0d08c4bc8a30986f2fa7d Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 01:45:42 -0800 Subject: [PATCH 10/21] relock poetry.lock --- poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 7d46e419949..fd46be95afe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3041,4 +3041,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "3810e99ff4d09952e62d88b2c26651a0d8e0ffe4007bc3274c2fb83b68243951" +content-hash = "ce537f53afeb53d30c4d762e21fe0678546692c7922cab68785a782e0be28ecb" From b9de47f4b3197e0f627bf86cd268155ddff0c928 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 02:04:55 -0800 Subject: [PATCH 11/21] py3.9 annotation --- reflex/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reflex/state.py b/reflex/state.py index cf5234789b6..e113f6cb945 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3252,7 +3252,7 @@ class StateManagerRedis(StateManager): lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration) # If HEXPIRE is not supported, use EXPIRE instead. - _hexpire_not_supported: bool | None = pydantic.PrivateAttr(None) + _hexpire_not_supported: Optional[bool] = pydantic.PrivateAttr(None) # The keyspace subscription string when redis is waiting for lock to be released _redis_notify_keyspace_events: str = ( From c347dc8e4b038087ef5eb291a6c4213a10fe8686 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Fri, 13 Dec 2024 10:56:48 -0800 Subject: [PATCH 12/21] Update test_state_manager_lock_warning_threshold_contend expectations Now that all substates are persisted together, there is only 1 warning message expected, not 7 --- tests/units/test_state.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/units/test_state.py b/tests/units/test_state.py index 1e17fc653cb..84a97efb69c 100644 --- a/tests/units/test_state.py +++ b/tests/units/test_state.py @@ -31,6 +31,7 @@ import reflex as rx import reflex.config +import reflex.utils.console from reflex import constants from reflex.app import App from reflex.base import Base @@ -1857,7 +1858,7 @@ async def test_state_manager_lock_warning_threshold_contend( substate_token_redis: A token + substate name for looking up in state manager. mocker: Pytest mocker object. """ - console_warn = mocker.patch("reflex.utils.console.warn") + console_warn = mocker.spy(reflex.utils.console, "warn") state_manager_redis.lock_expiration = LOCK_EXPIRATION state_manager_redis.lock_warning_threshold = LOCK_WARNING_THRESHOLD @@ -1875,7 +1876,7 @@ async def _coro_blocker(): await tasks[0] console_warn.assert_called() - assert console_warn.call_count == 7 + assert console_warn.call_count == 1 class CopyingAsyncMock(AsyncMock): From 84f78076720b868b64815e8dee1dba43d6f46d75 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 01:03:05 +0100 Subject: [PATCH 13/21] add basic benchmark for redis state manager --- benchmarks/benchmark_state_manager_redis.py | 309 ++++++++++++++++++++ 1 file changed, 309 insertions(+) create mode 100644 benchmarks/benchmark_state_manager_redis.py diff --git a/benchmarks/benchmark_state_manager_redis.py b/benchmarks/benchmark_state_manager_redis.py new file mode 100644 index 00000000000..e7eb51c6af6 --- /dev/null +++ b/benchmarks/benchmark_state_manager_redis.py @@ -0,0 +1,309 @@ +"""Benchmark for the state manager redis.""" + +import asyncio +from uuid import uuid4 + +import pytest +from pytest_benchmark.fixture import BenchmarkFixture + +from reflex.state import State, StateManagerRedis +from reflex.utils.prerequisites import get_redis +from reflex.vars.base import computed_var + + +class RootState(State): + """Root state class for testing.""" + + counter: int = 0 + int_dict: dict[str, int] = {} + + +class ChildState(RootState): + """Child state class for testing.""" + + child_counter: int = 0 + + @computed_var + def str_dict(self): + """Convert the int dict to a string dict. + + Returns: + A dictionary with string keys and integer values. + """ + return {str(k): v for k, v in self.int_dict.items()} + + +class ChildState2(RootState): + """Child state 2 class for testing.""" + + child2_counter: int = 0 + + +class GrandChildState(ChildState): + """Grandchild state class for testing.""" + + grand_child_counter: int = 0 + + @computed_var + def double_counter(self): + """Double the counter. + + Returns: + The counter value multiplied by 2. + """ + return self.counter * 2 + + +@pytest.fixture +def state_manager() -> StateManagerRedis: + """Fixture for the redis state manager. + + Returns: + An instance of StateManagerRedis. + """ + redis = get_redis() + if redis is None: + pytest.skip("Redis is not available") + return StateManagerRedis(redis=redis, state=State) + + +@pytest.fixture +def token() -> str: + """Fixture for the token. + + Returns: + A unique token string. + """ + return str(uuid4()) + + +@pytest.fixture +def grand_child_state_token(token: str) -> str: + """Fixture for the grand child state token. + + Args: + token: The token fixture. + + Returns: + A string combining the token and the grandchild state name. + """ + return f"{token}_{GrandChildState.get_full_name()}" + + +@pytest.fixture +def base_state_token(token: str) -> str: + """Fixture for the base state token. + + Args: + token: The token fixture. + + Returns: + A string combining the token and the base state name. + """ + return f"{token}_{State.get_full_name()}" + + +@pytest.fixture +def grand_child_state() -> GrandChildState: + """Fixture for the grand child state. + + Returns: + An instance of GrandChildState. + """ + state = State() + + root = RootState() + root.parent_state = state + state.substates[root.get_name()] = root + + child = ChildState() + child.parent_state = root + root.substates[child.get_name()] = child + + child2 = ChildState2() + child2.parent_state = root + root.substates[child2.get_name()] = child2 + + gcs = GrandChildState() + gcs.parent_state = child + child.substates[gcs.get_name()] = gcs + + return gcs + + +@pytest.fixture +def grand_child_state_big(grand_child_state: GrandChildState) -> GrandChildState: + """Fixture for the grand child state with big data. + + Args: + grand_child_state: The grand child state fixture. + + Returns: + An instance of GrandChildState with large data. + """ + grand_child_state.counter = 100 + grand_child_state.child_counter = 200 + grand_child_state.grand_child_counter = 300 + grand_child_state.int_dict = {str(i): i for i in range(10000)} + return grand_child_state + + +def test_set_base_state( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + token: str, +) -> None: + """Benchmark setting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + token: The token fixture. + """ + state = State() + + def func(): + event_loop.run_until_complete(state_manager.set_state(token=token, state=state)) + + benchmark(func) + + +def test_get_base_state( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + base_state_token: str, +) -> None: + """Benchmark getting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + base_state_token: The base state token fixture. + """ + state = State() + event_loop.run_until_complete( + state_manager.set_state(token=base_state_token, state=state) + ) + + def func(): + _ = event_loop.run_until_complete( + state_manager.get_state(token=base_state_token) + ) + + benchmark(func) + + +def test_set_state_tree_minimal( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + grand_child_state_token: str, + grand_child_state: GrandChildState, +) -> None: + """Benchmark setting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + grand_child_state_token: The grand child state token fixture. + grand_child_state: The grand child state fixture. + """ + + def func(): + event_loop.run_until_complete( + state_manager.set_state( + token=grand_child_state_token, state=grand_child_state + ) + ) + + benchmark(func) + + +def test_get_state_tree_minimal( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + grand_child_state_token: str, + grand_child_state: GrandChildState, +) -> None: + """Benchmark getting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + grand_child_state_token: The grand child state token fixture. + grand_child_state: The grand child state fixture. + """ + event_loop.run_until_complete( + state_manager.set_state(token=grand_child_state_token, state=grand_child_state) + ) + + def func(): + _ = event_loop.run_until_complete( + state_manager.get_state(token=grand_child_state_token) + ) + + benchmark(func) + + +def test_set_state_tree_big( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + grand_child_state_token: str, + grand_child_state_big: GrandChildState, +) -> None: + """Benchmark setting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + grand_child_state_token: The grand child state token fixture. + grand_child_state_big: The grand child state fixture. + """ + + def func(): + event_loop.run_until_complete( + state_manager.set_state( + token=grand_child_state_token, state=grand_child_state_big + ) + ) + + benchmark(func) + + +def test_get_state_tree_big( + benchmark: BenchmarkFixture, + state_manager: StateManagerRedis, + event_loop: asyncio.AbstractEventLoop, + grand_child_state_token: str, + grand_child_state_big: GrandChildState, +) -> None: + """Benchmark getting state with minimal data. + + Args: + benchmark: The benchmark fixture. + state_manager: The state manager fixture. + event_loop: The event loop fixture. + grand_child_state_token: The grand child state token fixture. + grand_child_state_big: The grand child state fixture. + """ + event_loop.run_until_complete( + state_manager.set_state( + token=grand_child_state_token, state=grand_child_state_big + ) + ) + + def func(): + _ = event_loop.run_until_complete( + state_manager.get_state(token=grand_child_state_token) + ) + + benchmark(func) From c9db0ceb14c127abbda6e73401e1b2e041cf8aba Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 01:07:04 +0100 Subject: [PATCH 14/21] ruffing --- reflex/state.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index a132bbd5293..9c079d5d0c7 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1928,16 +1928,16 @@ def _potentially_dirty_substates(cls) -> set[str]: Set of State full names that may need to be fetched to recalc computed vars. """ # _always_dirty_substates need to be fetched to recalc computed vars. - fetch_substates = set( + fetch_substates = { f"{cls.get_full_name()}.{substate_name}" for substate_name in cls._always_dirty_substates - ) + } for dependent_substates in cls._substate_var_dependencies.values(): fetch_substates.update( - set( + { f"{cls.get_full_name()}.{substate_name}" for substate_name in dependent_substates - ) + } ) return fetch_substates From 19d6e699137030a0b1d9b63fb3abdf01bc2908f0 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 01:28:44 +0100 Subject: [PATCH 15/21] rename base_state to state --- benchmarks/benchmark_state_manager_redis.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/benchmarks/benchmark_state_manager_redis.py b/benchmarks/benchmark_state_manager_redis.py index e7eb51c6af6..88be85be611 100644 --- a/benchmarks/benchmark_state_manager_redis.py +++ b/benchmarks/benchmark_state_manager_redis.py @@ -91,7 +91,7 @@ def grand_child_state_token(token: str) -> str: @pytest.fixture -def base_state_token(token: str) -> str: +def state_token(token: str) -> str: """Fixture for the base state token. Args: @@ -148,7 +148,7 @@ def grand_child_state_big(grand_child_state: GrandChildState) -> GrandChildState return grand_child_state -def test_set_base_state( +def test_set_state( benchmark: BenchmarkFixture, state_manager: StateManagerRedis, event_loop: asyncio.AbstractEventLoop, @@ -170,11 +170,11 @@ def func(): benchmark(func) -def test_get_base_state( +def test_get_state( benchmark: BenchmarkFixture, state_manager: StateManagerRedis, event_loop: asyncio.AbstractEventLoop, - base_state_token: str, + state_token: str, ) -> None: """Benchmark getting state with minimal data. @@ -182,17 +182,15 @@ def test_get_base_state( benchmark: The benchmark fixture. state_manager: The state manager fixture. event_loop: The event loop fixture. - base_state_token: The base state token fixture. + state_token: The base state token fixture. """ state = State() event_loop.run_until_complete( - state_manager.set_state(token=base_state_token, state=state) + state_manager.set_state(token=state_token, state=state) ) def func(): - _ = event_loop.run_until_complete( - state_manager.get_state(token=base_state_token) - ) + _ = event_loop.run_until_complete(state_manager.get_state(token=state_token)) benchmark(func) From 71adc178fc9355cedd58c185ba0b42dcc261f7e5 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 12:30:42 +0100 Subject: [PATCH 16/21] minor performance improvement in _get_loaded_substates --- reflex/state.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 9c079d5d0c7..3097417b6a7 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1499,20 +1499,21 @@ def _get_all_loaded_states(self) -> dict[str, BaseState]: """ root_state = self._get_root_state() d = {root_state.get_full_name(): root_state} - d.update(root_state._get_loaded_substates()) + root_state._get_loaded_substates(d) return d - def _get_loaded_substates(self) -> dict[str, BaseState]: + def _get_loaded_substates( + self, + loaded_substates: dict[str, BaseState], + ) -> None: """Get all loaded substates of this state. - Returns: - A list of all loaded substates of this state. + Args: + loaded_substates: A dictionary of loaded substates which will be updated with the substates of this state. """ - loaded_substates = {} for substate in self.substates.values(): loaded_substates[substate.get_full_name()] = substate - loaded_substates.update(substate._get_loaded_substates()) - return loaded_substates + substate._get_loaded_substates(loaded_substates) def _get_root_state(self) -> BaseState: """Get the root state of the state tree. From 6d6a7b8db0054ca179937681458206fa2334b474 Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 12:31:45 +0100 Subject: [PATCH 17/21] naming is hard, rename _get_all_loaded_states to _get_loaded_states --- reflex/state.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 3097417b6a7..2da11045031 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1491,7 +1491,7 @@ def _get_parent_states(self) -> list[tuple[str, BaseState]]: parent_states_with_name.append((parent_state.get_full_name(), parent_state)) return parent_states_with_name - def _get_all_loaded_states(self) -> dict[str, BaseState]: + def _get_loaded_states(self) -> dict[str, BaseState]: """Get all loaded states in the state tree. Returns: @@ -3354,7 +3354,7 @@ async def get_state( loaded_states = {} if parent_state is not None: - loaded_states = parent_state._get_all_loaded_states() + loaded_states = parent_state._get_loaded_states() # remove all states that are already loaded state_tokens = state_tokens.difference(loaded_states.keys()) @@ -3466,7 +3466,7 @@ async def set_state( redis_hashset = {} - for state_name, substate in state._get_all_loaded_states().items(): + for state_name, substate in state._get_loaded_states().items(): if not substate._get_was_touched(): continue pickle_state = substate._serialize() From 604e96a52d3a327194a7f32a1aef4abec8b781ec Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 12:41:17 +0100 Subject: [PATCH 18/21] prevent multiple iterations over the state tree when serializing touched states --- reflex/state.py | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 2da11045031..9333893a794 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1515,6 +1515,39 @@ def _get_loaded_substates( loaded_substates[substate.get_full_name()] = substate substate._get_loaded_substates(loaded_substates) + def _serialize_touched_states(self) -> dict[str, bytes]: + """Serialize all touched states in the state tree. + + Returns: + The serialized states. + """ + root_state = self._get_root_state() + d = {} + if root_state._get_was_touched(): + serialized = root_state._serialize() + if serialized: + d[root_state.get_full_name()] = serialized + root_state._serialize_touched_substates(d) + return d + + def _serialize_touched_substates( + self, + touched_substates: dict[str, bytes], + ) -> None: + """Serialize all touched substates of this state. + + Args: + touched_substates: A dictionary of touched substates which will be updated with the substates of this state. + """ + for substate in self.substates.values(): + substate._serialize_touched_substates(touched_substates) + if not substate._get_was_touched(): + continue + serialized = substate._serialize() + if not serialized: + continue + touched_substates[substate.get_full_name()] = serialized + def _get_root_state(self) -> BaseState: """Get the root state of the state tree. @@ -3464,15 +3497,7 @@ async def set_state( f"Cannot `set_state` with mismatching token {token} and substate {state.get_full_name()}." ) - redis_hashset = {} - - for state_name, substate in state._get_loaded_states().items(): - if not substate._get_was_touched(): - continue - pickle_state = substate._serialize() - if not pickle_state: - continue - redis_hashset[state_name] = pickle_state + redis_hashset = state._serialize_touched_states() if not redis_hashset: return From aa4e32dcd25ac2cf54d71286cf39025f7f3f80ca Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 14:00:09 +0100 Subject: [PATCH 19/21] minor performance improvement for recursive state dirty substates --- reflex/state.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 9333893a794..884ab853b28 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -1976,16 +1976,24 @@ def _potentially_dirty_substates(cls) -> set[str]: return fetch_substates @classmethod - def _recursive_potentially_dirty_substates(cls) -> set[str]: + def _recursive_potentially_dirty_substates( + cls, + already_selected: Type[BaseState] | None = None, + ) -> set[str]: """Recursively determine substates which could be affected by dirty vars in this state. + Args: + already_selected: The class of the state that has already been selected and needs no further processing. + Returns: Set of full state names that may need to be fetched to recalc computed vars. """ + if already_selected is not None and already_selected == cls: + return set() fetch_substates = cls._potentially_dirty_substates() for substate_cls in cls.get_substates(): fetch_substates.update( - substate_cls._recursive_potentially_dirty_substates() + substate_cls._recursive_potentially_dirty_substates(already_selected) ) return fetch_substates @@ -3376,7 +3384,6 @@ async def get_state( walk_state_path = walk_state_path.rpartition(".")[0] state_tokens.add(walk_state_path) - state_tokens.update(self.state._recursive_potentially_dirty_substates()) if get_substates: state_tokens.update( { @@ -3384,6 +3391,13 @@ async def get_state( for substate in state_cls.get_all_substate_classes() } ) + state_tokens.update( + self.state._recursive_potentially_dirty_substates( + already_selected=state_cls, + ) + ) + else: + state_tokens.update(self.state._recursive_potentially_dirty_substates()) loaded_states = {} if parent_state is not None: From 2720c869a5873dfba4b8b93c9c2a99b2fb76695d Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 14:05:29 +0100 Subject: [PATCH 20/21] introduce another big dict to the redis state manager benchmark --- benchmarks/benchmark_state_manager_redis.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmarks/benchmark_state_manager_redis.py b/benchmarks/benchmark_state_manager_redis.py index 88be85be611..7690b7001a7 100644 --- a/benchmarks/benchmark_state_manager_redis.py +++ b/benchmarks/benchmark_state_manager_redis.py @@ -43,6 +43,7 @@ class GrandChildState(ChildState): """Grandchild state class for testing.""" grand_child_counter: int = 0 + float_dict: dict[str, float] = {} @computed_var def double_counter(self): @@ -145,6 +146,7 @@ def grand_child_state_big(grand_child_state: GrandChildState) -> GrandChildState grand_child_state.child_counter = 200 grand_child_state.grand_child_counter = 300 grand_child_state.int_dict = {str(i): i for i in range(10000)} + grand_child_state.float_dict = {str(i): i + 0.5 for i in range(10000)} return grand_child_state From ffd99ec00ec866230d77444e7cea17d658d724df Mon Sep 17 00:00:00 2001 From: Benedikt Bartscher Date: Sat, 14 Dec 2024 14:51:20 +0100 Subject: [PATCH 21/21] minor performance improvement, disable redis pipeline transaction --- reflex/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reflex/state.py b/reflex/state.py index 884ab853b28..800b13b79a8 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3532,7 +3532,7 @@ async def _hset_pipeline(self, client_token: str, redis_hashset: dict[str, bytes client_token: The name of the hash. redis_hashset: The keys and values to set. """ - pipe = self.redis.pipeline() + pipe = self.redis.pipeline(transaction=False) pipe.hset(name=client_token, mapping=redis_hashset) if self._hexpire_not_supported: pipe.expire(client_token, self.token_expiration)