Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(684)

Side by Side Diff: appengine/swarming/server/task_scheduler_test.py

Issue 2914803004: Fix non-idempotent task that /poll reaped but returned HTTP 500. (Closed)
Patch Set: Add test case Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/server/task_scheduler.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLD | NEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 The LUCI Authors. All rights reserved. 2 # Copyright 2014 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 import datetime 6 import datetime
7 import logging 7 import logging
8 import os 8 import os
9 import random 9 import random
10 import sys 10 import sys
(...skipping 187 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 'modified_ts': self.now, 198 'modified_ts': self.now,
199 'outputs_ref': None, 199 'outputs_ref': None,
200 'server_versions': [u'v1a'], 200 'server_versions': [u'v1a'],
201 'started_ts': self.now, 201 'started_ts': self.now,
202 'state': State.RUNNING, 202 'state': State.RUNNING,
203 'try_number': 1, 203 'try_number': 1,
204 } 204 }
205 expected.update(**kwargs) 205 expected.update(**kwargs)
206 return expected 206 return expected
207 207
208 def _quick_schedule(self, dims, nb_task=1): 208 def _quick_schedule(self, nb_task=1, **kwargs):
209 """Schedules a task. 209 """Schedules a task.
210 210
211 nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued. 211 nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued.
212 """ 212 """
213 request = self._gen_request(properties={'dimensions': dims}) 213 request = self._gen_request(**kwargs)
214 result_summary = task_scheduler.schedule_request(request, None) 214 result_summary = task_scheduler.schedule_request(request, None)
215 self.assertEqual(nb_task, self.execute_tasks()) 215 self.assertEqual(nb_task, self.execute_tasks())
216 return result_summary 216 return result_summary
217 217
218 def _register_bot(self, bot_dimensions, nb_task=1): 218 def _register_bot(self, bot_dimensions, nb_task=1):
219 """Registers the bot so the task queues know there's a worker that can run 219 """Registers the bot so the task queues know there's a worker that can run
220 the task. 220 the task.
221 """ 221 """
222 bot_management.bot_event( 222 bot_management.bot_event(
223 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost', 223 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost',
224 bot_dimensions, {'state': 'real'}, '1234', False, None, None) 224 bot_dimensions, {'state': 'real'}, '1234', False, None, None)
225 task_queues.assert_bot(bot_dimensions) 225 task_queues.assert_bot(bot_dimensions)
226 self.assertEqual(nb_task, self.execute_tasks()) 226 self.assertEqual(nb_task, self.execute_tasks())
227 227
228 def _quick_reap(self, nb_task=1): 228 def _quick_reap(self, nb_task=1, **kwargs):
229 """Reaps a task.""" 229 """Reaps a task."""
230 self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'}) 230 self._quick_schedule(**kwargs)
231 self._register_bot(self.bot_dimensions, nb_task=nb_task) 231 self._register_bot(self.bot_dimensions, nb_task=nb_task)
232 reaped_request, _, run_result = task_scheduler.bot_reap_task( 232 reaped_request, _, run_result = task_scheduler.bot_reap_task(
233 self.bot_dimensions, 'abc', None) 233 self.bot_dimensions, 'abc', None)
234 return run_result 234 return run_result
235 235
236 def test_all_apis_are_tested(self): 236 def test_all_apis_are_tested(self):
237 # Ensures there's a test for each public API. 237 # Ensures there's a test for each public API.
238 # TODO(maruel): Remove this once coverage is asserted. 238 # TODO(maruel): Remove this once coverage is asserted.
239 module = task_scheduler 239 module = task_scheduler
240 expected = set( 240 expected = set(
(...skipping 437 matching lines...) Expand 10 before | Expand all | Expand 10 after
678 exit_code=1, 678 exit_code=1,
679 failure=True, 679 failure=True,
680 id='1d69b9f088008911', 680 id='1d69b9f088008911',
681 started_ts=self.now, 681 started_ts=self.now,
682 state=State.COMPLETED), 682 state=State.COMPLETED),
683 ] 683 ]
684 self.assertEqual(expected, [t.to_dict() for t in run_results]) 684 self.assertEqual(expected, [t.to_dict() for t in run_results])
685 685
686 def test_schedule_request(self): 686 def test_schedule_request(self):
687 # It is tested indirectly in the other functions. 687 # It is tested indirectly in the other functions.
688 self.assertTrue( 688 self.assertTrue(self._quick_schedule())
689 self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'}))
690 689
691 def mock_dim_acls(self, mapping): 690 def mock_dim_acls(self, mapping):
692 self.mock(config, 'settings', lambda: config_pb2.SettingsCfg( 691 self.mock(config, 'settings', lambda: config_pb2.SettingsCfg(
693 dimension_acls=config_pb2.DimensionACLs(entry=[ 692 dimension_acls=config_pb2.DimensionACLs(entry=[
694 config_pb2.DimensionACLs.Entry(dimension=[d], usable_by=g) 693 config_pb2.DimensionACLs.Entry(dimension=[d], usable_by=g)
695 for d, g in sorted(mapping.iteritems()) 694 for d, g in sorted(mapping.iteritems())
696 ]), 695 ]),
697 )) 696 ))
698 697
699 def test_schedule_request_forbidden_dim(self): 698 def test_schedule_request_forbidden_dim(self):
700 self.mock_dim_acls({u'pool:bad': u'noone'}) 699 self.mock_dim_acls({u'pool:bad': u'noone'})
701 self._quick_schedule({u'pool': u'good'}) 700 self._quick_schedule(properties={'dimensions': {u'pool': u'good'}})
702 with self.assertRaises(auth.AuthorizationError): 701 with self.assertRaises(auth.AuthorizationError):
703 self._quick_schedule({u'pool': u'bad'}) 702 self._quick_schedule(properties={'dimensions': {u'pool': u'bad'}})
704 703
705 def test_schedule_request_forbidden_dim_via_star(self): 704 def test_schedule_request_forbidden_dim_via_star(self):
706 self.mock_dim_acls({u'abc:*': u'noone'}) 705 self.mock_dim_acls({u'abc:*': u'noone'})
707 self._quick_schedule({u'pool': u'default'}) 706 self._quick_schedule(properties={'dimensions': {u'pool': u'default'}})
708 with self.assertRaises(auth.AuthorizationError): 707 with self.assertRaises(auth.AuthorizationError):
709 self._quick_schedule({u'pool': u'default', u'abc': u'blah'}) 708 self._quick_schedule(
709 properties={'dimensions': {u'pool': u'default', u'abc': u'blah'}})
710 710
711 def test_schedule_request_id_without_pool(self): 711 def test_schedule_request_id_without_pool(self):
712 self.mock_dim_acls({u'pool:good': u'mocked'}) 712 self.mock_dim_acls({u'pool:good': u'mocked'})
713 with self.assertRaises(auth.AuthorizationError): 713 with self.assertRaises(auth.AuthorizationError):
714 self._quick_schedule({u'id': u'abc'}) 714 self._quick_schedule(properties={'dimensions': {u'id': u'abc'}})
715 auth_testing.mock_is_admin(self) 715 auth_testing.mock_is_admin(self)
716 self._quick_schedule({u'id': u'abc'}, nb_task=0) 716 self._quick_schedule(properties={'dimensions': {u'id': u'abc'}}, nb_task=0)
717 717
718 def test_schedule_request_id_and_pool(self): 718 def test_schedule_request_id_and_pool(self):
719 self.mock_dim_acls({u'pool:good': u'mocked'}) 719 self.mock_dim_acls({u'pool:good': u'mocked'})
720 self.mock_dim_acls({u'pool:bad': u'unknown'}) 720 self.mock_dim_acls({u'pool:bad': u'unknown'})
721 721
722 def mocked_is_group_member(group, ident): 722 def mocked_is_group_member(group, ident):
723 if group == 'mocked' and ident == auth_testing.DEFAULT_MOCKED_IDENTITY: 723 if group == 'mocked' and ident == auth_testing.DEFAULT_MOCKED_IDENTITY:
724 return True 724 return True
725 return False 725 return False
726 self.mock(auth, 'is_group_member', mocked_is_group_member) 726 self.mock(auth, 'is_group_member', mocked_is_group_member)
727 727
728 self._quick_schedule({u'id': u'abc', u'pool': u'unknown'}, nb_task=0) 728 self._quick_schedule(
729 self._quick_schedule({u'id': u'abc', u'pool': u'good'}, nb_task=0) 729 properties={'dimensions': {u'id': u'abc', u'pool': u'unknown'}},
730 nb_task=0)
731 self._quick_schedule(
732 properties={'dimensions': {u'id': u'abc', u'pool': u'good'}}, nb_task=0)
730 with self.assertRaises(auth.AuthorizationError): 733 with self.assertRaises(auth.AuthorizationError):
731 self._quick_schedule({u'id': u'abc', u'pool': u'bad'}) 734 self._quick_schedule(
735 properties={'dimensions': {u'id': u'abc', u'pool': u'bad'}})
732 736
733 def test_bot_update_task(self): 737 def test_bot_update_task(self):
734 run_result = self._quick_reap(nb_task=0) 738 run_result = self._quick_reap(nb_task=0)
735 self.assertEqual( 739 self.assertEqual(
736 task_result.State.RUNNING, 740 task_result.State.RUNNING,
737 task_scheduler.bot_update_task( 741 task_scheduler.bot_update_task(
738 run_result_key=run_result.key, 742 run_result_key=run_result.key,
739 bot_id='localhost', 743 bot_id='localhost',
740 cipd_pins=None, 744 cipd_pins=None,
741 output='hi', 745 output='hi',
(...skipping 396 matching lines...) Expand 10 before | Expand all | Expand 10 after
1138 properties_hash=request.properties_hash.encode('hex'), 1142 properties_hash=request.properties_hash.encode('hex'),
1139 started_ts=now_2, 1143 started_ts=now_2,
1140 state=task_result.State.COMPLETED, 1144 state=task_result.State.COMPLETED,
1141 try_number=2) 1145 try_number=2)
1142 self.assertEqual(expected, run_result.result_summary_key.get().to_dict()) 1146 self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
1143 self.assertEqual(0.1, run_result.key.get().cost_usd) 1147 self.assertEqual(0.1, run_result.key.get().cost_usd)
1144 1148
1145 self.assertEqual(0, self.execute_tasks()) 1149 self.assertEqual(0, self.execute_tasks())
1146 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED 1150 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
1147 1151
1152 def test_cron_handle_bot_died_no_update_not_idempotent(self):
1153 # A bot reaped a task but the handler returned HTTP 500, leaving the task in
1154 # a lingering state.
1155 pub_sub_calls = self.mock_pub_sub()
1156
1157 # Test first retry, then success.
1158 now = utils.utcnow()
1159 request = self._gen_request(
1160 created_ts=now,
1161 expiration_ts=now+datetime.timedelta(seconds=600),
1162 pubsub_topic='projects/abc/topics/def')
1163 task_request.init_new_request(request, True, None)
1164 _result_summary = task_scheduler.schedule_request(request, None)
1165 self.assertEqual(1, self.execute_tasks())
1166 self.assertEqual(0, len(pub_sub_calls))
1167 self._register_bot(self.bot_dimensions, nb_task=0)
1168 request, _, run_result = task_scheduler.bot_reap_task(
1169 self.bot_dimensions, 'abc', None)
1170 self.assertEqual(
1171 task_result.State.RUNNING, run_result.result_summary_key.get().state)
1172 self.assertEqual(1, self.execute_tasks())
1173 self.assertEqual(1, len(pub_sub_calls)) # PENDING -> RUNNING
1174 self.assertEqual(1, run_result.try_number)
1175 self.assertEqual(task_result.State.RUNNING, run_result.state)
1176 now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
1177 self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
1178 self.assertEqual(1, self.execute_tasks())
1179 self.assertEqual(2, len(pub_sub_calls)) # RUNNING -> PENDING
1180
1181 # Refresh and compare:
1182 expected = self._gen_result_summary_reaped(
1183 costs_usd=[0.],
1184 id='1d69b9f088008910',
1185 modified_ts=now_1,
1186 state=task_result.State.PENDING,
1187 try_number=1)
1188 self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
1189 expected = self._gen_run_result(
1190 abandoned_ts=now_1,
1191 id='1d69b9f088008911',
1192 internal_failure=True,
1193 modified_ts=now_1,
1194 state=task_result.State.BOT_DIED)
1195 self.assertEqual(expected, run_result.key.get().to_dict())
1196
1197 # Task was retried.
1198 now_2 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 2)
1199 bot_dimensions_second = self.bot_dimensions.copy()
1200 bot_dimensions_second[u'id'] = [u'localhost-second']
1201 self._register_bot(bot_dimensions_second, nb_task=0)
1202 _request, _, run_result = task_scheduler.bot_reap_task(
1203 bot_dimensions_second, 'abc', None)
1204 self.assertEqual(1, self.execute_tasks())
1205 self.assertEqual(3, len(pub_sub_calls)) # PENDING -> RUNNING
1206 logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
1207 self.assertEqual(2, run_result.try_number)
1208 self.assertEqual(
1209 task_result.State.COMPLETED,
1210 task_scheduler.bot_update_task(
1211 run_result_key=run_result.key,
1212 bot_id='localhost-second',
1213 cipd_pins=None,
1214 output='Foo1',
1215 output_chunk_start=0,
1216 exit_code=0,
1217 duration=0.1,
1218 hard_timeout=False,
1219 io_timeout=False,
1220 cost_usd=0.1,
1221 outputs_ref=None,
1222 performance_stats=None))
1223 expected = self._gen_result_summary_reaped(
1224 bot_dimensions=bot_dimensions_second,
1225 bot_id=u'localhost-second',
1226 completed_ts=now_2,
1227 costs_usd=[0., 0.1],
1228 duration=0.1,
1229 exit_code=0,
1230 id='1d69b9f088008910',
1231 modified_ts=now_2,
1232 started_ts=now_2,
1233 state=task_result.State.COMPLETED,
1234 try_number=2)
1235 self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
1236 self.assertEqual(0.1, run_result.key.get().cost_usd)
1237
1238 self.assertEqual(0, self.execute_tasks())
1239 self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
1240
1241 def test_bot_poll_http_500_but_bot_reapears_after_BOT_PING_TOLERANCE(self):
1242 # A bot reaps a task, sleeps for over BOT_PING_TOLERANCE (2 minutes), then
1243 # sends a ping.
1244 # In the meantime the cron job ran, saw the job idle with no update for more
1245 # than BOT_PING_TOLERANCE, and re-enqueued it.
1246 run_result = self._quick_reap(
1247 expiration_ts=self.now+(3*task_result.BOT_PING_TOLERANCE),
1248 nb_task=0)
1249 to_run_key = task_to_run.request_to_task_to_run_key(
1250 run_result.request_key.get())
1251 self.assertEqual(None, to_run_key.get().queue_number)
1252
1253 # See _handle_dead_bot() with special case about non-idempotent tasks that
1254 # were never updated.
1255 now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
1256 #logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
1257 self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
1258
1259 # Now the task is available. Bot magically wakes up (let's say a laptop that
1260 # went to sleep). The update is denied.
1261 self.assertEqual(
1262 None,
1263 task_scheduler.bot_update_task(
1264 run_result_key=run_result.key,
1265 bot_id='localhost-second',
1266 cipd_pins=None,
1267 output='Foo1',
1268 output_chunk_start=0,
1269 exit_code=0,
1270 duration=0.1,
1271 hard_timeout=False,
1272 io_timeout=False,
1273 cost_usd=0.1,
1274 outputs_ref=None,
1275 performance_stats=None))
1276 # Confirm it is denied.
1277 run_result = run_result.key.get()
1278 self.assertEqual(State.BOT_DIED, run_result.state)
1279 result_summary = run_result.result_summary_key.get()
1280 self.assertEqual(State.PENDING, result_summary.state)
1281 self.assertTrue(to_run_key.get().queue_number)
1282
1148 def test_cron_handle_bot_died_same_bot_denied(self): 1283 def test_cron_handle_bot_died_same_bot_denied(self):
1149 # Test first retry, then success. 1284 # Test first retry, then success.
1150 now = utils.utcnow() 1285 now = utils.utcnow()
1151 request = self._gen_request( 1286 request = self._gen_request(
1152 properties={'idempotent': True}, 1287 properties={'idempotent': True},
1153 created_ts=now, 1288 created_ts=now,
1154 expiration_ts=now+datetime.timedelta(seconds=600)) 1289 expiration_ts=now+datetime.timedelta(seconds=600))
1155 task_request.init_new_request(request, True, None) 1290 task_request.init_new_request(request, True, None)
1156 _result_summary = task_scheduler.schedule_request(request, None) 1291 _result_summary = task_scheduler.schedule_request(request, None)
1157 self._register_bot(self.bot_dimensions) 1292 self._register_bot(self.bot_dimensions)
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
1248 (['1d69b9f088008911'], 0, 0), 1383 (['1d69b9f088008911'], 0, 0),
1249 task_scheduler.cron_handle_bot_died('f.local')) 1384 task_scheduler.cron_handle_bot_died('f.local'))
1250 1385
1251 1386
1252 if __name__ == '__main__': 1387 if __name__ == '__main__':
1253 if '-v' in sys.argv: 1388 if '-v' in sys.argv:
1254 unittest.TestCase.maxDiff = None 1389 unittest.TestCase.maxDiff = None
1255 logging.basicConfig( 1390 logging.basicConfig(
1256 level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL) 1391 level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL)
1257 unittest.main() 1392 unittest.main()
OLD | NEW
« no previous file with comments | « appengine/swarming/server/task_scheduler.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698