Skip to content

Commit

Permalink
Added collector version with example
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 18, 2024
1 parent 2986827 commit 823377f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 28 deletions.
58 changes: 30 additions & 28 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import sys
import json
import time

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

Expand All @@ -9,40 +11,40 @@
box = Meesee()


@box.worker()
def foobar(item, worker_id):
print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker()
def name_of_the_function(item, worker_id):
print('func: name_of_the_function, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker(queue="passed_name")
def passed_name_not_this_one(item, worker_id):
print('func: passed_name_not_this_one, worker_id: {}, item: {}'.format(worker_id, item))


@box.produce(queue="passed_name")
def produce_some_items(amount):
yield from range(amount)


@box.produce()
def produce_to_foo(items):
def produce_to_process_data(items):
return items


@box.worker_producer(input_queue="foo", output_queue="foobar")
def foo(item, worker_id):
print(f"{worker_id} {item} foo pass it too foobar")
@box.worker_producer(output_queue="foobar")
def process_data(item, worker_id):
item = json.loads(item)
wait = item["wait"]
print(f"{worker_id} processing: {item} for {wait} seconds and send it too foobar")
item["name"] = f"{item['name']}_processed"
time.sleep(wait)
return [item,]


@box.collector(wait=1, until=5)
def foobar(items):
return items


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
produce_some_items(10)
items = [{"name": f"name{i}"} for i in range(10)]
produce_to_foo(items)
box.push_button(workers, wait=1)
wait = int(sys.argv[sys.argv.index('--wait') + 1]) if '--wait' in sys.argv else 5
items = [{"name": f"name{i}", "wait": wait} for i in range(10)]
print(f"sending {len(items)} tasks to {workers} workers")
print(f"simulate processing with with a wait of {wait}")
start = time.time()

produce_to_process_data(items)
box.push_button(workers, wait=0.1)

result = foobar()
print(result)
print("-----")
result = foobar()
print(result)
print(f"done with running took: {round(time.time()- start, 2)}")
20 changes: 20 additions & 0 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,26 @@ def decorator(func):
return func
return decorator

def collector(self, wait=10, until=float('inf'), queue=None):
def decorator(func):
def wrapper(*args, **kwargs):
parsed_name = queue if queue is not None else self.parse_func_name(func)
config = {
"key": parsed_name,
"namespace": self.namespace,
"timeout": kwargs.pop("wait", None) or wait,
"redis_config": self.redis_config,
}
r = RedisQueue(**config)
results = []
for _, item in r:
results.append(item.decode('utf-8'))
if len(results) >= until:
break
return func(results)
return wrapper
return decorator

def start_workers(self, workers=10, config=config):
n_workers = len(self._worker_funcs)
if n_workers == 0:
Expand Down

0 comments on commit 823377f

Please sign in to comment.