Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bumped version v1.6.0 #25

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions examples/example_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@

from meesee import Meesee # noqa: E402

config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100,
"timeout": 1,
}

box = Meesee(config)
box = Meesee()


@box.worker()
Expand All @@ -33,4 +25,4 @@ def func_c(item, worker_id):

if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
box.start_workers(workers=workers, config=config)
box.push_button(workers=workers)
9 changes: 1 addition & 8 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@
from meesee import Meesee # noqa: E402


config = {
"namespace": "removeme",
"key": "tasks", "redis_config": {},
"maxsize": 100,
"timeout": 1,
}

box = Meesee(config)
box = Meesee()


@box.worker()
Expand Down
44 changes: 44 additions & 0 deletions examples/example_decorator_produce_to_multiple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402

box = Meesee()


@box.produce_to()
def produce_multi(items):
return items


@box.worker()
def foo1(item, worker_id):
print(f"{worker_id} {item} foo1")
return [item,]


@box.worker()
def foo2(item, worker_id):
print(f"{worker_id} {item} foo2")
return [item,]


@box.worker()
def foo3(item, worker_id):
print(f"{worker_id} {item} foo3")
return [item,]


if __name__ == '__main__':
items = [
("foo1", "item1"),
("foo2", "item2"),
("foo3", "item3"),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]
produce_multi(items)
box.push_button(wait=1)
65 changes: 58 additions & 7 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(self, workers=10, namespace="main", timeout=None, queue="main", red
self.timeout = timeout
self.queue = queue
self.redis_config = redis_config
self.worker_funcs = {}
self._worker_funcs = {}

def create_produce_config(self):
return {
Expand Down Expand Up @@ -135,7 +135,7 @@ def wrapper(*args, **kwargs):

return result
parsed_name = input_queue if input_queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = wrapper
self._worker_funcs[parsed_name] = wrapper

return wrapper
return decorator
Expand All @@ -158,25 +158,76 @@ def wrapper(*args, **kwargs):
return wrapper
return decorator

def produce_to(self):
"""
Produce items to be sent to specific queues.
Send items to its corresponding queue using a RedisQueue.

The decorated function should yield tuples of (queue_name, item_value).

Example:
@box.produce_to()
def produce_multi(items):
return items

items = [
("foo1", "item1"),
("foo2", "item2"),
("foo3", "item3"),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]
produce_multi(items)

In this example:
- Each tuple in the `items` list represents a (queue, value) pair.
- The first element of each tuple ("foo1", "foo2", "foo3") is the queue name.
- The second element of each tuple ("item1", "item2", etc.) is the value to be sent to the queue.

The decorator will process these items as follows:
1. "item1" will be sent to the "foo1" queue
2. "item2" will be sent to the "foo2" queue
3. "item3" will be sent to the "foo3" queue
4. "item4" will be sent to the "foo1" queue
5. "item5" will be sent to the "foo2" queue
6. "item6" will be sent to the "foo3" queue

Notes:
- If an item is a list or dict, it will be JSON-encoded before being sent to the queue.
"""
def decorator(func):
def wrapper(*args, **kwargs):
config = self.create_produce_config()
redis_queue = RedisQueue(**config)

for queue, item in func(*args, **kwargs):
if isinstance(item, (list, dict)):
item = json.dumps(item)
redis_queue.send_to(queue, item)

return wrapper
return decorator

def parse_func_name(self, func):
return func.__name__

def worker(self, queue=None):
def decorator(func):
parsed_name = queue if queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = func
self._worker_funcs[parsed_name] = func
return func
return decorator

def start_workers(self, workers=10, config=config):
n_workers = len(self.worker_funcs)
n_workers = len(self._worker_funcs)
if n_workers == 0:
sys.stdout.write("No workers have been assigned with a decorator\n")
if n_workers > workers:
sys.stdout.write(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}\n")
workers = n_workers

startapp(list(self.worker_funcs.values()), workers=workers, config=config)
startapp(list(self._worker_funcs.values()), workers=workers, config=config)

def push_button(self, workers=None, wait=None):
if workers is not None:
Expand All @@ -186,13 +237,13 @@ def push_button(self, workers=None, wait=None):
"key": queue,
"namespace": self.namespace,
"redis_config": self.redis_config,
} for queue in self.worker_funcs.keys()
} for queue in self._worker_funcs.keys()
]
if self.timeout is not None or wait is not None:
for config in configs:
config["timeout"] = self.timeout or wait

