|
18 | 18 | assert_callback_failed, |
19 | 19 | assert_callback_succeeded, |
20 | 20 | ) |
| 21 | +from .fixtures import FakeOperation |
21 | 22 |
|
22 | 23 | logging.basicConfig(level=logging.DEBUG) |
23 | 24 |
|
@@ -49,188 +50,206 @@ def stage(mocker, arbitrary_exception, arbitrary_base_exception): |
49 | 50 |
|
50 | 51 | @pytest.mark.describe("PipelineFlow - ._send_worker_op_down()") |
51 | 52 | class TestSendWorkerOpDown(object): |
52 | | - @pytest.mark.it("Runs the new op and does not continue running the original op") |
53 | | - def test_runs_new_op(self, mocker, stage, op, new_op): |
54 | | - stage._send_worker_op_down(worker_op=new_op, op=op) |
| 53 | + @pytest.fixture |
| 54 | + def arbitrary_worker_op(self, mocker): |
| 55 | + op = FakeOperation(callback=mocker.MagicMock()) |
| 56 | + op.name = "arbitrary_worker_op" |
| 57 | + return op |
| 58 | + |
| 59 | + @pytest.mark.it("Runs the worker op and does not continue running the original op") |
| 60 | + def test_runs_worker_op(self, mocker, stage, arbitrary_op, arbitrary_worker_op): |
| 61 | + stage._send_worker_op_down(worker_op=arbitrary_worker_op, op=arbitrary_op) |
55 | 62 | assert stage.next.run_op.call_count == 1 |
56 | | - assert stage.next.run_op.call_args == mocker.call(new_op) |
| 63 | + assert stage.next.run_op.call_args == mocker.call(arbitrary_worker_op) |
57 | 64 |
|
58 | | - @pytest.mark.it("Completes the original op after the new op completes") |
59 | | - def test_completes_original_op_after_new_op_completes(self, stage, op, new_op): |
60 | | - callback = op.callback |
61 | | - new_op.action = "pend" |
| 65 | + @pytest.mark.it("Completes the original op after the worker op completes") |
| 66 | + def test_completes_original_op_after_worker_op_completes( |
| 67 | + self, stage, arbitrary_op, arbitrary_worker_op |
| 68 | + ): |
| 69 | + callback = arbitrary_op.callback |
| 70 | + arbitrary_worker_op.action = "pend" |
62 | 71 |
|
63 | | - stage._send_worker_op_down(worker_op=new_op, op=op) |
64 | | - assert callback.call_count == 0 # because new_op is pending |
| 72 | + stage._send_worker_op_down(worker_op=arbitrary_worker_op, op=arbitrary_op) |
| 73 | + assert callback.call_count == 0 # because arbitrary_worker_op is pending |
65 | 74 |
|
66 | | - stage.next._complete_op(new_op) |
67 | | - assert_callback_succeeded(op=op) |
| 75 | + stage.next._complete_op(arbitrary_worker_op) |
| 76 | + assert_callback_succeeded(op=arbitrary_op) |
68 | 77 |
|
69 | | - @pytest.mark.it("Returns the new op failure in the original op if new op fails") |
70 | | - def test_returns_new_op_failure_in_original_op(self, stage, op, new_op, arbitrary_exception): |
71 | | - new_op.action = "fail" |
72 | | - stage._send_worker_op_down(worker_op=new_op, op=op) |
73 | | - assert_callback_failed(op, error=arbitrary_exception) |
| 78 | + @pytest.mark.it("Returns the worker op failure in the original op if worker op fails") |
| 79 | + def test_returns_worker_op_failure_in_original_op( |
| 80 | + self, stage, arbitrary_op, arbitrary_worker_op, arbitrary_exception |
| 81 | + ): |
| 82 | + arbitrary_worker_op.action = "fail" |
| 83 | + stage._send_worker_op_down(worker_op=arbitrary_worker_op, op=arbitrary_op) |
| 84 | + assert_callback_failed(arbitrary_op, error=arbitrary_exception) |
74 | 85 |
|
75 | 86 |
|
76 | 87 | @pytest.mark.describe("PipelineFlow - ._send_op_down()") |
77 | 88 | class TestSendOpDown(object): |
78 | 89 | @pytest.mark.it("Fails the op if there is no next stage") |
79 | | - def test_fails_op_when_no_next_stage(self, stage, op): |
| 90 | + def test_fails_op_when_no_next_stage(self, stage, arbitrary_op): |
80 | 91 | stage.next = None |
81 | | - stage._send_op_down(op) |
82 | | - assert_callback_failed(op=op, error=pipeline_exceptions.PipelineError) |
| 92 | + stage._send_op_down(arbitrary_op) |
| 93 | + assert_callback_failed(op=arbitrary_op, error=pipeline_exceptions.PipelineError) |
83 | 94 |
|
84 | 95 | @pytest.mark.it("Passes the op to the next stage") |
85 | | - def test_passes_op_to_next_stage(self, mocker, stage, op): |
86 | | - stage._send_op_down(op) |
| 96 | + def test_passes_op_to_next_stage(self, mocker, stage, arbitrary_op): |
| 97 | + stage._send_op_down(arbitrary_op) |
87 | 98 | assert stage.next.run_op.call_count == 1 |
88 | | - assert stage.next.run_op.call_args == mocker.call(op) |
| 99 | + assert stage.next.run_op.call_args == mocker.call(arbitrary_op) |
89 | 100 |
|
90 | 101 |
|
91 | 102 | @pytest.mark.describe("PipelineFlow - ._complete_op()") |
92 | 103 | class TestCompleteOp(object): |
93 | 104 | @pytest.mark.it("Calls the op callback on success") |
94 | | - def test_calls_callback_on_success(self, stage, op): |
95 | | - stage._complete_op(op) |
96 | | - assert_callback_succeeded(op) |
| 105 | + def test_calls_callback_on_success(self, stage, arbitrary_op): |
| 106 | + stage._complete_op(arbitrary_op) |
| 107 | + assert_callback_succeeded(arbitrary_op) |
97 | 108 |
|
98 | 109 | @pytest.mark.it("Calls the op callback on failure") |
99 | | - def test_calls_callback_on_error(self, stage, op, arbitrary_exception): |
100 | | - stage._complete_op(op, error=arbitrary_exception) |
101 | | - assert_callback_failed(op=op, error=arbitrary_exception) |
| 110 | + def test_calls_callback_on_error(self, stage, arbitrary_op, arbitrary_exception): |
| 111 | + stage._complete_op(arbitrary_op, error=arbitrary_exception) |
| 112 | + assert_callback_failed(op=arbitrary_op, error=arbitrary_exception) |
102 | 113 |
|
103 | 114 | @pytest.mark.it( |
104 | 115 | "Calls handle_background_exception with a PipelineError if the op has previously been completed" |
105 | 116 | ) |
106 | | - def test_background_exception_if_called_twice(self, stage, op, mocker): |
| 117 | + def test_background_exception_if_called_twice(self, stage, arbitrary_op, mocker): |
107 | 118 | mocker.spy(handle_exceptions, "handle_background_exception") |
108 | | - stage._complete_op(op) |
109 | | - stage._complete_op(op) |
| 119 | + stage._complete_op(arbitrary_op) |
| 120 | + stage._complete_op(arbitrary_op) |
110 | 121 | assert handle_exceptions.handle_background_exception.call_count == 1 |
111 | 122 | assert ( |
112 | 123 | handle_exceptions.handle_background_exception.call_args[0][0].__class__ |
113 | 124 | == pipeline_exceptions.PipelineError |
114 | 125 | ) |
115 | 126 |
|
116 | 127 | @pytest.mark.it("Does not call the callback if the op has previously been completed") |
117 | | - def test_no_callback_if_called_twice(self, stage, op): |
118 | | - stage._complete_op(op) |
119 | | - op.callback.reset_mock() |
120 | | - stage._complete_op(op) |
121 | | - assert op.callback.call_count == 0 |
| 128 | + def test_no_callback_if_called_twice(self, stage, arbitrary_op): |
| 129 | + stage._complete_op(arbitrary_op) |
| 130 | + arbitrary_op.callback.reset_mock() |
| 131 | + stage._complete_op(arbitrary_op) |
| 132 | + assert arbitrary_op.callback.call_count == 0 |
122 | 133 |
|
123 | 134 | @pytest.mark.it( |
124 | 135 | "Handles Exceptions raised in operation callback and passes them to the unhandled error handler" |
125 | 136 | ) |
126 | 137 | def test_op_callback_raises_exception( |
127 | | - self, stage, op, arbitrary_exception, mocker, unhandled_error_handler |
| 138 | + self, stage, arbitrary_op, arbitrary_exception, mocker, unhandled_error_handler |
128 | 139 | ): |
129 | | - op.callback = mocker.Mock(side_effect=arbitrary_exception) |
130 | | - stage._complete_op(op) |
131 | | - assert op.callback.call_count == 1 |
132 | | - assert op.callback.call_args == mocker.call(op, error=None) |
| 140 | + arbitrary_op.callback = mocker.Mock(side_effect=arbitrary_exception) |
| 141 | + stage._complete_op(arbitrary_op) |
| 142 | + assert arbitrary_op.callback.call_count == 1 |
| 143 | + assert arbitrary_op.callback.call_args == mocker.call(arbitrary_op, error=None) |
133 | 144 | assert unhandled_error_handler.call_count == 1 |
134 | 145 | assert unhandled_error_handler.call_args == mocker.call(arbitrary_exception) |
135 | 146 |
|
136 | 147 | @pytest.mark.it("Allows any BaseExceptions raised in operation callback to propagate") |
137 | | - def test_op_callback_raises_base_exception(self, stage, op, arbitrary_base_exception, mocker): |
138 | | - op.callback = mocker.Mock(side_effect=arbitrary_base_exception) |
| 148 | + def test_op_callback_raises_base_exception( |
| 149 | + self, stage, arbitrary_op, arbitrary_base_exception, mocker |
| 150 | + ): |
| 151 | + arbitrary_op.callback = mocker.Mock(side_effect=arbitrary_base_exception) |
139 | 152 | with pytest.raises(arbitrary_base_exception.__class__): |
140 | | - stage._complete_op(op) |
| 153 | + stage._complete_op(arbitrary_op) |
141 | 154 |
|
142 | 155 |
|
143 | 156 | @pytest.mark.describe("PipelineFlow - ._send_completed_op_up()") |
144 | 157 | class TestSendCompletedOpUp(object): |
145 | 158 | @pytest.mark.it("Calls the op callback on success") |
146 | | - def test_calls_callback_on_success(self, stage, op): |
147 | | - op.completed = True |
148 | | - stage._send_completed_op_up(op) |
149 | | - assert_callback_succeeded(op) |
| 159 | + def test_calls_callback_on_success(self, stage, arbitrary_op): |
| 160 | + arbitrary_op.completed = True |
| 161 | + stage._send_completed_op_up(arbitrary_op) |
| 162 | + assert_callback_succeeded(arbitrary_op) |
150 | 163 |
|
151 | 164 | @pytest.mark.it("Calls the op callback on failure") |
152 | | - def test_calls_callback_on_error(self, stage, op, arbitrary_exception): |
153 | | - op.completed = True |
154 | | - stage._send_completed_op_up(op, error=arbitrary_exception) |
155 | | - assert_callback_failed(op=op, error=arbitrary_exception) |
| 165 | + def test_calls_callback_on_error(self, stage, arbitrary_op, arbitrary_exception): |
| 166 | + arbitrary_op.completed = True |
| 167 | + stage._send_completed_op_up(arbitrary_op, error=arbitrary_exception) |
| 168 | + assert_callback_failed(op=arbitrary_op, error=arbitrary_exception) |
156 | 169 |
|
157 | 170 | @pytest.mark.it( |
158 | 171 | "Calls the callback with a PipelineError if the operation has not been completed" |
159 | 172 | ) |
160 | | - def test_error_if_not_completed(self, stage, op): |
| 173 | + def test_error_if_not_completed(self, stage, arbitrary_op): |
161 | 174 | with pytest.raises(pipeline_exceptions.PipelineError): |
162 | | - stage._send_completed_op_up(op) |
| 175 | + stage._send_completed_op_up(arbitrary_op) |
163 | 176 |
|
164 | 177 | @pytest.mark.it( |
165 | 178 | "Handles Exceptions raised in operation callback and passes them to the unhandled error handler" |
166 | 179 | ) |
167 | 180 | def test_op_callback_raises_exception( |
168 | | - self, stage, op, arbitrary_exception, mocker, unhandled_error_handler |
| 181 | + self, stage, arbitrary_op, arbitrary_exception, mocker, unhandled_error_handler |
169 | 182 | ): |
170 | | - op.callback = mocker.Mock(side_effect=arbitrary_exception) |
171 | | - op.completed = True |
172 | | - stage._send_completed_op_up(op) |
173 | | - assert op.callback.call_count == 1 |
174 | | - assert op.callback.call_args == mocker.call(op, error=None) |
| 183 | + arbitrary_op.callback = mocker.Mock(side_effect=arbitrary_exception) |
| 184 | + arbitrary_op.completed = True |
| 185 | + stage._send_completed_op_up(arbitrary_op) |
| 186 | + assert arbitrary_op.callback.call_count == 1 |
| 187 | + assert arbitrary_op.callback.call_args == mocker.call(arbitrary_op, error=None) |
175 | 188 | assert unhandled_error_handler.call_count == 1 |
176 | 189 | assert unhandled_error_handler.call_args == mocker.call(arbitrary_exception) |
177 | 190 |
|
178 | 191 | @pytest.mark.it("Allows any BaseExceptions raised in operation callback to propagate") |
179 | | - def test_op_callback_raises_base_exception(self, stage, op, arbitrary_base_exception, mocker): |
180 | | - op.callback = mocker.Mock(side_effect=arbitrary_base_exception) |
181 | | - op.completed = True |
| 192 | + def test_op_callback_raises_base_exception( |
| 193 | + self, stage, arbitrary_op, arbitrary_base_exception, mocker |
| 194 | + ): |
| 195 | + arbitrary_op.callback = mocker.Mock(side_effect=arbitrary_base_exception) |
| 196 | + arbitrary_op.completed = True |
182 | 197 | with pytest.raises(arbitrary_base_exception.__class__): |
183 | | - stage._send_completed_op_up(op) |
| 198 | + stage._send_completed_op_up(arbitrary_op) |
184 | 199 |
|
185 | 200 |
|
186 | 201 | @pytest.mark.describe("PipelineFlow - ._send_op_down_and_intercept_return()") |
187 | 202 | class TestSendOpDownAndInterceptReturn(object): |
188 | 203 | @pytest.mark.it("Calls _send_op_down to send the op down") |
189 | | - def test_sends_op_down(self, stage, op, mocker): |
| 204 | + def test_sends_op_down(self, stage, arbitrary_op, mocker): |
190 | 205 | intercepted_return = mocker.MagicMock() |
191 | 206 | mocker.spy(stage, "_send_op_down") |
192 | | - stage._send_op_down_and_intercept_return(op, intercepted_return) |
| 207 | + stage._send_op_down_and_intercept_return(arbitrary_op, intercepted_return) |
193 | 208 | assert stage._send_op_down.call_count == 1 |
194 | | - assert stage._send_op_down.call_args == mocker.call(op) |
| 209 | + assert stage._send_op_down.call_args == mocker.call(arbitrary_op) |
195 | 210 |
|
196 | 211 | @pytest.mark.it("Calls the intercepted_return function when the op succeeds") |
197 | | - def test_calls_intercepted_return_on_op_success(self, stage, op, mocker): |
| 212 | + def test_calls_intercepted_return_on_op_success(self, stage, arbitrary_op, mocker): |
198 | 213 | intercepted_return = mocker.MagicMock() |
199 | | - stage._send_op_down_and_intercept_return(op, intercepted_return) |
200 | | - assert intercepted_return.call_args == mocker.call(op=op, error=None) |
| 214 | + stage._send_op_down_and_intercept_return(arbitrary_op, intercepted_return) |
| 215 | + assert intercepted_return.call_args == mocker.call(op=arbitrary_op, error=None) |
201 | 216 |
|
202 | 217 | @pytest.mark.it("Calls the intercepted_return function when the op fails") |
203 | | - def test_calls_intercepted_return_on_op_failure(self, stage, op, mocker, arbitrary_exception): |
204 | | - op.action = "fail" |
| 218 | + def test_calls_intercepted_return_on_op_failure( |
| 219 | + self, stage, arbitrary_op, mocker, arbitrary_exception |
| 220 | + ): |
| 221 | + arbitrary_op.action = "fail" |
205 | 222 | intercepted_return = mocker.MagicMock() |
206 | | - stage._send_op_down_and_intercept_return(op, intercepted_return) |
207 | | - assert intercepted_return.call_args == mocker.call(op=op, error=arbitrary_exception) |
| 223 | + stage._send_op_down_and_intercept_return(arbitrary_op, intercepted_return) |
| 224 | + assert intercepted_return.call_args == mocker.call( |
| 225 | + op=arbitrary_op, error=arbitrary_exception |
| 226 | + ) |
208 | 227 |
|
209 | 228 | @pytest.mark.it( |
210 | 229 | "Ensures that the op callback is set to its original value when the intercepted_return function is called" |
211 | 230 | ) |
212 | | - def test_restores_callback_before_calling_intercepted_return(self, stage, op, mocker): |
213 | | - saved_callback = op.callback |
| 231 | + def test_restores_callback_before_calling_intercepted_return(self, stage, arbitrary_op, mocker): |
| 232 | + saved_callback = arbitrary_op.callback |
214 | 233 | intercepted_return = mocker.MagicMock() |
215 | | - stage._send_op_down_and_intercept_return(op, intercepted_return) |
| 234 | + stage._send_op_down_and_intercept_return(arbitrary_op, intercepted_return) |
216 | 235 | assert intercepted_return.call_args[1]["op"].callback == saved_callback |
217 | 236 |
|
218 | 237 |
|
219 | 238 | @pytest.mark.describe("PipelineFlow - ._send_event_up()") |
220 | 239 | class TestSendEventUp(object): |
221 | 240 | @pytest.mark.it("Calls handle_pipeline_event on the previous stage") |
222 | | - def test_calls_handle_pipeline_event(self, stage, event, mocker): |
| 241 | + def test_calls_handle_pipeline_event(self, stage, arbitrary_event, mocker): |
223 | 242 | mocker.spy(stage, "handle_pipeline_event") |
224 | | - stage.next._send_event_up(event) |
| 243 | + stage.next._send_event_up(arbitrary_event) |
225 | 244 | assert stage.handle_pipeline_event.call_count == 1 |
226 | | - assert stage.handle_pipeline_event.call_args == mocker.call(event) |
| 245 | + assert stage.handle_pipeline_event.call_args == mocker.call(arbitrary_event) |
227 | 246 |
|
228 | 247 | @pytest.mark.it( |
229 | 248 | "Calls handle_background_exception with a PipelineError if there is no previous stage" |
230 | 249 | ) |
231 | | - def test_no_previous_stage(self, stage, event, mocker): |
| 250 | + def test_no_previous_stage(self, stage, arbitrary_event, mocker): |
232 | 251 | mocker.spy(handle_exceptions, "handle_background_exception") |
233 | | - stage._send_event_up(event) |
| 252 | + stage._send_event_up(arbitrary_event) |
234 | 253 | assert handle_exceptions.handle_background_exception.call_count == 1 |
235 | 254 | assert ( |
236 | 255 | handle_exceptions.handle_background_exception.call_args[0][0].__class__ |
|
0 commit comments