forked from spotify/luigi
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrpc_test.py
120 lines (94 loc) · 3.82 KB
/
rpc_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from helpers import unittest
try:
from unittest import mock
except ImportError:
import mock
import luigi.rpc
from luigi.scheduler import Scheduler
import scheduler_api_test
import luigi.server
from server_test import ServerTestBase
import time
import socket
class RemoteSchedulerTest(unittest.TestCase):
    """Tests for luigi.rpc.RemoteScheduler that run against a mocked ``_fetcher``."""

    def testUrlArgumentVariations(self):
        """
        The base URL with or without a trailing slash, combined with a suffix
        with or without a leading slash, must always be fetched as the same URL.
        """
        expected_url = 'http://zorg.com/api/123'
        combinations = [
            (base, tail)
            for base in ['http://zorg.com', 'http://zorg.com/']
            for tail in ['api/123', '/api/123']
        ]
        for base, tail in combinations:
            remote = luigi.rpc.RemoteScheduler(base, 42)
            with mock.patch.object(remote, '_fetcher') as fake_fetcher:
                remote._fetch(tail, '{}')
                # The body and the second constructor argument (42) are
                # expected to be forwarded to the fetcher unchanged.
                fake_fetcher.fetch.assert_called_once_with(expected_url, '{}', 42)

    def get_work(self, fetcher_side_effect):
        """
        Helper: call ``get_work("fake_worker")`` on a scheduler with a mocked fetcher.

        :param fetcher_side_effect: value installed as ``side_effect`` of the
            mocked ``fetch``; the tests below pass a list of exception classes
            and/or JSON strings, consumed one per call.
        :return: whatever ``get_work`` returns.
        """
        class ShorterWaitRemoteScheduler(luigi.rpc.RemoteScheduler):
            """
            A RemoteScheduler whose ``_wait`` sleeps for one second only; per the
            original author this is shorter than usual, to speed up the tests.
            """

            def _wait(self):
                time.sleep(1)

        remote = ShorterWaitRemoteScheduler('http://zorg.com', 42)
        with mock.patch.object(remote, '_fetcher') as fake_fetcher:
            # NOTE(review): presumably the exception type RemoteScheduler treats
            # as retryable -- confirm against luigi.rpc.
            fake_fetcher.raises = socket.timeout
            fake_fetcher.fetch.side_effect = fetcher_side_effect
            return remote.get_work("fake_worker")

    def test_retry_rpc_method(self):
        """
        Two timeouts followed by a valid response: the call must still succeed
        and return the response payload.
        """
        outcomes = [socket.timeout, socket.timeout, '{"response":{}}']
        work = self.get_work(outcomes)
        self.assertEqual({}, work)

    def test_retry_rpc_limited(self):
        """
        Three timeouts in a row: the call must give up with an RPCError.
        """
        outcomes = [socket.timeout, socket.timeout, socket.timeout]
        with self.assertRaises(luigi.rpc.RPCError):
            self.get_work(outcomes)

    def test_get_work_retries_on_null(self):
        """
        A null response followed by a real one: get_work must return the real one.
        """
        outcomes = ['{"response": null}', '{"response": {"pass": true}}']
        work = self.get_work(outcomes)
        self.assertEqual({'pass': True}, work)

    def test_get_work_retries_on_null_limited(self):
        """
        Three null responses in a row: get_work must raise RPCError even though
        a fourth, valid response is queued.
        """
        outcomes = ['{"response": null}'] * 3 + ['{"response": {}}']
        with self.assertRaises(luigi.rpc.RPCError):
            self.get_work(outcomes)
class RPCTest(scheduler_api_test.SchedulerApiTest, ServerTestBase):
    """
    Combines SchedulerApiTest with ServerTestBase; ``setUp`` points ``self.sch``
    at a RemoteScheduler so the inherited tests go through the RPC layer.
    """

    def get_app(self):
        """Return the server app wrapping a Scheduler built from the test config."""
        scheduler = Scheduler(**self.get_scheduler_config())
        return luigi.server.app(scheduler)

    def setUp(self):
        """Run the parent setUp, then install a RemoteScheduler with a no-op ``_wait``."""
        super(RPCTest, self).setUp()
        remote = luigi.rpc.RemoteScheduler(self.get_url(''))
        # Replace the wait between retries with a no-op.
        remote._wait = lambda: None
        self.sch = remote

    # The overrides below disable tests that do not work with (or are too
    # slow for) a remote scheduler by replacing them with no-ops.

    def test_task_first_failure_time(self):
        # Disabled: doesn't work with remote scheduler.
        pass

    def test_task_first_failure_time_remains_constant(self):
        # Disabled: doesn't work with remote scheduler.
        pass

    def test_task_has_excessive_failures(self):
        # Disabled: doesn't work with remote scheduler.
        pass

    def test_quadratic_behavior(self):
        """ This would be too slow to run through network """

    def test_get_work_speed(self):
        """ This would be too slow to run through network """