Skip to content

Commit

Permalink
Fix typos
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Jun 18, 2024
1 parent bfa7dfa commit f5a3d6a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions brq/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Consumer(DeferOperator, RunnableMixin):
enable_reprocess_timeout_job(bool, default=True): Whether to enable reprocess timeout job. If not, this consumer won't reprocess timeout jobs, jobs with expired timeout will be moved to dead queue(if enabled).
enable_dead_queue(bool, default=True): Whether to enable dead queue. If not, this consumer won't move expired jobs to dead queue but just delete them.
max_message_len(int, default=1000): The maximum length of a message. Follow redis stream `maxlen`.
delete_messgae_after_process(bool, default=False): Whether to delete message after process. If many consumer groups are used, this should be set to False.
delete_message_after_process(bool, default=False): Whether to delete message after process. If many consumer groups are used, this should be set to False.
run_parallel(bool, default=False): Whether to run in parallel.
"""

Expand All @@ -109,7 +109,7 @@ def __init__(
enable_reprocess_timeout_job: bool = True,
enable_dead_queue: bool = True,
max_message_len: int = 1000,
delete_messgae_after_process: bool = False,
delete_message_after_process: bool = False,
run_parallel: bool = False,
):
super().__init__(redis, redis_prefix, redis_seperator)
Expand Down Expand Up @@ -138,7 +138,7 @@ def __init__(
self.enable_reprocess_timeout_job = enable_reprocess_timeout_job
self.enable_dead_queue = enable_dead_queue
self.max_message_len = max_message_len
self.delete_messgae_after_process = delete_messgae_after_process
self.delete_message_after_process = delete_message_after_process
self.run_parallel = run_parallel

@property
Expand Down Expand Up @@ -250,7 +250,7 @@ async def _process_unacked_job(self):
logger.info(f"Retry {job} successfully")
await self.redis.xack(self.stream_name, self.group_name, message_id)

if self.delete_messgae_after_process:
if self.delete_message_after_process:
await self.redis.xdel(self.stream_name, message_id)

async def _pool_job(self):
Expand All @@ -274,7 +274,7 @@ async def _pool_job(self):
else:
await self.redis.xack(self.stream_name, self.group_name, message_id)

if self.delete_messgae_after_process:
if self.delete_message_after_process:
await self.redis.xdel(self.stream_name, message_id)

async def _pool_job_prallel(self):
Expand Down Expand Up @@ -308,7 +308,7 @@ async def _job_wrap(message_id, *args, **kwargs):
continue

await self.redis.xack(self.stream_name, self.group_name, message_id)
if self.delete_messgae_after_process:
if self.delete_message_after_process:
await self.redis.xdel(self.stream_name, message_id)

async def _acquire_retry_lock(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion brq/defer_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def count_stream(self, function_name: str):
"""
Count stream length
If Consumer's `delete_messgae_after_process` if False, will include already processed messages
If Consumer's `delete_message_after_process` is False, will include already processed messages
"""
stream_name = self.get_stream_name(function_name)
return await self.redis.xlen(stream_name)
Expand Down

0 comments on commit f5a3d6a

Please sign in to comment.