startapp(list(self.worker_funcs.values()), workers=self.workers, config=configs)
startapp(list(self._worker_funcs.values()), workers=self.workers, config=configs)


class InitFail(Exception):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
long_description=long_description,
long_description_content_type='text/markdown',

version='1.5.0',
version='1.6.0',
py_modules=['meesee'],
install_requires=['redis==4.5.5'],
python_requires='>3.5',
Expand Down
93 changes: 83 additions & 10 deletions tests/mock_function_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def test_func_none(input_data):

mock_redis_queue.assert_called()
mock_redis_queue.return_value.send.assert_called()
self.assertIn("foo", self.box.worker_funcs)
self.assertIn("bar", self.box.worker_funcs)
self.assertIn("produce_to_qux", self.box.worker_funcs)
self.assertIn("foo", self.box._worker_funcs)
self.assertIn("bar", self.box._worker_funcs)
self.assertIn("produce_to_qux", self.box._worker_funcs)

mock_redis_queue.return_value.send.assert_any_call(json.dumps({"key": "test_data"}))

Expand All @@ -76,7 +76,7 @@ def setUp(self):
@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_no_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {}
self.box._worker_funcs = {}
self.box.start_workers()
mock_stdout_write.assert_called_once_with("No workers have been assigned with a decorator\n")
mock_startapp.assert_called_once_with(
Expand All @@ -88,38 +88,38 @@ def test_start_workers_no_workers(self, mock_stdout_write, mock_startapp):
@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_enough_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock()}
self.box.start_workers(workers=3)
mock_stdout_write.assert_not_called()
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=3,
config=config,
)

@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_not_enough_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock(), 'worker3': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock(), 'worker3': MagicMock()}
self.box.start_workers(workers=2)
mock_stdout_write.assert_called_once_with(
"Not enough workers, increasing the workers started with: 2 we need atleast: 3\n"
)
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=3,
config=config,
)

@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_custom_config(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock()}
custom_config = {'custom': 'config'}
self.box.start_workers(workers=1, config=custom_config)
mock_stdout_write.assert_not_called()
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=1,
config=custom_config
)
Expand Down Expand Up @@ -364,5 +364,78 @@ def test_len(self):
self.assertEqual(len(self.queue), 5)


class TestProduceToDecorator(unittest.TestCase):
def setUp(self):
self.box = Meesee(workers=5, namespace="test", timeout=2)

@patch('meesee.RedisQueue')
def test_produce_to_decorator(self, mock_redis_queue):
# Mock the RedisQueue instance
mock_queue_instance = MagicMock()
mock_redis_queue.return_value = mock_queue_instance

# Define a function decorated with produce_to
@self.box.produce_to()
def produce_multi(items):
return items

# Test data
items = [
("foo1", "item1"),
("foo2", {"key": "item2"}),
("foo3", ["item3", "item3b"]),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]

# Call the decorated function
produce_multi(items)

# Assertions
self.assertEqual(mock_redis_queue.call_count, 1)
self.assertEqual(mock_queue_instance.send_to.call_count, len(items))

# Check if send_to was called with correct arguments for each item
expected_calls = [
(("foo1", "item1")),
(("foo2", json.dumps({"key": "item2"}))),
(("foo3", json.dumps(["item3", "item3b"]))),
(("foo1", "item4")),
(("foo2", "item5")),
(("foo3", "item6")),
]

for (queue, item), result in zip(expected_calls, mock_queue_instance.send_to.call_args_list):
self.assertEqual(result[0][0], queue)
self.assertEqual(result[0][1], item)

@patch('meesee.RedisQueue')
def test_produce_to_with_custom_function(self, mock_redis_queue):
mock_queue_instance = MagicMock()
mock_redis_queue.return_value = mock_queue_instance

@self.box.produce_to()
def custom_produce():
yield "queue1", "item1"
yield "queue2", {"key": "item2"}
yield "queue3", ["item3", "item3b"]

custom_produce()

self.assertEqual(mock_redis_queue.call_count, 1)
self.assertEqual(mock_queue_instance.send_to.call_count, 3)

expected_calls = [
(("queue1", "item1")),
(("queue2", json.dumps({"key": "item2"}))),
(("queue3", json.dumps(["item3", "item3b"]))),
]

for (queue, item), result in zip(expected_calls, mock_queue_instance.send_to.call_args_list):
self.assertEqual(result[0][0], queue)
self.assertEqual(result[0][1], item)


if __name__ == '__main__':
unittest.main()