| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2014 The LUCI Authors. All rights reserved. | 2 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 import datetime | 6 import datetime |
| 7 import logging | 7 import logging |
| 8 import os | 8 import os |
| 9 import random | 9 import random |
| 10 import sys | 10 import sys |
| (...skipping 187 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 198 'modified_ts': self.now, | 198 'modified_ts': self.now, |
| 199 'outputs_ref': None, | 199 'outputs_ref': None, |
| 200 'server_versions': [u'v1a'], | 200 'server_versions': [u'v1a'], |
| 201 'started_ts': self.now, | 201 'started_ts': self.now, |
| 202 'state': State.RUNNING, | 202 'state': State.RUNNING, |
| 203 'try_number': 1, | 203 'try_number': 1, |
| 204 } | 204 } |
| 205 expected.update(**kwargs) | 205 expected.update(**kwargs) |
| 206 return expected | 206 return expected |
| 207 | 207 |
| 208 def _quick_schedule(self, dims, nb_task=1): | 208 def _quick_schedule(self, nb_task=1, **kwargs): |
| 209 """Schedules a task. | 209 """Schedules a task. |
| 210 | 210 |
| 211 nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued. | 211 nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued. |
| 212 """ | 212 """ |
| 213 request = self._gen_request(properties={'dimensions': dims}) | 213 request = self._gen_request(**kwargs) |
| 214 result_summary = task_scheduler.schedule_request(request, None) | 214 result_summary = task_scheduler.schedule_request(request, None) |
| 215 self.assertEqual(nb_task, self.execute_tasks()) | 215 self.assertEqual(nb_task, self.execute_tasks()) |
| 216 return result_summary | 216 return result_summary |
| 217 | 217 |
| 218 def _register_bot(self, bot_dimensions, nb_task=1): | 218 def _register_bot(self, bot_dimensions, nb_task=1): |
| 219 """Registers the bot so the task queues knows there's a worker than can run | 219 """Registers the bot so the task queues knows there's a worker than can run |
| 220 the task. | 220 the task. |
| 221 """ | 221 """ |
| 222 bot_management.bot_event( | 222 bot_management.bot_event( |
| 223 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost', | 223 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost', |
| 224 bot_dimensions, {'state': 'real'}, '1234', False, None, None) | 224 bot_dimensions, {'state': 'real'}, '1234', False, None, None) |
| 225 task_queues.assert_bot(bot_dimensions) | 225 task_queues.assert_bot(bot_dimensions) |
| 226 self.assertEqual(nb_task, self.execute_tasks()) | 226 self.assertEqual(nb_task, self.execute_tasks()) |
| 227 | 227 |
| 228 def _quick_reap(self, nb_task=1): | 228 def _quick_reap(self, nb_task=1, **kwargs): |
| 229 """Reaps a task.""" | 229 """Reaps a task.""" |
| 230 self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'}) | 230 self._quick_schedule(**kwargs) |
| 231 self._register_bot(self.bot_dimensions, nb_task=nb_task) | 231 self._register_bot(self.bot_dimensions, nb_task=nb_task) |
| 232 reaped_request, _, run_result = task_scheduler.bot_reap_task( | 232 reaped_request, _, run_result = task_scheduler.bot_reap_task( |
| 233 self.bot_dimensions, 'abc', None) | 233 self.bot_dimensions, 'abc', None) |
| 234 return run_result | 234 return run_result |
| 235 | 235 |
| 236 def test_all_apis_are_tested(self): | 236 def test_all_apis_are_tested(self): |
| 237 # Ensures there's a test for each public API. | 237 # Ensures there's a test for each public API. |
| 238 # TODO(maruel): Remove this once coverage is asserted. | 238 # TODO(maruel): Remove this once coverage is asserted. |
| 239 module = task_scheduler | 239 module = task_scheduler |
| 240 expected = set( | 240 expected = set( |
| (...skipping 437 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 678 exit_code=1, | 678 exit_code=1, |
| 679 failure=True, | 679 failure=True, |
| 680 id='1d69b9f088008911', | 680 id='1d69b9f088008911', |
| 681 started_ts=self.now, | 681 started_ts=self.now, |
| 682 state=State.COMPLETED), | 682 state=State.COMPLETED), |
| 683 ] | 683 ] |
| 684 self.assertEqual(expected, [t.to_dict() for t in run_results]) | 684 self.assertEqual(expected, [t.to_dict() for t in run_results]) |
| 685 | 685 |
| 686 def test_schedule_request(self): | 686 def test_schedule_request(self): |
| 687 # It is tested indirectly in the other functions. | 687 # It is tested indirectly in the other functions. |
| 688 self.assertTrue( | 688 self.assertTrue(self._quick_schedule()) |
| 689 self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'})) | |
| 690 | 689 |
| 691 def mock_dim_acls(self, mapping): | 690 def mock_dim_acls(self, mapping): |
| 692 self.mock(config, 'settings', lambda: config_pb2.SettingsCfg( | 691 self.mock(config, 'settings', lambda: config_pb2.SettingsCfg( |
| 693 dimension_acls=config_pb2.DimensionACLs(entry=[ | 692 dimension_acls=config_pb2.DimensionACLs(entry=[ |
| 694 config_pb2.DimensionACLs.Entry(dimension=[d], usable_by=g) | 693 config_pb2.DimensionACLs.Entry(dimension=[d], usable_by=g) |
| 695 for d, g in sorted(mapping.iteritems()) | 694 for d, g in sorted(mapping.iteritems()) |
| 696 ]), | 695 ]), |
| 697 )) | 696 )) |
| 698 | 697 |
| 699 def test_schedule_request_forbidden_dim(self): | 698 def test_schedule_request_forbidden_dim(self): |
| 700 self.mock_dim_acls({u'pool:bad': u'noone'}) | 699 self.mock_dim_acls({u'pool:bad': u'noone'}) |
| 701 self._quick_schedule({u'pool': u'good'}) | 700 self._quick_schedule(properties={'dimensions': {u'pool': u'good'}}) |
| 702 with self.assertRaises(auth.AuthorizationError): | 701 with self.assertRaises(auth.AuthorizationError): |
| 703 self._quick_schedule({u'pool': u'bad'}) | 702 self._quick_schedule(properties={'dimensions': {u'pool': u'bad'}}) |
| 704 | 703 |
| 705 def test_schedule_request_forbidden_dim_via_star(self): | 704 def test_schedule_request_forbidden_dim_via_star(self): |
| 706 self.mock_dim_acls({u'abc:*': u'noone'}) | 705 self.mock_dim_acls({u'abc:*': u'noone'}) |
| 707 self._quick_schedule({u'pool': u'default'}) | 706 self._quick_schedule(properties={'dimensions': {u'pool': u'default'}}) |
| 708 with self.assertRaises(auth.AuthorizationError): | 707 with self.assertRaises(auth.AuthorizationError): |
| 709 self._quick_schedule({u'pool': u'default', u'abc': u'blah'}) | 708 self._quick_schedule( |
| 709 properties={'dimensions': {u'pool': u'default', u'abc': u'blah'}}) |
| 710 | 710 |
| 711 def test_schedule_request_id_without_pool(self): | 711 def test_schedule_request_id_without_pool(self): |
| 712 self.mock_dim_acls({u'pool:good': u'mocked'}) | 712 self.mock_dim_acls({u'pool:good': u'mocked'}) |
| 713 with self.assertRaises(auth.AuthorizationError): | 713 with self.assertRaises(auth.AuthorizationError): |
| 714 self._quick_schedule({u'id': u'abc'}) | 714 self._quick_schedule(properties={'dimensions': {u'id': u'abc'}}) |
| 715 auth_testing.mock_is_admin(self) | 715 auth_testing.mock_is_admin(self) |
| 716 self._quick_schedule({u'id': u'abc'}, nb_task=0) | 716 self._quick_schedule(properties={'dimensions': {u'id': u'abc'}}, nb_task=0) |
| 717 | 717 |
| 718 def test_schedule_request_id_and_pool(self): | 718 def test_schedule_request_id_and_pool(self): |
| 719 self.mock_dim_acls({u'pool:good': u'mocked'}) | 719 self.mock_dim_acls({u'pool:good': u'mocked'}) |
| 720 self.mock_dim_acls({u'pool:bad': u'unknown'}) | 720 self.mock_dim_acls({u'pool:bad': u'unknown'}) |
| 721 | 721 |
| 722 def mocked_is_group_member(group, ident): | 722 def mocked_is_group_member(group, ident): |
| 723 if group == 'mocked' and ident == auth_testing.DEFAULT_MOCKED_IDENTITY: | 723 if group == 'mocked' and ident == auth_testing.DEFAULT_MOCKED_IDENTITY: |
| 724 return True | 724 return True |
| 725 return False | 725 return False |
| 726 self.mock(auth, 'is_group_member', mocked_is_group_member) | 726 self.mock(auth, 'is_group_member', mocked_is_group_member) |
| 727 | 727 |
| 728 self._quick_schedule({u'id': u'abc', u'pool': u'unknown'}, nb_task=0) | 728 self._quick_schedule( |
| 729 self._quick_schedule({u'id': u'abc', u'pool': u'good'}, nb_task=0) | 729 properties={'dimensions': {u'id': u'abc', u'pool': u'unknown'}}, |
| 730 nb_task=0) |
| 731 self._quick_schedule( |
| 732 properties={'dimensions': {u'id': u'abc', u'pool': u'good'}}, nb_task=0) |
| 730 with self.assertRaises(auth.AuthorizationError): | 733 with self.assertRaises(auth.AuthorizationError): |
| 731 self._quick_schedule({u'id': u'abc', u'pool': u'bad'}) | 734 self._quick_schedule( |
| 735 properties={'dimensions': {u'id': u'abc', u'pool': u'bad'}}) |
| 732 | 736 |
| 733 def test_bot_update_task(self): | 737 def test_bot_update_task(self): |
| 734 run_result = self._quick_reap(nb_task=0) | 738 run_result = self._quick_reap(nb_task=0) |
| 735 self.assertEqual( | 739 self.assertEqual( |
| 736 task_result.State.RUNNING, | 740 task_result.State.RUNNING, |
| 737 task_scheduler.bot_update_task( | 741 task_scheduler.bot_update_task( |
| 738 run_result_key=run_result.key, | 742 run_result_key=run_result.key, |
| 739 bot_id='localhost', | 743 bot_id='localhost', |
| 740 cipd_pins=None, | 744 cipd_pins=None, |
| 741 output='hi', | 745 output='hi', |
| (...skipping 396 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1138 properties_hash=request.properties_hash.encode('hex'), | 1142 properties_hash=request.properties_hash.encode('hex'), |
| 1139 started_ts=now_2, | 1143 started_ts=now_2, |
| 1140 state=task_result.State.COMPLETED, | 1144 state=task_result.State.COMPLETED, |
| 1141 try_number=2) | 1145 try_number=2) |
| 1142 self.assertEqual(expected, run_result.result_summary_key.get().to_dict()) | 1146 self.assertEqual(expected, run_result.result_summary_key.get().to_dict()) |
| 1143 self.assertEqual(0.1, run_result.key.get().cost_usd) | 1147 self.assertEqual(0.1, run_result.key.get().cost_usd) |
| 1144 | 1148 |
| 1145 self.assertEqual(0, self.execute_tasks()) | 1149 self.assertEqual(0, self.execute_tasks()) |
| 1146 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED | 1150 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED |
| 1147 | 1151 |
| 1152 def test_cron_handle_bot_died_no_update_not_idempotent(self): |
| 1153 # A bot reaped a task but the handler returned HTTP 500, leaving the task in |
| 1154 # a lingering state. |
| 1155 pub_sub_calls = self.mock_pub_sub() |
| 1156 |
| 1157 # Test first retry, then success. |
| 1158 now = utils.utcnow() |
| 1159 request = self._gen_request( |
| 1160 created_ts=now, |
| 1161 expiration_ts=now+datetime.timedelta(seconds=600), |
| 1162 pubsub_topic='projects/abc/topics/def') |
| 1163 task_request.init_new_request(request, True, None) |
| 1164 _result_summary = task_scheduler.schedule_request(request, None) |
| 1165 self.assertEqual(1, self.execute_tasks()) |
| 1166 self.assertEqual(0, len(pub_sub_calls)) |
| 1167 self._register_bot(self.bot_dimensions, nb_task=0) |
| 1168 request, _, run_result = task_scheduler.bot_reap_task( |
| 1169 self.bot_dimensions, 'abc', None) |
| 1170 self.assertEqual( |
| 1171 task_result.State.RUNNING, run_result.result_summary_key.get().state) |
| 1172 self.assertEqual(1, self.execute_tasks()) |
| 1173 self.assertEqual(1, len(pub_sub_calls)) # PENDING -> RUNNING |
| 1174 self.assertEqual(1, run_result.try_number) |
| 1175 self.assertEqual(task_result.State.RUNNING, run_result.state) |
| 1176 now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1) |
| 1177 self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local')) |
| 1178 self.assertEqual(1, self.execute_tasks()) |
| 1179 self.assertEqual(2, len(pub_sub_calls)) # RUNNING -> PENDING |
| 1180 |
| 1181 # Refresh and compare: |
| 1182 expected = self._gen_result_summary_reaped( |
| 1183 costs_usd=[0.], |
| 1184 id='1d69b9f088008910', |
| 1185 modified_ts=now_1, |
| 1186 state=task_result.State.PENDING, |
| 1187 try_number=1) |
| 1188 self.assertEqual(expected, run_result.result_summary_key.get().to_dict()) |
| 1189 expected = self._gen_run_result( |
| 1190 abandoned_ts=now_1, |
| 1191 id='1d69b9f088008911', |
| 1192 internal_failure=True, |
| 1193 modified_ts=now_1, |
| 1194 state=task_result.State.BOT_DIED) |
| 1195 self.assertEqual(expected, run_result.key.get().to_dict()) |
| 1196 |
| 1197 # Task was retried. |
| 1198 now_2 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 2) |
| 1199 bot_dimensions_second = self.bot_dimensions.copy() |
| 1200 bot_dimensions_second[u'id'] = [u'localhost-second'] |
| 1201 self._register_bot(bot_dimensions_second, nb_task=0) |
| 1202 _request, _, run_result = task_scheduler.bot_reap_task( |
| 1203 bot_dimensions_second, 'abc', None) |
| 1204 self.assertEqual(1, self.execute_tasks()) |
| 1205 self.assertEqual(3, len(pub_sub_calls)) # PENDING -> RUNNING |
| 1206 logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()]) |
| 1207 self.assertEqual(2, run_result.try_number) |
| 1208 self.assertEqual( |
| 1209 task_result.State.COMPLETED, |
| 1210 task_scheduler.bot_update_task( |
| 1211 run_result_key=run_result.key, |
| 1212 bot_id='localhost-second', |
| 1213 cipd_pins=None, |
| 1214 output='Foo1', |
| 1215 output_chunk_start=0, |
| 1216 exit_code=0, |
| 1217 duration=0.1, |
| 1218 hard_timeout=False, |
| 1219 io_timeout=False, |
| 1220 cost_usd=0.1, |
| 1221 outputs_ref=None, |
| 1222 performance_stats=None)) |
| 1223 expected = self._gen_result_summary_reaped( |
| 1224 bot_dimensions=bot_dimensions_second, |
| 1225 bot_id=u'localhost-second', |
| 1226 completed_ts=now_2, |
| 1227 costs_usd=[0., 0.1], |
| 1228 duration=0.1, |
| 1229 exit_code=0, |
| 1230 id='1d69b9f088008910', |
| 1231 modified_ts=now_2, |
| 1232 started_ts=now_2, |
| 1233 state=task_result.State.COMPLETED, |
| 1234 try_number=2) |
| 1235 self.assertEqual(expected, run_result.result_summary_key.get().to_dict()) |
| 1236 self.assertEqual(0.1, run_result.key.get().cost_usd) |
| 1237 |
| 1238 self.assertEqual(0, self.execute_tasks()) |
| 1239 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED |
| 1240 |
| 1241 def test_bot_poll_http_500_but_bot_reapears_after_BOT_PING_TOLERANCE(self): |
| 1242 # A bot reaped a task, sleeps for over BOT_PING_TOLERANCE (2 minutes), then |
| 1243 # sends a ping. |
| 1244 # In the meantime the cron job ran, saw the job idle with 0 update for more |
| 1245 # than BOT_PING_TOLERANCE, re-enqueue it. |
| 1246 run_result = self._quick_reap( |
| 1247 expiration_ts=self.now+(3*task_result.BOT_PING_TOLERANCE), |
| 1248 nb_task=0) |
| 1249 to_run_key = task_to_run.request_to_task_to_run_key( |
| 1250 run_result.request_key.get()) |
| 1251 self.assertEqual(None, to_run_key.get().queue_number) |
| 1252 |
| 1253 # See _handle_dead_bot() with special case about non-idempotent task that |
| 1254 # were never updated. |
| 1255 now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1) |
| 1256 #logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()]) |
| 1257 self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local')) |
| 1258 |
| 1259 # Now the task is available. Bot magically wakes up (let's say a laptop that |
| 1260 # went to sleep). The update is denied. |
| 1261 self.assertEqual( |
| 1262 None, |
| 1263 task_scheduler.bot_update_task( |
| 1264 run_result_key=run_result.key, |
| 1265 bot_id='localhost-second', |
| 1266 cipd_pins=None, |
| 1267 output='Foo1', |
| 1268 output_chunk_start=0, |
| 1269 exit_code=0, |
| 1270 duration=0.1, |
| 1271 hard_timeout=False, |
| 1272 io_timeout=False, |
| 1273 cost_usd=0.1, |
| 1274 outputs_ref=None, |
| 1275 performance_stats=None)) |
| 1276 # Confirm it is denied. |
| 1277 run_result = run_result.key.get() |
| 1278 self.assertEqual(State.BOT_DIED, run_result.state) |
| 1279 result_summary = run_result.result_summary_key.get() |
| 1280 self.assertEqual(State.PENDING, result_summary.state) |
| 1281 self.assertTrue(to_run_key.get().queue_number) |
| 1282 |
| 1148 def test_cron_handle_bot_died_same_bot_denied(self): | 1283 def test_cron_handle_bot_died_same_bot_denied(self): |
| 1149 # Test first retry, then success. | 1284 # Test first retry, then success. |
| 1150 now = utils.utcnow() | 1285 now = utils.utcnow() |
| 1151 request = self._gen_request( | 1286 request = self._gen_request( |
| 1152 properties={'idempotent': True}, | 1287 properties={'idempotent': True}, |
| 1153 created_ts=now, | 1288 created_ts=now, |
| 1154 expiration_ts=now+datetime.timedelta(seconds=600)) | 1289 expiration_ts=now+datetime.timedelta(seconds=600)) |
| 1155 task_request.init_new_request(request, True, None) | 1290 task_request.init_new_request(request, True, None) |
| 1156 _result_summary = task_scheduler.schedule_request(request, None) | 1291 _result_summary = task_scheduler.schedule_request(request, None) |
| 1157 self._register_bot(self.bot_dimensions) | 1292 self._register_bot(self.bot_dimensions) |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1248 (['1d69b9f088008911'], 0, 0), | 1383 (['1d69b9f088008911'], 0, 0), |
| 1249 task_scheduler.cron_handle_bot_died('f.local')) | 1384 task_scheduler.cron_handle_bot_died('f.local')) |
| 1250 | 1385 |
| 1251 | 1386 |
| 1252 if __name__ == '__main__': | 1387 if __name__ == '__main__': |
| 1253 if '-v' in sys.argv: | 1388 if '-v' in sys.argv: |
| 1254 unittest.TestCase.maxDiff = None | 1389 unittest.TestCase.maxDiff = None |
| 1255 logging.basicConfig( | 1390 logging.basicConfig( |
| 1256 level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL) | 1391 level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL) |
| 1257 unittest.main() | 1392 unittest.main() |
| OLD | NEW |