Skip to content

Commit

Permalink
Improve code and rename protect func (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper authored Jun 18, 2024
1 parent 454c48d commit 3ae2c21
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
8 changes: 7 additions & 1 deletion brq/defer_operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Any

import redis.asyncio as redis

Expand Down Expand Up @@ -50,7 +51,7 @@ async def enque_deferred_job(self, function_name: str, maxlen: int = 1000):
if elements:
logger.info(f"Enqueued deferred jobs: {elements}")

async def remove_deferred_job(
async def _remove_deferred_job(
self,
function_name: str,
job: Job,
Expand Down Expand Up @@ -162,3 +163,8 @@ async def count_unacked_jobs(self, function_name: str, group_name: str = "defaul
async def count_dead_messages(self, function_name: str):
dead_key = self.get_dead_message_key(function_name)
return await self.redis.zcard(dead_key)

async def emit_deferred_job(self, function_name: str, defer_until: int, job: Job):
defer_key = self.get_deferred_key(function_name)
await self.redis.zadd(defer_key, {job.to_redis(): defer_until})
return job
28 changes: 13 additions & 15 deletions brq/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,50 +87,48 @@ async def run_job(
logger.info(
f"Deferring job: {function_name} until {datetime.fromtimestamp(defer_until / 1000)}"
)
job = await self._emit_deferred_job(
job = await self.run_deferred_job(
function_name,
defer_until,
args,
kwargs,
)
else:
logger.info(f"Scheduling job: {function_name}")
job = await self._emit_job(function_name, args, kwargs)
job = await self._run_job(function_name, args, kwargs)

logger.info(f"Job created: {job}")
return job

async def _emit_deferred_job(
self,
function_name: str,
defer_until: int,
args: list[Any] = None,
kwargs: dict[str, Any] = None,
async def _run_job(
self, function_name: str, args: list[Any] = None, kwargs: dict[str, Any] = None
) -> Job:
defer_key = self.get_deferred_key(function_name)
stream_name = self.get_stream_name(function_name)
created_at = await self.get_current_timestamp_ms(self.redis)

job = Job(
args=args or [],
kwargs=kwargs or {},
create_at=created_at,
)
await self.redis.zadd(defer_key, {job.to_redis(): defer_until})
await self.redis.xadd(stream_name, job.to_message(), maxlen=self.max_message_len)
return job

async def _emit_job(
self, function_name: str, args: list[Any] = None, kwargs: dict[str, Any] = None
async def run_deferred_job(
self,
function_name: str,
defer_until: int,
args: list[Any] = None,
kwargs: dict[str, Any] = None,
) -> Job:
stream_name = self.get_stream_name(function_name)
created_at = await self.get_current_timestamp_ms(self.redis)

job = Job(
args=args or [],
kwargs=kwargs or {},
create_at=created_at,
)
await self.redis.xadd(stream_name, job.to_message(), maxlen=self.max_message_len)
return job
return await self.emit_deferred_job(function_name, defer_until, job)

async def prune(self, function_name: str):
"""
Expand Down

0 comments on commit 3ae2c21

Please sign in to comment.