Skip to content

Commit

Permalink
Fix race condition between Celery and database transactions (#74)
Browse files Browse the repository at this point in the history
* move all db ops to the celery task when enqueuing messages
* add log message to track message creation
* allow sending queued messages to keep compatibility with previous yubin version
  • Loading branch information
sastred authored Jan 9, 2024
1 parent 5c3aebf commit efcd7e9
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 49 deletions.
1 change: 1 addition & 0 deletions django_yubin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def queue_email_message(email_message, fail_silently=False):
subject=email_message.subject,
message_data=email_message.message().as_string(),
storage=settings.MAILER_STORAGE_BACKEND)
message.add_log("Message created")

if message.enqueue('Enqueued from a Backend or django-yubin itself.'):
return 1
Expand Down
14 changes: 9 additions & 5 deletions django_yubin/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


@transaction.atomic
def send_db_message(message_pk):
def send_db_message(message_pk, log_message=None):
"""
Sends a django_yubin.models.Message by its PK.
"""
Expand All @@ -28,12 +28,16 @@ def send_db_message(message_pk):
logger.exception(msg, extra={'message_pk': message_pk})
return False

# Messages in STATUS_QUEUED can be sent to keep compatibility with previous yubin version.
# In future versions that condition can be removed and only check `can_be_enqueued()`.
if message.status != models.Message.STATUS_QUEUED:
msg = "Message is not in queue status, ignoring the email."
logger.warning(msg)
message.add_log(msg)
return False
if not message.can_be_enqueued():
msg = "Message can not be enqueued in it's current status."
logger.warning(msg)
message.add_log(msg)
return False

message.mark_as(models.Message.STATUS_QUEUED, log_message)
message.mark_as(models.Message.STATUS_IN_PROCESS, "Trying to send the message.")

recipients = message.recipients()
Expand Down
24 changes: 5 additions & 19 deletions django_yubin/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from email import policy
from email import encoders as Encoders
from email.mime.base import MIMEBase
from functools import partial

from django.core.exceptions import FieldError
from django.core.mail.message import (
ADDRESS_HEADERS,
EmailMessage,
EmailMultiAlternatives,
)
from django.db import models
from django.db import models, transaction
from django.db.models import F
from django.utils.module_loading import import_string
from django.utils.text import Truncator
Expand Down Expand Up @@ -240,29 +241,14 @@ def can_be_enqueued(self):

def enqueue(self, log_message=None):
"""
Sends the task to enqueue itself taking care of undoing changes if the delivery fails.
Sends the task to enqueue the message on commit.
"""
if not self.can_be_enqueued():
self.add_log("Message can not be enqueued in it's current status")
return False

backup = {
'date_enqueued': self.date_enqueued,
'enqueued_count': self.enqueued_count,
'status': self.status,
}
self.mark_as(self.STATUS_QUEUED, log_message)

try:
tasks.send_email.delay(self.pk)
return True
except Exception as e:
self.date_enqueued = backup['date_enqueued']
self.enqueued_count = backup['enqueued_count']
self.status = backup['status']
self.save()
self.add_log('Error enqueuing email: {}'.format(e))
return False
transaction.on_commit(partial(tasks.send_email.delay, message_pk=self.pk, log_message=log_message))
return True

@classmethod
def retry_messages(cls, max_retries=3):
Expand Down
4 changes: 2 additions & 2 deletions django_yubin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@


@shared_task()
def send_email(message_pk):
def send_email(message_pk, log_message=None):
"""
Send an email from a database Message PK.
"""
from .engine import send_db_message
return send_db_message(message_pk)
return send_db_message(message_pk, log_message)


@shared_task()
Expand Down
16 changes: 13 additions & 3 deletions tests/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,26 @@ class TestSendDBMessage(MessageMixin, TestCase):
Tests engine function that sends db messages.
"""
def setUp(self):
self.message = self.create_message(status=Message.STATUS_QUEUED)
self.message = self.create_message(status=Message.STATUS_CREATED)

def test_send_email_not_found(self):
self.assertFalse(send_db_message(-1))

def test_send_email_not_queued(self):
self.message.status = Message.STATUS_SENT
def test_send_email_not_queueable(self):
self.message.status = Message.STATUS_IN_PROCESS
self.message.save()
self.assertFalse(send_db_message(self.message.pk))

last_log_action = self.message.log_set.first().action
self.assertEqual(last_log_action, Message.STATUS_IN_PROCESS)

def test_send_email_queued(self):
# Messages in STATUS_QUEUED can be sent to keep compatibility with previous yubin version.
# In future versions that condition can be removed with this test.
self.message.status = Message.STATUS_QUEUED
self.message.save()
self.assertTrue(send_db_message(self.message.pk))

last_log_action = self.message.log_set.first().action
self.assertEqual(last_log_action, Message.STATUS_SENT)

Expand Down
36 changes: 16 additions & 20 deletions tests/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.test import TestCase
from django.utils import timezone

from django_yubin import tasks
from django_yubin.models import Message

from .base import MessageMixin
Expand Down Expand Up @@ -51,11 +52,12 @@ def test_mark_as_queued(self):
self.assertEqual(self.message.enqueued_count, enqueued_count+1)

def test_enqueue_wrong_status(self):
self.message.mark_as(Message.STATUS_QUEUED)
self.message.mark_as(Message.STATUS_IN_PROCESS)
self.assertFalse(self.message.enqueue())

@patch("django_yubin.tasks.send_email.delay", side_effect=Exception)
def test_enqueue_exception(self, send_email_mock):
def test_enqueue_exception(self):
self.message.status = Message.STATUS_SENT
self.message.save()
backup = {
'date_enqueued': self.message.date_enqueued,
'enqueued_count': self.message.enqueued_count,
Expand All @@ -66,10 +68,11 @@ def test_enqueue_exception(self, send_email_mock):
self.assertEqual(self.message.enqueued_count, backup["enqueued_count"])
self.assertEqual(self.message.status, backup["status"])

@patch("django_yubin.tasks.send_email.delay")
def test_enqueue_ok(self, send_email_mock):
self.assertTrue(self.message.enqueue())
self.assertEqual(self.message.status, Message.STATUS_QUEUED)
def test_enqueue_ok(self):
with self.captureOnCommitCallbacks() as callbacks:
self.assertTrue(self.message.enqueue())
self.assertEqual(len(callbacks), 1)
self.assertEqual(callbacks[0].func, tasks.send_email.delay)

def test_retry_messages_none(self):
enqueued, failed = Message.retry_messages()
Expand All @@ -88,13 +91,6 @@ def test_retry_messages_max_retries(self):
enqueued, failed = Message.retry_messages()
self.assertEqual((enqueued, failed), (0, 0))

@patch("django_yubin.tasks.send_email.delay", side_effect=Exception('Mock exception'))
def test_retry_messages_enqueue_failed(self, send_mail_mock):
self.message.status = Message.STATUS_FAILED
self.message.save()
enqueued, failed = Message.retry_messages()
self.assertEqual((enqueued, failed), (0, 1))

def test_delete_old(self):
days = 7
message = self.create_message()
Expand Down Expand Up @@ -203,12 +199,12 @@ def reset_mock():

def test_email_with_long_subject(self):
email_message = EmailMessage(
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor "
"incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco "
"laboris nisi ut aliquip ex ea commodo consequat.",
"Message body",
'[email protected]',
['[email protected]']
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor "
"incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud "
"exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.",
"Message body",
'[email protected]',
['[email protected]']
)

message = Message.objects.create(
Expand Down

0 comments on commit efcd7e9

Please sign in to comment.