| Index: appengine/swarming/server/task_scheduler_test.py
|
| diff --git a/appengine/swarming/server/task_scheduler_test.py b/appengine/swarming/server/task_scheduler_test.py
|
| index c9884714a5e4d4ccc2901f2696017db5a3ccb449..93e8e804458142b4746efde8be21e13d0158bba3 100755
|
| --- a/appengine/swarming/server/task_scheduler_test.py
|
| +++ b/appengine/swarming/server/task_scheduler_test.py
|
| @@ -205,12 +205,12 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
| expected.update(**kwargs)
|
| return expected
|
|
|
| - def _quick_schedule(self, dims, nb_task=1):
|
| + def _quick_schedule(self, nb_task=1, **kwargs):
|
| """Schedules a task.
|
|
|
| nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued.
|
| """
|
| - request = self._gen_request(properties={'dimensions': dims})
|
| + request = self._gen_request(**kwargs)
|
| result_summary = task_scheduler.schedule_request(request, None)
|
| self.assertEqual(nb_task, self.execute_tasks())
|
| return result_summary
|
| @@ -225,9 +225,9 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
| task_queues.assert_bot(bot_dimensions)
|
| self.assertEqual(nb_task, self.execute_tasks())
|
|
|
| - def _quick_reap(self, nb_task=1):
|
| + def _quick_reap(self, nb_task=1, **kwargs):
|
| """Reaps a task."""
|
| - self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'})
|
| + self._quick_schedule(**kwargs)
|
| self._register_bot(self.bot_dimensions, nb_task=nb_task)
|
| reaped_request, _, run_result = task_scheduler.bot_reap_task(
|
| self.bot_dimensions, 'abc', None)
|
| @@ -685,8 +685,7 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
|
|
| def test_schedule_request(self):
|
| # It is tested indirectly in the other functions.
|
| - self.assertTrue(
|
| - self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'}))
|
| + self.assertTrue(self._quick_schedule())
|
|
|
| def mock_dim_acls(self, mapping):
|
| self.mock(config, 'settings', lambda: config_pb2.SettingsCfg(
|
| @@ -698,22 +697,23 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
|
|
| def test_schedule_request_forbidden_dim(self):
|
| self.mock_dim_acls({u'pool:bad': u'noone'})
|
| - self._quick_schedule({u'pool': u'good'})
|
| + self._quick_schedule(properties={'dimensions': {u'pool': u'good'}})
|
| with self.assertRaises(auth.AuthorizationError):
|
| - self._quick_schedule({u'pool': u'bad'})
|
| + self._quick_schedule(properties={'dimensions': {u'pool': u'bad'}})
|
|
|
| def test_schedule_request_forbidden_dim_via_star(self):
|
| self.mock_dim_acls({u'abc:*': u'noone'})
|
| - self._quick_schedule({u'pool': u'default'})
|
| + self._quick_schedule(properties={'dimensions': {u'pool': u'default'}})
|
| with self.assertRaises(auth.AuthorizationError):
|
| - self._quick_schedule({u'pool': u'default', u'abc': u'blah'})
|
| + self._quick_schedule(
|
| + properties={'dimensions': {u'pool': u'default', u'abc': u'blah'}})
|
|
|
| def test_schedule_request_id_without_pool(self):
|
| self.mock_dim_acls({u'pool:good': u'mocked'})
|
| with self.assertRaises(auth.AuthorizationError):
|
| - self._quick_schedule({u'id': u'abc'})
|
| + self._quick_schedule(properties={'dimensions': {u'id': u'abc'}})
|
| auth_testing.mock_is_admin(self)
|
| - self._quick_schedule({u'id': u'abc'}, nb_task=0)
|
| + self._quick_schedule(properties={'dimensions': {u'id': u'abc'}}, nb_task=0)
|
|
|
| def test_schedule_request_id_and_pool(self):
|
| self.mock_dim_acls({u'pool:good': u'mocked'})
|
| @@ -725,10 +725,14 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
| return False
|
| self.mock(auth, 'is_group_member', mocked_is_group_member)
|
|
|
| - self._quick_schedule({u'id': u'abc', u'pool': u'unknown'}, nb_task=0)
|
| - self._quick_schedule({u'id': u'abc', u'pool': u'good'}, nb_task=0)
|
| + self._quick_schedule(
|
| + properties={'dimensions': {u'id': u'abc', u'pool': u'unknown'}},
|
| + nb_task=0)
|
| + self._quick_schedule(
|
| + properties={'dimensions': {u'id': u'abc', u'pool': u'good'}}, nb_task=0)
|
| with self.assertRaises(auth.AuthorizationError):
|
| - self._quick_schedule({u'id': u'abc', u'pool': u'bad'})
|
| + self._quick_schedule(
|
| + properties={'dimensions': {u'id': u'abc', u'pool': u'bad'}})
|
|
|
| def test_bot_update_task(self):
|
| run_result = self._quick_reap(nb_task=0)
|
| @@ -1145,6 +1149,137 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
|
| self.assertEqual(0, self.execute_tasks())
|
| self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
|
|
|
| + def test_cron_handle_bot_died_no_update_not_idempotent(self):
|
| + # A bot reaped a task but the handler returned HTTP 500, leaving the task in
|
| + # a lingering state.
|
| + pub_sub_calls = self.mock_pub_sub()
|
| +
|
| + # Test first retry, then success.
|
| + now = utils.utcnow()
|
| + request = self._gen_request(
|
| + created_ts=now,
|
| + expiration_ts=now+datetime.timedelta(seconds=600),
|
| + pubsub_topic='projects/abc/topics/def')
|
| + task_request.init_new_request(request, True, None)
|
| + _result_summary = task_scheduler.schedule_request(request, None)
|
| + self.assertEqual(1, self.execute_tasks())
|
| + self.assertEqual(0, len(pub_sub_calls))
|
| + self._register_bot(self.bot_dimensions, nb_task=0)
|
| + request, _, run_result = task_scheduler.bot_reap_task(
|
| + self.bot_dimensions, 'abc', None)
|
| + self.assertEqual(
|
| + task_result.State.RUNNING, run_result.result_summary_key.get().state)
|
| + self.assertEqual(1, self.execute_tasks())
|
| + self.assertEqual(1, len(pub_sub_calls)) # PENDING -> RUNNING
|
| + self.assertEqual(1, run_result.try_number)
|
| + self.assertEqual(task_result.State.RUNNING, run_result.state)
|
| + now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
|
| + self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
|
| + self.assertEqual(1, self.execute_tasks())
|
| + self.assertEqual(2, len(pub_sub_calls)) # RUNNING -> PENDING
|
| +
|
| + # Refresh and compare:
|
| + expected = self._gen_result_summary_reaped(
|
| + costs_usd=[0.],
|
| + id='1d69b9f088008910',
|
| + modified_ts=now_1,
|
| + state=task_result.State.PENDING,
|
| + try_number=1)
|
| + self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
|
| + expected = self._gen_run_result(
|
| + abandoned_ts=now_1,
|
| + id='1d69b9f088008911',
|
| + internal_failure=True,
|
| + modified_ts=now_1,
|
| + state=task_result.State.BOT_DIED)
|
| + self.assertEqual(expected, run_result.key.get().to_dict())
|
| +
|
| + # Task was retried.
|
| + now_2 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 2)
|
| + bot_dimensions_second = self.bot_dimensions.copy()
|
| + bot_dimensions_second[u'id'] = [u'localhost-second']
|
| + self._register_bot(bot_dimensions_second, nb_task=0)
|
| + _request, _, run_result = task_scheduler.bot_reap_task(
|
| + bot_dimensions_second, 'abc', None)
|
| + self.assertEqual(1, self.execute_tasks())
|
| + self.assertEqual(3, len(pub_sub_calls)) # PENDING -> RUNNING
|
| + logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
|
| + self.assertEqual(2, run_result.try_number)
|
| + self.assertEqual(
|
| + task_result.State.COMPLETED,
|
| + task_scheduler.bot_update_task(
|
| + run_result_key=run_result.key,
|
| + bot_id='localhost-second',
|
| + cipd_pins=None,
|
| + output='Foo1',
|
| + output_chunk_start=0,
|
| + exit_code=0,
|
| + duration=0.1,
|
| + hard_timeout=False,
|
| + io_timeout=False,
|
| + cost_usd=0.1,
|
| + outputs_ref=None,
|
| + performance_stats=None))
|
| + expected = self._gen_result_summary_reaped(
|
| + bot_dimensions=bot_dimensions_second,
|
| + bot_id=u'localhost-second',
|
| + completed_ts=now_2,
|
| + costs_usd=[0., 0.1],
|
| + duration=0.1,
|
| + exit_code=0,
|
| + id='1d69b9f088008910',
|
| + modified_ts=now_2,
|
| + started_ts=now_2,
|
| + state=task_result.State.COMPLETED,
|
| + try_number=2)
|
| + self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
|
| + self.assertEqual(0.1, run_result.key.get().cost_usd)
|
| +
|
| + self.assertEqual(0, self.execute_tasks())
|
| + self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
|
| +
|
| + def test_bot_poll_http_500_but_bot_reapears_after_BOT_PING_TOLERANCE(self):
|
| + # A bot reaped a task, sleeps for over BOT_PING_TOLERANCE (2 minutes), then
|
| + # sends a ping.
|
| + # In the meantime the cron job ran, saw the job idle with 0 update for more
|
| + # than BOT_PING_TOLERANCE, re-enqueue it.
|
| + run_result = self._quick_reap(
|
| + expiration_ts=self.now+(3*task_result.BOT_PING_TOLERANCE),
|
| + nb_task=0)
|
| + to_run_key = task_to_run.request_to_task_to_run_key(
|
| + run_result.request_key.get())
|
| + self.assertEqual(None, to_run_key.get().queue_number)
|
| +
|
| + # See _handle_dead_bot() with special case about non-idempotent task that
|
| + # were never updated.
|
| + now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
|
| + #logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
|
| + self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
|
| +
|
| + # Now the task is available. Bot magically wakes up (let's say a laptop that
|
| + # went to sleep). The update is denied.
|
| + self.assertEqual(
|
| + None,
|
| + task_scheduler.bot_update_task(
|
| + run_result_key=run_result.key,
|
| + bot_id='localhost-second',
|
| + cipd_pins=None,
|
| + output='Foo1',
|
| + output_chunk_start=0,
|
| + exit_code=0,
|
| + duration=0.1,
|
| + hard_timeout=False,
|
| + io_timeout=False,
|
| + cost_usd=0.1,
|
| + outputs_ref=None,
|
| + performance_stats=None))
|
| + # Confirm it is denied.
|
| + run_result = run_result.key.get()
|
| + self.assertEqual(State.BOT_DIED, run_result.state)
|
| + result_summary = run_result.result_summary_key.get()
|
| + self.assertEqual(State.PENDING, result_summary.state)
|
| + self.assertTrue(to_run_key.get().queue_number)
|
| +
|
| def test_cron_handle_bot_died_same_bot_denied(self):
|
| # Test first retry, then success.
|
| now = utils.utcnow()
|
|
|