Skip to content

Commit

Permalink
Merge pull request spotify#1471 from Tarrasch/fix-email-colors
Browse files Browse the repository at this point in the history
notifications: More emails with proper coloring
  • Loading branch information
Tarrasch committed Jan 5, 2016
2 parents 3ac680f + f93d614 commit 4460b98
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 23 deletions.
30 changes: 17 additions & 13 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def run(self):
elif status == DONE:
self.task.trigger_event(
Event.PROCESSING_TIME, self.task, time.time() - t0)
expl = json.dumps(self.task.on_success())
expl = self.task.on_success()
logger.info('[pid %s] Worker %s done %s', os.getpid(),
self.worker_id, self.task)
self.task.trigger_event(Event.SUCCESS, self.task)
Expand All @@ -181,19 +181,12 @@ def run(self):
logger.exception("[pid %s] Worker %s failed %s", os.getpid(), self.worker_id, self.task)
self.task.trigger_event(Event.FAILURE, self.task, ex)
raw_error_message = self.task.on_failure(ex)
expl = json.dumps(raw_error_message)
self._send_error_notification(raw_error_message)
expl = raw_error_message

finally:
self.result_queue.put(
(self.task.task_id, status, expl, missing, new_deps))

def _send_error_notification(self, raw_error_message):
subject = "Luigi: %s FAILED" % self.task
notification_error_message = notifications.wrap_traceback(raw_error_message)
formatted_error_message = notifications.format_task_error(subject, self.task,
formatted_exception=notification_error_message)
notifications.send_error_email(subject, formatted_error_message, self.task.owner_email)

def _recursive_terminate(self):
import psutil

Expand Down Expand Up @@ -492,6 +485,12 @@ def _email_unexpected_error(self, task, formatted_traceback):
headline="Luigi framework error",
)

def _email_task_failure(self, task, formatted_traceback):
self._email_error(task, formatted_traceback,
subject="Luigi: {task} FAILED. Host: {host}",
headline="A task failed when running. Most likely run() raised an exception.",
)

def _email_error(self, task, formatted_traceback, subject, headline):
formatted_subject = subject.format(task=task, host=self.host)
message = notifications.format_task_error(headline, task, formatted_traceback)
Expand Down Expand Up @@ -723,10 +722,10 @@ def _purge_children(self):
"""
for task_id, p in six.iteritems(self._running_tasks):
if not p.is_alive() and p.exitcode:
error_msg = 'Worker task %s died unexpectedly with exit code %s' % (task_id, p.exitcode)
error_msg = 'Task %s died unexpectedly with exit code %s' % (task_id, p.exitcode)
elif p.timeout_time is not None and time.time() > float(p.timeout_time) and p.is_alive():
p.terminate()
error_msg = 'Worker task %s timed out and was terminated.' % task_id
error_msg = 'Task %s timed out and was terminated.' % task_id
else:
continue

Expand Down Expand Up @@ -757,6 +756,11 @@ def _handle_next_task(self):
continue
# Not a running task. Probably already removed.
# Maybe it yielded something?

if status == FAILED and expl:
# If no expl, it is because of a retry-external-task failure.
self._email_task_failure(task, expl)

new_deps = []
if new_requirements:
new_req = [load_task(module, name, params)
Expand All @@ -768,7 +772,7 @@ def _handle_next_task(self):
self._add_task(worker=self._id,
task_id=task_id,
status=status,
expl=expl,
expl=json.dumps(expl),
resources=task.process_resources(),
runnable=None,
params=task.to_str_params(),
Expand Down
9 changes: 6 additions & 3 deletions test/worker_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import sys

from helpers import LuigiTestCase
import json
import mock
from psutil import Process
from time import sleep
Expand Down Expand Up @@ -56,6 +55,8 @@ def f():
class TaskProcessTest(LuigiTestCase):

def test_update_result_queue_on_success(self):
# IMO this test makes no sense as it tests internal behavior and have
# already broken once during internal non-changing refactoring
class SuccessTask(luigi.Task):
def on_success(self):
return "test success expl"
Expand All @@ -66,9 +67,11 @@ def on_success(self):

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
mock_put.assert_called_once_with((task.task_id, DONE, json.dumps("test success expl"), [], None))
mock_put.assert_called_once_with((task.task_id, DONE, "test success expl", [], None))

def test_update_result_queue_on_failure(self):
# IMO this test makes no sense as it tests internal behavior and have
# already broken once during internal non-changing refactoring
class FailTask(luigi.Task):
def run(self):
raise BaseException("Uh oh.")
Expand All @@ -82,7 +85,7 @@ def on_failure(self, exception):

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
mock_put.assert_called_once_with((task.task_id, FAILED, json.dumps("test failure expl"), [], []))
mock_put.assert_called_once_with((task.task_id, FAILED, "test failure expl", [], []))

def test_cleanup_children_on_terminate(self):
"""
Expand Down
42 changes: 35 additions & 7 deletions test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,13 +708,12 @@ def custom_email_patch(config):
return functools.partial(email_patch, email_config=config)


class WorkerEmailTest(unittest.TestCase):
class WorkerEmailTest(LuigiTestCase):

def run(self, result=None):
super(WorkerEmailTest, self).setUp()
sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10)
with Worker(scheduler=sch, worker_id="foo") as worker:
self.worker = worker
with Worker(scheduler=sch, worker_id="foo") as self.worker:
super(WorkerEmailTest, self).run(result)

@email_patch
Expand Down Expand Up @@ -786,10 +785,6 @@ def complete(self):
@email_patch
def test_run_error(self, emails):
class A(luigi.Task):

def complete(self):
return False

def run(self):
raise Exception("b0rk")

Expand All @@ -799,6 +794,39 @@ def run(self):
self.worker.run()
self.assertTrue(emails[0].find("Luigi: %s FAILED" % (a,)) != -1)

@email_patch
def test_task_process_dies(self, emails):
a = SuicidalWorker(signal.SIGKILL)
luigi.build([a], workers=2, local_scheduler=True)
self.assertTrue(emails[0].find("Luigi: %s FAILED" % (a,)) != -1)
self.assertTrue(emails[0].find("died unexpectedly with exit code -9") != -1)

@email_patch
def test_task_times_out(self, emails):
class A(luigi.Task):
worker_timeout = 0.00001

def run(self):
time.sleep(5)

a = A()
luigi.build([a], workers=2, local_scheduler=True)
self.assertTrue(emails[0].find("Luigi: %s FAILED" % (a,)) != -1)
self.assertTrue(emails[0].find("timed out and was terminated.") != -1)

@with_config(dict(worker=dict(retry_external_tasks='true')))
@email_patch
def test_external_task_retries(self, emails):
"""
Test that we do not send error emails on the failures of external tasks
"""
class A(luigi.ExternalTask):
pass

a = A()
luigi.build([a], workers=2, local_scheduler=True)
self.assertEqual(emails, [])

@email_patch
def test_no_error(self, emails):
class A(DummyTask):
Expand Down

0 comments on commit 4460b98

Please sign in to comment.