Skip to content

Commit

Permalink
Add dead queue feature switch (#9)
Browse files Browse the repository at this point in the history
* Add dead queue feature switch

* Add docstring

* Add test against this feature switch

* Add warning on expire and timeout
  • Loading branch information
Wh1isper authored Jun 13, 2024
1 parent 51c07b2 commit beda3a7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
16 changes: 14 additions & 2 deletions brq/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Consumer(DeferOperator, RunnableMixin):
redis_prefix(str, default="brq"): The prefix of redis key.
redis_seperator(str, default=":"): The seperator of redis key.
enable_enque_deferred_job(bool, default=True): Whether to enable enque deferred job. If not, this consumer won't enque deferred jobs.
enable_reprocess_timeout_job(bool, default=True): Whether to enable reprocess timeout job. If not, this consumer won't reprocess timeout jobs, jobs with expired timeout will be moved to dead queue(if enabled).
enable_dead_queue(bool, default=True): Whether to enable dead queue. If not, this consumer won't move expired jobs to dead queue but just delete them.
max_message_len(int, default=1000): The maximum length of a message. Follow redis stream `maxlen`.
delete_messgae_after_process(bool, default=False): Whether to delete message after process. If many consumer groups are used, this should be set to False.
run_parallel(bool, default=False): Whether to run in parallel.
Expand All @@ -105,6 +107,7 @@ def __init__(
redis_seperator: str = ":",
enable_enque_deferred_job: bool = True,
enable_reprocess_timeout_job: bool = True,
enable_dead_queue: bool = True,
max_message_len: int = 1000,
delete_messgae_after_process: bool = False,
run_parallel: bool = False,
Expand All @@ -122,11 +125,18 @@ def __init__(
self.block_time = block_time
self.expire_time = expire_time
self.process_timeout = process_timeout

if self.expire_time < self.process_timeout:
logger.warning(
f"expire_time({self.expire_time}) < process_timeout({self.process_timeout}), will causes no job retried but moved dead queue(if enabled)."
)

self.retry_lock_time = retry_lock_time
self.retry_cooldown = retry_cooldown

self.enable_enque_deferred_job = enable_enque_deferred_job
self.enable_reprocess_timeout_job = enable_reprocess_timeout_job
self.enable_dead_queue = enable_dead_queue
self.max_message_len = max_message_len
self.delete_messgae_after_process = delete_messgae_after_process
self.run_parallel = run_parallel
Expand Down Expand Up @@ -199,9 +209,11 @@ async def _move_expired_jobs(self):
# Fix (None, None) for redis 6.x
continue
job = Job.from_message(serialized_job)
await self.redis.zadd(self.dead_key, {job.to_redis(): job.create_at})
logger.info(f"Put expired job {job} to dead queue")
if self.enable_dead_queue:
await self.redis.zadd(self.dead_key, {job.to_redis(): job.create_at})
logger.info(f"Put expired job {job} to dead queue")
await self.redis.xdel(self.stream_name, message_id)
logger.debug(f"{job} expired")

async def _process_unacked_job(self):
if not self.enable_reprocess_timeout_job:
Expand Down
18 changes: 14 additions & 4 deletions tests/test_brq.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,15 @@ async def test_unprocessed_job(async_redis_client, capfd):
await consumer.cleanup()


async def test_process_dead_jobs(async_redis_client, capfd):
@pytest.mark.parametrize("enable_dead_queue", [True, False])
async def test_process_dead_jobs(async_redis_client, capfd, enable_dead_queue):
producer = Producer(async_redis_client)
consumer = Consumer(async_redis_client, mock_consume_raise_exception, expire_time=0.001)
consumer = Consumer(
async_redis_client,
mock_consume_raise_exception,
expire_time=0.001,
enable_dead_queue=enable_dead_queue,
)

await producer.run_job("mock_consume_raise_exception", ["hello"])
await consumer.initialize()
Expand All @@ -95,11 +101,15 @@ async def test_process_dead_jobs(async_redis_client, capfd):
assert "hello" not in out
await asyncio.sleep(0.1)
await consumer._move_expired_jobs()
assert await producer.count_dead_messages("mock_consume_raise_exception") == 1
if enable_dead_queue:
assert await producer.count_dead_messages("mock_consume_raise_exception") == 1
else:
assert await producer.count_dead_messages("mock_consume_raise_exception") == 0
await consumer.process_dead_jobs()

out, err = capfd.readouterr()
assert "hello" in out
if enable_dead_queue:
assert "hello" in out
await consumer.cleanup()


Expand Down

0 comments on commit beda3a7

Please sign in to comment.