From 5de0129360d934011d7f07c40b4d914e0fba13f3 Mon Sep 17 00:00:00 2001 From: Martin Macko Date: Tue, 21 Oct 2014 21:33:08 +0200 Subject: [PATCH] poleno.timewarp unittests --- manage.py | 4 - poleno/timewarp/__init__.py | 9 + poleno/timewarp/admin.py | 5 +- .../timewarp/management/commands/timewarp.py | 2 +- poleno/timewarp/tests/__init__.py | 0 poleno/timewarp/tests/test_admin.py | 98 +++++ poleno/timewarp/tests/test_management.py | 99 +++++ poleno/timewarp/tests/test_timewarp.py | 353 ++++++++++++++++++ poleno/timewarp/timewarp.py | 245 ++++++------ poleno/utils/tests/test_date.py | 2 - 10 files changed, 700 insertions(+), 117 deletions(-) create mode 100644 poleno/timewarp/tests/__init__.py create mode 100644 poleno/timewarp/tests/test_admin.py create mode 100644 poleno/timewarp/tests/test_management.py create mode 100644 poleno/timewarp/tests/test_timewarp.py diff --git a/manage.py b/manage.py index 8ab6d999d..40577ecbf 100644 --- a/manage.py +++ b/manage.py @@ -5,10 +5,6 @@ if __name__ == "__main__": os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chcemvediet.settings") - # Timewarp is automaticaly disabled if settings.DEBUG is not True. - from poleno.timewarp import timewarp - timewarp.init() - from django.core.management import execute_from_command_line execute_from_command_line(sys.argv) diff --git a/poleno/timewarp/__init__.py b/poleno/timewarp/__init__.py index e69de29bb..f2ab665e8 100644 --- a/poleno/timewarp/__init__.py +++ b/poleno/timewarp/__init__.py @@ -0,0 +1,9 @@ +# vim: expandtab +# -*- coding: utf-8 -*- +from django.core.exceptions import ImproperlyConfigured +from django.conf import settings + +from .timewarp import timewarp + +assert settings.DEBUG, u'Timewarp may NOT be enabled if settings.DEBUG is False.' +timewarp.enable() diff --git a/poleno/timewarp/admin.py b/poleno/timewarp/admin.py index b4acb1cd4..05f6d7fc3 100644 --- a/poleno/timewarp/admin.py +++ b/poleno/timewarp/admin.py @@ -27,9 +27,12 @@ def index(request): timewarp.jump(jumpto, speedup) return HttpResponseRedirect(reverse(u'admin:timewarp')) - if button == u'reset': + elif button == u'reset': timewarp.reset() return HttpResponseRedirect(reverse(u'admin:timewarp')) + + else: # invalid button + form = forms.WarpForm() else: form = forms.WarpForm() diff --git a/poleno/timewarp/management/commands/timewarp.py b/poleno/timewarp/management/commands/timewarp.py index 761812aa2..8530af026 100644 --- a/poleno/timewarp/management/commands/timewarp.py +++ b/poleno/timewarp/management/commands/timewarp.py @@ -73,7 +73,7 @@ def handle(self, *args, **options): else: raise CommandError(u'Invalid date: "%s".' % joined) else: - # Notice that this datetime may already be warped. + # Remember this datetime may already be warped. date = datetime.datetime.now() delta = relativedelta(**delta_options) diff --git a/poleno/timewarp/tests/__init__.py b/poleno/timewarp/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/poleno/timewarp/tests/test_admin.py b/poleno/timewarp/tests/test_admin.py new file mode 100644 index 000000000..b5a5d1914 --- /dev/null +++ b/poleno/timewarp/tests/test_admin.py @@ -0,0 +1,98 @@ +# vim: expandtab +# -*- coding: utf-8 -*- +import datetime + +from django.core.urlresolvers import reverse +from django.http import HttpResponse, HttpResponseRedirect +from django.contrib.auth.models import User +from django.test import TestCase + +from ..timewarp import timewarp + +class TimewarpAdminTest(TestCase): + u""" + Tests timewarp admin views. Only checks if the views retrieve and update ``timewarp`` + attributes correctly. Timewarp functionality is tested in ``test_timewarp.py``. + """ + + def setUp(self): + timewarp.enable() + timewarp.reset() + + self.admin = User.objects.create_superuser(username=u'admin', email=u'admin@example.com', password=u'top_secret') + self.client.login(username=u'admin', password=u'top_secret') + + def tearDown(self): + timewarp.reset() + + + def _parse_dt(self, value): + dt = datetime.datetime.strptime(value, u'%Y-%m-%d %H:%M:%S') + return dt + + def _check_dt(self, value, expected): + self.assertEqual(repr(type(value)), u"") + self.assertRegexpMatches(value.strftime(u'%Y-%m-%d %H:%M:%S.%f'), expected) + + def _check_ts(self, value, expected): + dt = datetime.datetime.fromtimestamp(value) + self._check_dt(dt, expected) + + def _check_response(self, response, template, klass=HttpResponse, status_code=200): + self.assertIs(type(response), klass) + self.assertEqual(response.status_code, status_code) + self.assertTemplateUsed(response, template) + + + def test_head(self): + response = self.client.head(reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + + def test_get(self): + response = self.client.get(reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + + def test_post_reset(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + self.assertTrue(timewarp.is_warped) + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'reset'}, follow=True) + self.assertRedirects(response, reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + self.assertFalse(timewarp.is_warped) + + def test_post_jump_with_jumpto_and_speedup(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'jump', u'jumpto': u'2014-10-03', u'speedup': 3}, follow=True) + self.assertRedirects(response, reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + self._check_ts(timewarp.warped_to, u'2014-10-03 00:00:00.000000') + self.assertEqual(timewarp.speedup, 3) + + def test_post_jump_with_speedup_only(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'jump', u'speedup': 3}, follow=True) + self.assertRedirects(response, reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + self.assertAlmostEqual(timewarp.warped_from, timewarp.warped_to, places=2) + self.assertEqual(timewarp.speedup, 3) + + def test_post_jump_without_jumpto_nor_speedup(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'jump'}, follow=True) + self.assertRedirects(response, reverse(u'admin:timewarp')) + self._check_response(response, u'timewarp/timewarp.html') + self.assertFalse(timewarp.is_warped) + + def test_post_jump_with_invalid_jumpto(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'jump', u'jumpto': u'2014-10-xx'}, follow=True) + self._check_response(response, u'timewarp/timewarp.html') + self.assertFormError(response, u'form', u'jumpto', u'Enter a valid date/time.') + self.assertFalse(timewarp.is_warped) + + def test_post_jump_with_invalid_speedup(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'jump', u'speedup': u'invalid'}, follow=True) + self._check_response(response, u'timewarp/timewarp.html') + self.assertFormError(response, u'form', u'speedup', u'Enter a number.') + self.assertFalse(timewarp.is_warped) + + def test_post_with_invalid_button(self): + response = self.client.post(reverse(u'admin:timewarp'), data={u'button': u'invalid'}, follow=True) + self._check_response(response, u'timewarp/timewarp.html') + self.assertFalse(timewarp.is_warped) diff --git a/poleno/timewarp/tests/test_management.py b/poleno/timewarp/tests/test_management.py new file mode 100644 index 000000000..0c8bf80f0 --- /dev/null +++ b/poleno/timewarp/tests/test_management.py @@ -0,0 +1,99 @@ +# vim: expandtab +# -*- coding: utf-8 -*- +import sys +import datetime +from StringIO import StringIO + +from django.core import management +from django.core.management.base import CommandError +from django.test import TestCase + +from ..timewarp import timewarp + +class TimewarpManagementTest(TestCase): + u""" + Tests timewarp management command ``timewarp``. Only checks if the management command sets all + ``timewarp`` attributes correctly. Timewarp functionality is tested in ``test_timewarp.py``. + """ + + def setUp(self): + timewarp.enable() + timewarp.reset() + + def tearDown(self): + timewarp.reset() + + + def _parse_dt(self, value): + dt = datetime.datetime.strptime(value, u'%Y-%m-%d %H:%M:%S') + return dt + + def _check_dt(self, value, expected): + self.assertEqual(repr(type(value)), u"") + self.assertRegexpMatches(value.strftime(u'%Y-%m-%d %H:%M:%S.%f'), expected) + + def _check_ts(self, value, expected): + dt = datetime.datetime.fromtimestamp(value) + self._check_dt(dt, expected) + + + def _call_timewarp(self, *args, **kwargs): + try: + orig_stdout = sys.stdout + sys.stdout = StringIO() + management.call_command(u'timewarp', *args, **kwargs) + sys.stdout.seek(0) + return sys.stdout.read() + finally: + sys.stdout = orig_stdout + + + def test_without_arguments(self): + self._call_timewarp() + self.assertFalse(timewarp.is_warped) + + def test_reset(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + self.assertTrue(timewarp.is_warped) + self._call_timewarp(reset=True) + self.assertFalse(timewarp.is_warped) + + def test_jump_yyyy_mm_dd_hh_mm_ss(self): + self._call_timewarp(u'2014-10-03 14:40:05') + self._check_ts(timewarp.warped_to, u'2014-10-03 14:40:05.000000') + + def test_jump_yyyy_mm_dd_hh_mm(self): + self._call_timewarp(u'2014-10-03 14:40') + self._check_ts(timewarp.warped_to, u'2014-10-03 14:40:00.000000') + + def test_jump_yyyy_mm_dd(self): + self._call_timewarp(u'2014-10-03') + self._check_ts(timewarp.warped_to, u'2014-10-03 00:00:00.000000') + + def test_jump_yyyy_mm_dd_and_hh_mm_ss(self): + self._call_timewarp(u'2014-10-03', u'14:40:05') + self._check_ts(timewarp.warped_to, u'2014-10-03 14:40:05.000000') + + def test_jump_invalid_date(self): + with self.assertRaisesMessage(CommandError, u'Invalid date: "2014-10-03 :40:05".'): + self._call_timewarp(u'2014-10-03 :40:05') + + def test_only_delta_options(self): + self._call_timewarp( + year=2014, month=10, day=6, hour=10, minute=34, second=44, + years=-3, months=-5, weeks=+1, days=+3, hours=-3, minutes=-10, seconds=+5, + weekday=3) + self._check_ts(timewarp.warped_to, u'2011-05-19 07:24:49.000000') + + def test_delta_options_with_date(self): + self._call_timewarp(u'2014-10-03', weekday=1, hours=10) + self._check_ts(timewarp.warped_to, u'2014-10-07 10:00:00.000000') + + def test_speedup(self): + self._call_timewarp(speedup=4) + self.assertEqual(timewarp.speedup, 4) + + def test_jump_with_delta_options_and_speedup_together(self): + self._call_timewarp(u'2014-10-03', weekday=1, hours=10, speedup=4) + self._check_ts(timewarp.warped_to, u'2014-10-07 10:00:00.000000') + self.assertEqual(timewarp.speedup, 4) diff --git a/poleno/timewarp/tests/test_timewarp.py b/poleno/timewarp/tests/test_timewarp.py new file mode 100644 index 000000000..4cf098c42 --- /dev/null +++ b/poleno/timewarp/tests/test_timewarp.py @@ -0,0 +1,353 @@ +# vim: expandtab +# -*- coding: utf-8 -*- +import time +import datetime +import pickle + +from django.utils import timezone +from django.test import TestCase + +from ..timewarp import timewarp + +class TimewarpTest(TestCase): + u""" + Tests ``Timewarp`` singleton and ``_WarpedTime`` and ``_WarpedDatetime`` fake ``time`` and + ``datetime`` modules. Checks that the fake ``time`` and ``datetime`` modules track warped time + and adjust all module functions correctly. + """ + + def setUp(self): + timewarp.enable() + timewarp.reset() + + def tearDown(self): + timewarp.reset() + + + def _parse_dt(self, value): + dt = datetime.datetime.strptime(value, u'%Y-%m-%d %H:%M:%S') + return dt + + def _parse_tt(self, value): + dt = self._parse_dt(value) + tt = dt.timetuple() + return tt + + def _parse_ts(self, value): + tt = self._parse_tt(value) + ts = time.mktime(tt) + return ts + + def _check_dt(self, value, expected): + self.assertEqual(repr(type(value)), u"") + self.assertRegexpMatches(value.strftime(u'%Y-%m-%d %H:%M:%S.%f'), expected) + + def _check_ts(self, value, expected): + dt = datetime.datetime.fromtimestamp(value) + self._check_dt(dt, expected) + + def _check_date(self, value, expected): + self.assertEqual(repr(type(value)), u"") + self.assertEqual(value.strftime(u'%Y-%m-%d'), expected) + + + # ``_WarpedTime`` class + def test_time_asctime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.asctime() + expected = timewarp._time_orig.asctime(self._parse_tt(u'2014-10-03 14:40:05')) + self.assertEqual(value, expected) + + def test_time_asctime_with_argument(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.asctime(time.localtime(1234556)) + expected = timewarp._time_orig.asctime(timewarp._time_orig.localtime(1234556)) + self.assertEqual(value, expected) + + def test_time_ctime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.ctime() + expected = timewarp._time_orig.ctime(self._parse_ts(u'2014-10-03 14:40:05')) + self.assertEqual(value, expected) + + def test_time_ctime_with_argument(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.ctime(1234556) + expected = timewarp._time_orig.ctime(1234556) + self.assertEqual(value, expected) + + def test_time_gmtime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.gmtime() + expected = timewarp._time_orig.gmtime(self._parse_ts(u'2014-10-03 14:40:05')) + self.assertEqual(value, expected) + + def test_time_gmtime_with_argument(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.gmtime(1234556) + expected = timewarp._time_orig.gmtime(1234556) + self.assertEqual(value, expected) + + def test_time_localtime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.localtime() + expected = timewarp._time_orig.localtime(self._parse_ts(u'2014-10-03 14:40:05')) + self.assertEqual(value, expected) + + def test_time_localtime_with_argument(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.localtime(1234556) + expected = timewarp._time_orig.localtime(1234556) + self.assertEqual(value, expected) + + def test_time_strftime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.strftime(u'%c') + expected = timewarp._time_orig.strftime(u'%c', self._parse_tt(u'2014-10-03 14:40:05')) + self.assertEqual(value, expected) + + def test_time_strftime_with_argument(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.strftime(u'%c', time.localtime(1234556)) + expected = timewarp._time_orig.strftime(u'%c', timewarp._time_orig.localtime(1234556)) + self.assertEqual(value, expected) + + def test_time_time(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = time.time() + self.assertAlmostEqual(value, 1412340005, places=0) + + + # ``_WarpedDatetime`` class + def test_datetime_date(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.date(2014, 10, 5) + self._check_date(value, u'2014-10-05') + + def test_datetime_date_today(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.date.today() + self._check_date(value, u'2014-10-03') + + def test_datetime_datetime(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime(2014, 10, 5, 10, 33, 3) + self._check_dt(value, u'2014-10-05 10:33:03.000000') + + def test_datetime_datetime_today(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime.today() + self._check_dt(value, u'2014-10-03 14:40:05') + + def test_datetime_datetime_now(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime.now() + self._check_dt(value, u'2014-10-03 14:40:05') + + def test_datetime_datetime_utcnow(self): + timewarp.jump(self._parse_dt(u'2014-12-03 14:40:05')) + value = time.mktime(datetime.datetime.utcnow().timetuple()) + expected = self._parse_ts(u'2014-12-03 14:40:05') + timewarp._time_orig.timezone + self.assertEqual(value, expected) + + def test_datetime_date_is_instance(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.date.today() + self.assertIsInstance(value, datetime.date) + self.assertIsInstance(value, timewarp._datetime_orig.date) + self.assertNotIsInstance(value, datetime.datetime) + self.assertNotIsInstance(value, timewarp._datetime_orig.datetime) + + def test_datetime_datetime_is_instance(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime.now() + self.assertIsInstance(value, datetime.datetime) + self.assertIsInstance(value, timewarp._datetime_orig.datetime) + self.assertIsInstance(value, datetime.date) + self.assertIsInstance(value, timewarp._datetime_orig.date) + + def test_datetime_date_is_subclass(self): + class SubDate(timewarp._datetime_orig.date): + pass + self.assertTrue(issubclass(SubDate, datetime.date)) + self.assertFalse(issubclass(datetime.date, SubDate)) + self.assertTrue(issubclass(datetime.date, timewarp._datetime_orig.date)) + self.assertTrue(issubclass(timewarp._datetime_orig.date, datetime.date)) + + def test_datetime_datetime_is_subclass(self): + class SubDatetime(timewarp._datetime_orig.datetime): + pass + self.assertTrue(issubclass(SubDatetime, datetime.datetime)) + self.assertFalse(issubclass(datetime.datetime, SubDatetime)) + self.assertTrue(issubclass(datetime.datetime, timewarp._datetime_orig.datetime)) + self.assertTrue(issubclass(timewarp._datetime_orig.datetime, datetime.datetime)) + + def test_datetime_date_pickle(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.date.today() + self._check_date(value, u'2014-10-03') + + pickled = pickle.dumps(value) + loaded = pickle.loads(pickled) + self._check_date(loaded, u'2014-10-03') + + def test_datetime_date_pickle_after_disable(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + timewarp.disable() + value = datetime.date(2014, 10, 5) + pickled = pickle.dumps(value) + loaded = pickle.loads(pickled) + self._check_date(loaded, u'2014-10-05') + + def test_datetime_datetime_pickle(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime.today() + self._check_dt(value, u'2014-10-03 14:40:05') + + pickled = pickle.dumps(value) + loaded = pickle.loads(pickled) + self._check_dt(loaded, u'2014-10-03 14:40:05') + + def test_datetime_datetime_pickle_after_disable(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + timewarp.disable() + value = datetime.datetime(2014, 10, 5, 10, 33, 3) + self._check_dt(value, u'2014-10-05 10:33:03.000000') + + pickled = pickle.dumps(value) + loaded = pickle.loads(pickled) + self._check_dt(loaded, u'2014-10-05 10:33:03.000000') + + + # ``Timewarp`` singleton + def test_enable_and_disable(self): + timewarp.enable() + timewarp.enable() + timewarp.disable() + timewarp.disable() + timewarp.enable() + timewarp.enable() + + def test_properties_before_jump(self): + self.assertEqual(timewarp.warped_from, None) + self.assertEqual(timewarp.warped_to, None) + self.assertEqual(timewarp.speedup, 1) + self.assertEqual(timewarp.is_warped, False) + self.assertAlmostEqual(timewarp.real_time, timewarp._time_orig.time(), places=2) + self.assertAlmostEqual(timewarp.warped_time, timewarp._time_orig.time(), places=2) + + def test_properties_after_jump(self): + before = timewarp._time_orig.time() + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + self.assertAlmostEqual(timewarp.warped_from, before, places=2) + self._check_ts(timewarp.warped_to, u'2014-10-03 14:40:05.000000') + self.assertEqual(timewarp.speedup, 2) + self.assertTrue(timewarp.is_warped) + self.assertAlmostEqual(timewarp.real_time, timewarp._time_orig.time(), places=2) + self._check_ts(timewarp.warped_time, u'2014-10-03 14:40:05') + + def test_properties_after_reset(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + timewarp.reset() + self.assertEqual(timewarp.warped_from, None) + self.assertEqual(timewarp.warped_to, None) + self.assertEqual(timewarp.speedup, 1) + self.assertEqual(timewarp.is_warped, False) + self.assertAlmostEqual(timewarp.real_time, timewarp._time_orig.time(), places=2) + self.assertAlmostEqual(timewarp.warped_time, timewarp._time_orig.time(), places=2) + + def test_properties_after_disable(self): + u""" + If timewarp is disabled, all properties looks like reseted. + """ + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + timewarp.disable() + self.assertEqual(timewarp.warped_from, None) + self.assertEqual(timewarp.warped_to, None) + self.assertEqual(timewarp.speedup, 1) + self.assertEqual(timewarp.is_warped, False) + self.assertAlmostEqual(timewarp.real_time, timewarp._time_orig.time(), places=2) + self.assertAlmostEqual(timewarp.warped_time, timewarp._time_orig.time(), places=2) + + def test_properties_after_disable_and_another_enable(self): + u""" + If timewarp is enabled once again, all properties get their previous values. + """ + before = timewarp._time_orig.time() + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05'), speed=2) + timewarp.disable() + timewarp.enable() + self.assertAlmostEqual(timewarp.warped_from, before, places=2) + self._check_ts(timewarp.warped_to, u'2014-10-03 14:40:05.000000') + self.assertEqual(timewarp.speedup, 2) + self.assertTrue(timewarp.is_warped) + self.assertAlmostEqual(timewarp.real_time, timewarp._time_orig.time(), places=2) + self._check_ts(timewarp.warped_time, u'2014-10-03 14:40:0.') + + def test_jump(self): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + value = datetime.datetime.now() + self._check_dt(value, u'2014-10-03 14:40:05') + + def test_jump_speedup(self): + u""" + Checks that sleeping for 0.1 real seconds takes about 100 warped seconds if the time is + running 1000 times faster than normal. + """ + timewarp.jump(speed=1000) + before = datetime.datetime.now() + time.sleep(0.1) + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertAlmostEqual(elapsed, 100, delta=10) + + def test_jump_slowdown(self): + u""" + Checks that sleeping for 0.1 real seconds takes about 100 warped microseconds if the time + is running 1000 times slower than normal. + """ + timewarp.jump(speed=0.001) + before = datetime.datetime.now() + time.sleep(0.1) + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertAlmostEqual(elapsed, 0.0001, places=5) + + def test_jump_negative_speedup(self): + u""" + Checks that the time runs backwards if speedup is negative. + """ + timewarp.jump(speed=-1) + before = datetime.datetime.now() + time.sleep(0.1) + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertAlmostEqual(elapsed, -0.1, places=2) + + def test_jump_stopped_time(self): + u""" + Checks that the time stops if speedup is zero. + """ + timewarp.jump(speed=0) + before = datetime.datetime.now() + time.sleep(0.1) + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertEqual(elapsed, 0) + + def test_jump_with_disabled_timewarp_raises_error(self): + timewarp.disable() + with self.assertRaisesMessage(RuntimeError, u'Timewarp is disabled.'): + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + + def test_reset_returns_warped_time_back(self): + before = datetime.datetime.now() + timewarp.jump(self._parse_dt(u'2014-10-03 14:40:05')) + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertNotAlmostEqual(elapsed, 0, places=0) + + timewarp.reset() + after = datetime.datetime.now() + elapsed = (after-before).total_seconds() + self.assertAlmostEqual(elapsed, 0, places=0) diff --git a/poleno/timewarp/timewarp.py b/poleno/timewarp/timewarp.py index 0ead3a533..5bec10738 100644 --- a/poleno/timewarp/timewarp.py +++ b/poleno/timewarp/timewarp.py @@ -1,72 +1,11 @@ # vim: expandtab # -*- coding: utf-8 -*- +import sys import time as time_orig import datetime as datetime_orig +import copy_reg -timewarp = None - -class Timewarp(object): - def __init__(self): - from django.core.cache import cache as _cache - self._cache = _cache - self._recursive = False - self._lastupdate = None - self._warped_from = None - self._warped_to = None - self._speedup = 1 - - def _update(self): - if self._recursive: - return - if self._lastupdate and self._lastupdate + 1 > time_orig.time(): - return - self._recursive = True - self._warped_from = self._cache.get(u'timewarp.warped_from') - self._warped_to = self._cache.get(u'timewarp.warped_to') - self._speedup = self._cache.get(u'timewarp.speedup', 1) - self._recursive = False - self._lastupdate = time_orig.time() - - @property - def warped_from(self): - self._update() - return self._warped_from - - @property - def warped_to(self): - self._update() - return self._warped_to - - @property - def speedup(self): - self._update() - return self._speedup - - @property - def is_warped(self): - return self.warped_from is not None and self.warped_to is not None - - @property - def real_time(self): - return time_orig.time() - - @property - def warped_time(self): - if self.is_warped: - return self.warped_to + (self.real_time - self.warped_from) * self.speedup - return self.real_time - - def jump(self, date=None, speed=None): - self._cache.set_many({ - u'timewarp.warped_from': self.real_time, - u'timewarp.warped_to': time_orig.mktime(date.timetuple()) if date is not None else self.warped_time, - u'timewarp.speedup': speed if speed is not None else self.speedup, - }, timeout=None) - self._lastupdate = None - - def reset(self): - self._cache.delete_many([u'timewarp.warped_from', u'timewarp.warped_to', u'timewarp.speedup']) - self._lastupdate = None +from django.core.cache import cache def _meta_factory(cls): @@ -94,19 +33,19 @@ def asctime(cls, t=None): @classmethod def ctime(cls, secs=None): if secs is None: - secs = warped_time() + secs = timewarp.warped_time return time_orig.ctime(secs) @classmethod def gmtime(cls, secs=None): if secs is None: - secs = warped_time() + secs = timewarp.warped_time return time_orig.gmtime(secs) @classmethod def localtime(cls, secs=None): if secs is None: - secs = warped_time() + secs = timewarp.warped_time return time_orig.localtime(secs) @classmethod @@ -117,7 +56,7 @@ def strftime(cls, format, t=None): @classmethod def time(cls): - return warped_time() + return timewarp.warped_time def __getattr__(self, attr): return getattr(time_orig, attr) @@ -132,7 +71,7 @@ def __new__(cls, *args, **kwargs): @classmethod def today(cls): - return datetime_orig.datetime.fromtimestamp(warped_time()).date() + return datetime_orig.datetime.fromtimestamp(timewarp.warped_time).date() class datetime(datetime_orig.datetime): __metaclass__ = _meta_factory(datetime_orig.datetime) @@ -142,54 +81,142 @@ def __new__(cls, *args, **kwargs): @classmethod def today(cls): - return datetime_orig.datetime.fromtimestamp(warped_time()) + return datetime_orig.datetime.fromtimestamp(timewarp.warped_time) @classmethod def now(cls, tz=None): - return datetime_orig.datetime.fromtimestamp(warped_time(), tz) + return datetime_orig.datetime.fromtimestamp(timewarp.warped_time, tz) @classmethod def utcnow(cls): - return datetime_orig.datetime.utcfromtimestamp(warped_time()) + return datetime_orig.datetime.utcfromtimestamp(timewarp.warped_time) def __getattr__(self, attr): return getattr(datetime_orig, attr) -def init(): - import os - import sys - import warnings - import copy_reg - from django.utils.importlib import import_module - - # We may not import ``django.conf.settings`` because this would initialize ``timezone`` as - # well. But we need ``timezone`` to be initialized only after ``datetime`` module is wrapped. - env_name = u'DJANGO_SETTINGS_MODULE' - try: - settings_path = os.environ[env_name] - except KeyError: - warnings.warn(RuntimeWarning(u'Timewarp could not find `%s` environment variable. Timewarp disabled.' % env_name)) - return - - try: - settings = import_module(settings_path) - except ImportError: - warnings.warn(RuntimeWarning(u'Timewarp could not import settings module. Timewarp disabled.')) - return - - # Timewarp MUST NOT be enabled if DEBUG is not True. - if settings.DEBUG: - sys.modules[u'time'] = _WarpedTime() - sys.modules[u'datetime'] = _WarpedDatetime() - copy_reg.pickle(datetime_orig.date, lambda d: (_WarpedDatetime.date,) + d.__reduce__()[1:]) - copy_reg.pickle(datetime_orig.datetime, lambda d: (_WarpedDatetime.datetime,) + d.__reduce__()[1:]) - - global timewarp - timewarp = Timewarp() - -def real_time(): - return time_orig.time() - -def warped_time(): - return timewarp.warped_time if timewarp else time_orig.time() +class Timewarp(object): + def __init__(self): + self._enabled = False + self._recursive = False + self._lastupdate = None + self._warped_from = None + self._warped_to = None + self._speedup = 1 + + self._time_warped = _WarpedTime() + self._datetime_warped = _WarpedDatetime() + + self._remap = ( + (time_orig, self._time_warped), + (time_orig.asctime, self._time_warped.asctime), + (time_orig.ctime, self._time_warped.ctime), + (time_orig.gmtime, self._time_warped.gmtime), + (time_orig.localtime, self._time_warped.localtime), + (time_orig.strftime, self._time_warped.strftime), + (time_orig.time, self._time_warped.time), + (datetime_orig, self._datetime_warped), + (datetime_orig.date, self._datetime_warped.date), + (datetime_orig.datetime, self._datetime_warped.datetime), + ) + + def enable(self): + if not self._enabled: + self._enabled = True + self._remap_modules({a: b for a, b in self._remap}) + copy_reg.pickle(datetime_orig.date, lambda d: (_WarpedDatetime.date,) + d.__reduce__()[1:]) + copy_reg.pickle(datetime_orig.datetime, lambda d: (_WarpedDatetime.datetime,) + d.__reduce__()[1:]) + + def disable(self): + if self._enabled: + self._enabled = False + self._remap_modules({b: a for a, b in self._remap}) + copy_reg.pickle(datetime_orig.date, lambda d: d.__reduce__()) + copy_reg.pickle(datetime_orig.datetime, lambda d: d.__reduce__()) + + def _remap_modules(self, remap): + for name, module in sys.modules.items(): + if module is None: + continue + if name == __name__: # Skip this module + continue + if module in remap: + sys.modules[name] = remap[module] + continue + for var, value in module.__dict__.items(): + try: + if value in remap: + setattr(module, var, remap[value]) + except TypeError: + pass + + @property + def _time_orig(self): + return time_orig + + @property + def _datetime_orig(self): + return datetime_orig + + def _update(self): + if self._recursive: + return + if self._lastupdate and self._lastupdate + 1 > time_orig.time(): + return + self._recursive = True + self._warped_from = cache.get(u'timewarp.warped_from') + self._warped_to = cache.get(u'timewarp.warped_to') + self._speedup = cache.get(u'timewarp.speedup', 1) + self._recursive = False + self._lastupdate = time_orig.time() + + @property + def warped_from(self): + if not self._enabled: + return None + self._update() + return self._warped_from + + @property + def warped_to(self): + if not self._enabled: + return None + self._update() + return self._warped_to + + @property + def speedup(self): + if not self._enabled: + return 1 + self._update() + return self._speedup + + @property + def is_warped(self): + return self.warped_to is not None + + @property + def real_time(self): + return time_orig.time() + + @property + def warped_time(self): + if self.is_warped: + return self.warped_to + (self.real_time - self.warped_from) * self.speedup + return self.real_time + + def jump(self, date=None, speed=None): + if not self._enabled: + raise RuntimeError(u'Timewarp is disabled.') + cache.set_many({ + u'timewarp.warped_from': self.real_time, + u'timewarp.warped_to': time_orig.mktime(date.timetuple()) if date is not None else self.warped_time, + u'timewarp.speedup': speed if speed is not None else self.speedup, + }, timeout=None) + self._lastupdate = None + + def reset(self): + cache.delete_many([u'timewarp.warped_from', u'timewarp.warped_to', u'timewarp.speedup']) + self._lastupdate = None + +timewarp = Timewarp() diff --git a/poleno/utils/tests/test_date.py b/poleno/utils/tests/test_date.py index 35dce4cff..072139122 100644 --- a/poleno/utils/tests/test_date.py +++ b/poleno/utils/tests/test_date.py @@ -41,12 +41,10 @@ def _parse_time(self, value): return dt.time() def _check_dt(self, value, expected): - # FIXME: We should check against the class, not its name. After fixing timewarp. self.assertEqual(repr(type(value)), u"") self.assertEqual(value.strftime(u'%Y-%m-%d %H:%M:%S.%f %Z %z'), expected) def _check_date(self, value, expected): - # FIXME: We should check against the class, not its name. After fixing timewarp. self.assertEqual(repr(type(value)), u"") self.assertEqual(value.strftime(u'%Y-%m-%d'), expected)