diff --git a/examples/example_decorator.py b/examples/example_decorator.py index b83dec4..e55e100 100644 --- a/examples/example_decorator.py +++ b/examples/example_decorator.py @@ -13,22 +13,23 @@ "timeout": 1, } +box = Meesee(config) -@Meesee.worker() +@box.worker() def func_a(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item)) -@Meesee.worker() +@box.worker() def func_b(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item)) -@Meesee.worker() +@box.worker() def func_c(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item)) if __name__ == '__main__': workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 - Meesee.start_workers(workers=workers, config=config) + box.start_workers(workers=workers, config=config) diff --git a/meesee.py b/meesee.py index aea25b0..8da61e4 100644 --- a/meesee.py +++ b/meesee.py @@ -92,7 +92,6 @@ def __len__(self): class Meesee: - worker_funcs = {} def __init__(self, workers=10, namespace="main", timeout=None, queue="main", redis_config={}): self.workers = workers @@ -100,6 +99,7 @@ def __init__(self, workers=10, namespace="main", timeout=None, queue="main", red self.timeout = timeout self.queue = queue self.redis_config = redis_config + self.worker_funcs = {} def create_produce_config(self): return { @@ -158,28 +158,25 @@ def wrapper(*args, **kwargs): return wrapper return decorator - @staticmethod - def parse_func_name(func): + def parse_func_name(self, func): return func.__name__ - @classmethod - def worker(cls, queue=None): + def worker(self, queue=None): def decorator(func): - parsed_name = queue if queue is not None else cls.parse_func_name(func) - cls.worker_funcs[parsed_name] = func + parsed_name = queue if queue is not None else self.parse_func_name(func) + self.worker_funcs[parsed_name] = func return func return decorator - @classmethod - def start_workers(cls, workers=10, config=config): - n_workers = len(cls.worker_funcs) + def start_workers(self, workers=10, config=config): + n_workers = len(self.worker_funcs) if n_workers == 0: print("No workers have been assigned with a decorator") if n_workers > workers: print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}") workers = n_workers - startapp(list(cls.worker_funcs.values()), workers=workers, config=config) + startapp(list(self.worker_funcs.values()), workers=workers, config=config) def push_button(self, workers=None, wait=None): if workers is not None: @@ -189,13 +186,13 @@ def push_button(self, workers=None, wait=None): "key": queue, "namespace": self.namespace, "redis_config": self.redis_config, - } for queue in self.__class__.worker_funcs.keys() + } for queue in self.worker_funcs.keys() ] if self.timeout is not None or wait is not None: for config in configs: config["timeout"] = self.timeout or wait - startapp(list(self.__class__.worker_funcs.values()), workers=self.workers, config=configs) + startapp(list(self.worker_funcs.values()), workers=self.workers, config=configs) class InitFail(Exception): diff --git a/meesee_methods_tests.py b/meesee_methods_tests.py deleted file mode 100644 index 453012f..0000000 --- a/meesee_methods_tests.py +++ /dev/null @@ -1,88 +0,0 @@ -import unittest -from meesee import Meesee, config -import redis -import uuid - - -box = Meesee(workers=2, namespace="test1", timeout=2) - - -@box.worker_producer(input_queue="foo", output_queue="foobar") -def foo(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"test1:result_test:foo:{worker_id}:{uuid.uuid4()}" - redis_client.set(key, f"foo_processed_{item}") - return [f"foo_processed_{item}"] - - -@box.worker() -def foobar(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"test1:result_test:foobar:{worker_id}:{uuid.uuid4()}" - redis_client.set(key, item) - - -class TestMeesee(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.box = box - cls.redis_client = redis.Redis(**config['redis_config']) - - def setUp(self): - self.clean_up_redis() - - def tearDown(self): - self.clean_up_redis() - - def clean_up_redis(self): - patterns = [ - "test1:result_test:foo:*", - "test1:result_test:foobar:*" - ] - for pattern in patterns: - for key in self.redis_client.scan_iter(pattern): - self.redis_client.delete(key) - - def test_produce_to_functionality(self): - expected = ["item1", "item2", "item3"] - - @self.box.produce() - def produce_to_foobar(items): - return items - - produce_to_foobar(expected) - self.box.push_button(workers=5, wait=3) - - results = [] - for key in self.redis_client.scan_iter("test1:result_test:foobar:*"): - value = self.redis_client.get(key).decode('utf-8') - results.append(value) - - self.assertEqual(sorted(results), sorted(expected)) - - def test_worker_producer_functionality(self): - expected = ["item1", "item2", "item3"] - - @self.box.produce(queue="foo") - def produce_to_foo(items): - return items - - produce_to_foo(expected) - self.box.push_button(workers=10, wait=3) - - foo_results = [] - for key in self.redis_client.scan_iter("test1:result_test:foo:*"): - value = self.redis_client.get(key).decode('utf-8') - foo_results.append(value) - - foobar_results = [] - for key in self.redis_client.scan_iter("test1:result_test:foobar:*"): - value = self.redis_client.get(key).decode('utf-8') - foobar_results.append(value) - - self.assertEqual(sorted(foo_results), sorted([f"foo_processed_{item}" for item in expected])) - self.assertEqual(sorted(foobar_results), sorted([f"foo_processed_{item}" for item in expected])) - - -if __name__ == '__main__': - unittest.main() diff --git a/meesee_types_tests.py b/meesee_types_tests.py deleted file mode 100644 index 1d651be..0000000 --- a/meesee_types_tests.py +++ /dev/null @@ -1,138 +0,0 @@ -import unittest -import redis -from meesee import Meesee, config - -box = Meesee(workers=5, namespace="test", timeout=2) - - -@box.worker(queue="strings") -def consume_strings(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"result_test:consume_strings:{worker_id}:{redis_client.incr('result_test:consume_strings:counter')}" - redis_client.set(key, item) - - -@box.worker(queue="integers") -def consume_integers(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"result_test:consume_integers:{worker_id}:{redis_client.incr('result_test:consume_integers:counter')}" - redis_client.set(key, str(item)) - - -@box.worker(queue="lists") -def consume_lists(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"result_test:consume_lists:{worker_id}:{redis_client.incr('result_test:consume_lists:counter')}" - redis_client.set(key, str(item)) - - -@box.worker(queue="dicts") -def consume_dicts(item, worker_id): - redis_client = redis.Redis(**config['redis_config']) - key = f"result_test:consume_dicts:{worker_id}:{redis_client.incr('result_test:consume_dicts:counter')}" - redis_client.set(key, str(item)) - - -class TestMeesee(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.box = box - cls.redis_client = redis.Redis(**config['redis_config']) - - def setUp(self): - self.clean_up_redis() - - def tearDown(self): - self.clean_up_redis() - - def clean_up_redis(self): - patterns = [ - "result_test:consume_dicts:*", - "result_test:consume_strings:*", - "result_test:consume_integers:*", - "result_test:consume_lists:*", - ] - for pattern in patterns: - for key in self.redis_client.scan_iter(pattern): - self.redis_client.delete(key) - - def test_produce_and_consume_strings(self): - expected = ["apple", "banana", "cherry"] - - @self.box.produce(queue="strings") - def produce_strings(): - return expected - - produce_strings() - self.box.push_button(workers=5, wait=1) - - results = [] - for key in self.redis_client.scan_iter("result_test:consume_strings:*"): - if key != b"result_test:consume_strings:counter": - value = self.redis_client.get(key).decode('utf-8') - results.append(value) - - self.assertEqual(sorted(results), sorted(expected)) - - def test_produce_and_consume_integers(self): - expected = [1, 2, 3, 4, 5] - - @self.box.produce(queue="integers") - def produce_integers(): - return expected - - produce_integers() - self.box.push_button(workers=5, wait=1) - - results = [] - for key in self.redis_client.scan_iter("result_test:consume_integers:*"): - if key != b"result_test:consume_integers:counter": - value = int(self.redis_client.get(key)) - results.append(value) - - self.assertEqual(sorted(results), sorted(expected)) - - def test_produce_and_consume_lists(self): - expected = [[1, 2], [3, 4], [5, 6]] - - @self.box.produce(queue="lists") - def produce_lists(): - return expected - - produce_lists() - self.box.push_button(workers=5, wait=1) - - results = [] - for key in self.redis_client.scan_iter("result_test:consume_lists:*"): - if key != b"result_test:consume_lists:counter": - value = eval(self.redis_client.get(key)) - results.append(value) - - sorted_results = sorted(results) - sorted_expected = sorted(expected) - self.assertEqual(sorted_results, sorted_expected) - - def test_produce_and_consume_dicts(self): - expected = [{"a": 1}, {"b": 2}, {"c": 3}] - - @self.box.produce(queue="dicts") - def produce_dicts(): - return expected - - produce_dicts() - self.box.push_button(workers=5, wait=1) - - results = [] - for key in self.redis_client.scan_iter("result_test:consume_dicts:*"): - if key != b"result_test:consume_dicts:counter": - value = self.redis_client.get(key) - results.append(eval(value)) # Convert string back to dict - - sorted_results = sorted(results, key=lambda x: list(x.keys())[0]) - sorted_expected = sorted(expected, key=lambda x: list(x.keys())[0]) - - self.assertEqual(sorted_results, sorted_expected) - - -if __name__ == '__main__': - unittest.main() diff --git a/start_worker_tests.py b/start_worker_tests.py deleted file mode 100644 index eb9909f..0000000 --- a/start_worker_tests.py +++ /dev/null @@ -1,189 +0,0 @@ -import unittest -import sys -import io -import time - -import meesee -from meesee import run_worker - - -def stub_setup_init_items(func_kwargs, init_kwargs): - return {name: func_kwargs[name] for name in init_kwargs.keys()} - - -def stub_init_add(func_kwargs, init_items, init_kwargs): - for name, value in init_items.items(): - if callable(value): - func_kwargs[name] = value() - else: - func_kwargs[name] = value - return func_kwargs - - -class StubRedisQueue: - def __init__(self, **config): - self.config = config - self.items = [] - self.state = "normal" - self.iteration_count = 0 - - def __iter__(self): - return self - - def __next__(self): - if self.state == "exception" and self.iteration_count > 0: - raise Exception("Simulated failure") - if self.iteration_count >= len(self.items): - raise StopIteration - item = self.items[self.iteration_count] - self.iteration_count += 1 - return ("key", item.encode('utf-8')) - - def first_inline_send(self, item): - self.items.append(item) - - -class TestRunWorker(unittest.TestCase): - def setUp(self): - self.func_kwargs = {"arg1": "value1", "arg2": "value2", "init_arg": lambda: "init_value"} - self.init_kwargs = {"init_arg": "init_value"} - self.config = {"key": "test_queue", "timeout": 0.1} - self.original_stdout = sys.stdout - sys.stdout = io.StringIO() - - # Save original implementations - self.original_RedisQueue = meesee.RedisQueue - self.original_setup_init_items = meesee.setup_init_items - self.original_init_add = meesee.init_add - - # Apply patches - meesee.RedisQueue = StubRedisQueue - meesee.setup_init_items = stub_setup_init_items - meesee.init_add = stub_init_add - - def tearDown(self): - - sys.stdout = self.original_stdout - - # Restore original implementations - meesee.RedisQueue = self.original_RedisQueue - meesee.setup_init_items = self.original_setup_init_items - meesee.init_add = self.original_init_add - - def test_normal_execution(self): - def stub_func(item, worker_id, **kwargs): - return f"Processed: {item}" - - StubRedisQueue.items = ["item1", "item2"] - StubRedisQueue.state = "normal" - run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) - - output = sys.stdout.getvalue() - self.assertIn("worker 1 started", output) - self.assertIn("listening to test_queue", output) - self.assertIn("timeout reached worker 1 stopped", output) - - def test_init_fail(self): - def stub_func(item, worker_id, **kwargs): - return "This should not be reached" - - def failing_init(): - raise Exception("Init failure") - - self.func_kwargs["init_arg"] = failing_init - StubRedisQueue.items = ["item1"] - StubRedisQueue.state = "normal" - run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) - - output = sys.stdout.getvalue() - self.assertIn("worker 1 failed reason Init failure", output) - - def test_keyboard_interrupt(self): - def stub_func(item, worker_id, **kwargs): - raise KeyboardInterrupt() - - StubRedisQueue.items = ["item1"] - StubRedisQueue.state = "normal" - run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) - - output = sys.stdout.getvalue() - self.assertIn("worker 1 stopped", output) - - def test_general_exception(self): - def stub_func(item, worker_id, **kwargs): - return f"Processed: {item}" - - def stub_on_failure_func(item, e, r, worker_id): - sys.stdout.write(f"Failure handled for item: {item}\n") - - StubRedisQueue.items = ["item1", "item2"] - StubRedisQueue.state = "exception" - StubRedisQueue.iteration_count = 0 - self.config["timeout"] = 2 - run_worker(stub_func, self.func_kwargs, stub_on_failure_func, self.config, 1, self.init_kwargs) - self.config["timeout"] = 0.1 - - output = sys.stdout.getvalue() - self.assertIn("worker 1 started", output) - self.assertIn("stub_func listening to test_queue", output) - self.assertIn("timeout reached worker 1 stopped", output) - - def test_timeout(self): - def stub_func(item, worker_id, **kwargs): - # Sleep longer than the timeout - time.sleep(0.2) - return f"Processed: {item}" - - StubRedisQueue.items = ["item1", "item2"] - StubRedisQueue.state = "timeout" - run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) - - output = sys.stdout.getvalue() - self.assertIn("timeout reached worker 1 stopped", output) - - def test_multiple_funcs_and_configs(self): - def func_a(item, worker_id, **kwargs): - return f"func_a processed {item} with worker {worker_id}" - - def func_b(item, worker_id, **kwargs): - return f"func_b processed {item} with worker {worker_id}" - - def func_c(item, worker_id, **kwargs): - return f"func_c processed {item} with worker {worker_id}" - - funcs = [func_a, func_b, func_c] - - config_a = {"key": "queue_a", "timeout": 0.1} - config_b = {"key": "queue_b", "timeout": 0.1} - config_c = {"key": "queue_c", "timeout": 0.1} - - configs = [config_a, config_b, config_c] - - def stub_on_failure_func(item, e, r, worker_id): - sys.stdout.write(f"Failure handled for item: {item} on worker {worker_id}\n") - - StubRedisQueue.items = ["item1", "item2"] - StubRedisQueue.state = "normal" - - # Reset stdout capture for each iteration - sys.stdout = io.StringIO() - - for worker_id in range(5): - run_worker(funcs, self.func_kwargs, stub_on_failure_func, configs, worker_id, self.init_kwargs) - - output = sys.stdout.getvalue() - expected_func = funcs[worker_id % len(funcs)].__name__ - expected_queue = configs[worker_id % len(configs)]["key"] - - self.assertIn(f"worker {worker_id} started", output) - self.assertIn(f"{expected_func} listening to {expected_queue}", output) - self.assertIn("timeout reached worker", output) - - if "item1" in output: - self.assertIn(f"{expected_func} processed item1 with worker {worker_id}", output) - if "item2" in output: - self.assertIn(f"{expected_func} processed item2 with worker {worker_id}", output) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/meesee_function_tests.py b/tests/meesee_function_tests.py new file mode 100644 index 0000000..7cf8efd --- /dev/null +++ b/tests/meesee_function_tests.py @@ -0,0 +1,69 @@ +import redis +import uuid +import json + +import unittest + +from unittest.mock import patch, MagicMock +from meesee import Meesee, config + +class TestWorkerProducerLineCoverage(unittest.TestCase): + def setUp(self): + self.box = Meesee(workers=10, namespace="test", timeout=2) + + @patch('meesee.RedisQueue') + @patch('redis.Redis') + def test_worker_producer_line_coverage(self, mock_redis, mock_redis_queue): + + @self.box.worker_producer(input_queue="foo", output_queue="foobar") + def test_func_both_queues(input_data): + return input_data + + @self.box.worker_producer(input_queue="bar") + def test_func_input_queue(input_data): + return input_data + + @self.box.worker_producer(output_queue="baz") + def test_func_output_queue(input_data): + return input_data + + @self.box.worker_producer() + def produce_to_qux(input_data): + return input_data + + @self.box.worker_producer() + def test_func_list(input_data): + return [input_data, {"key": "value"}] + + @self.box.worker_producer() + def test_func_dict(input_data): + return {"key": input_data} + + @self.box.worker_producer() + def test_func_list_with_dict(input_data): + return input_data + + @self.box.worker_producer() + def test_func_none(input_data): + return None + + test_func_both_queues("test_data") + test_func_input_queue("test_data") + test_func_output_queue("test_data") + produce_to_qux("test_data") + test_func_list("test_data") + test_func_dict("test_data") + test_func_none("test_data") + + test_func_list_with_dict([{"key1": "value1"}, {"key2": "value2"}]) + + mock_redis_queue.assert_called() + mock_redis_queue.return_value.send.assert_called() + self.assertIn("foo", self.box.worker_funcs) + self.assertIn("bar", self.box.worker_funcs) + self.assertIn("produce_to_qux", self.box.worker_funcs) + + mock_redis_queue.return_value.send.assert_any_call(json.dumps({"key": "test_data"})) + +if __name__ == '__main__': + unittest.main()