Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(207)

Side by Side Diff: appengine/swarming/server/task_to_run_test.py

Issue 2832203002: task_queues: Add more scaffolding (Closed)
Patch Set: Addressed comments Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/server/task_queues.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 The LUCI Authors. All rights reserved. 2 # Copyright 2014 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 import datetime 6 import datetime
7 import hashlib 7 import hashlib
8 import logging 8 import logging
9 import os 9 import os
10 import random 10 import random
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
48 'priority': 50, 48 'priority': 50,
49 'properties': task_request.TaskProperties(**props), 49 'properties': task_request.TaskProperties(**props),
50 'expiration_ts': now + datetime.timedelta(seconds=60), 50 'expiration_ts': now + datetime.timedelta(seconds=60),
51 'tags': [u'tag:1'], 51 'tags': [u'tag:1'],
52 'user': 'Jesus', 52 'user': 'Jesus',
53 } 53 }
54 args.update(kwargs) 54 args.update(kwargs)
55 return task_request.TaskRequest(**args) 55 return task_request.TaskRequest(**args)
56 56
57 57
58 def mkreq(req):
59 # This function fits the old style where TaskRequest was stored first, before
60 # TaskToRun and TaskResultSummary.
61 task_request.init_new_request(req, True, None)
62 req.key = task_request.new_request_key()
63 req.put()
64 return req
65
66
67 def _task_to_run_to_dict(i): 58 def _task_to_run_to_dict(i):
68 """Converts the queue_number to hex for easier testing.""" 59 """Converts the queue_number to hex for easier testing."""
69 out = i.to_dict() 60 out = i.to_dict()
70 # Consistent formatting makes it easier to reason about. 61 # Consistent formatting makes it easier to reason about.
71 out['queue_number'] = '0x%016x' % out['queue_number'] 62 out['queue_number'] = '0x%016x' % out['queue_number']
72 return out 63 return out
73 64
74 65
75 def _yield_next_available_task_to_dispatch(bot_dimensions, deadline): 66 def _yield_next_available_task_to_dispatch(bot_dimensions, deadline):
76 bot_management.bot_event( 67 bot_management.bot_event(
77 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost', 68 'bot_connected', bot_dimensions[u'id'][0], '1.2.3.4', 'joe@localhost',
78 bot_dimensions, {'state': 'real'}, '1234', False, None, None) 69 bot_dimensions, {'state': 'real'}, '1234', False, None, None)
79 task_queues.assert_bot(bot_dimensions) 70 task_queues.assert_bot(bot_dimensions)
80 return [ 71 return [
81 _task_to_run_to_dict(to_run) 72 _task_to_run_to_dict(to_run)
82 for _request, to_run in 73 for _request, to_run in
83 task_to_run.yield_next_available_task_to_dispatch( 74 task_to_run.yield_next_available_task_to_dispatch(
84 bot_dimensions, deadline) 75 bot_dimensions, deadline)
85 ] 76 ]
86 77
87 78
88 def _gen_new_task_to_run(**kwargs):
89 """Returns a TaskToRun saved in the DB."""
90 request = mkreq(_gen_request(**kwargs))
91 to_run = task_to_run.new_task_to_run(request)
92 to_run.put()
93 task_queues.assert_task(request)
94 return to_run
95
96
97 def _hash_dimensions(dimensions): 79 def _hash_dimensions(dimensions):
98 return task_to_run._hash_dimensions(utils.encode_to_json(dimensions)) 80 return task_to_run._hash_dimensions(utils.encode_to_json(dimensions))
99 81
100 82
101 class TestCase(test_case.TestCase): 83 class TestCase(test_case.TestCase):
102 def setUp(self): 84 def setUp(self):
103 super(TestCase, self).setUp() 85 super(TestCase, self).setUp()
104 auth_testing.mock_get_current_identity(self) 86 auth_testing.mock_get_current_identity(self)
105 87
106 88
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
267 249
268 250
269 class TaskToRunApiTest(TestCase): 251 class TaskToRunApiTest(TestCase):
270 def setUp(self): 252 def setUp(self):
271 super(TaskToRunApiTest, self).setUp() 253 super(TaskToRunApiTest, self).setUp()
272 self.now = datetime.datetime(2014, 01, 02, 03, 04, 05, 06) 254 self.now = datetime.datetime(2014, 01, 02, 03, 04, 05, 06)
273 self.mock_now(self.now) 255 self.mock_now(self.now)
274 # The default expiration_secs for _gen_request(). 256 # The default expiration_secs for _gen_request().
275 self.expiration_ts = self.now + datetime.timedelta(seconds=60) 257 self.expiration_ts = self.now + datetime.timedelta(seconds=60)
276 258
259 def mkreq(self, req, nb_tasks=0):
260 """Stores a new initialized TaskRequest.
261
262 nb_task is 1 or 0. It is 1 when the request.properties.dimensions was new
263 (unseen before) and 0 otherwise.
264 """
265 task_request.init_new_request(req, True, None)
266 task_queues.assert_task(req)
267 self.assertEqual(nb_tasks, self.execute_tasks())
268 req.key = task_request.new_request_key()
269 req.put()
270 return req
271
272 def _gen_new_task_to_run(self, **kwargs):
273 """Returns a TaskToRun saved in the DB."""
274 request = self.mkreq(_gen_request(**kwargs))
275 to_run = task_to_run.new_task_to_run(request)
276 to_run.put()
277 return to_run
278
277 def test_all_apis_are_tested(self): 279 def test_all_apis_are_tested(self):
278 actual = frozenset(i[5:] for i in dir(self) if i.startswith('test_')) 280 actual = frozenset(i[5:] for i in dir(self) if i.startswith('test_'))
279 # Contains the list of all public APIs. 281 # Contains the list of all public APIs.
280 expected = frozenset( 282 expected = frozenset(
281 i for i in dir(task_to_run) 283 i for i in dir(task_to_run)
282 if i[0] != '_' and hasattr(getattr(task_to_run, i), 'func_name')) 284 if i[0] != '_' and hasattr(getattr(task_to_run, i), 'func_name'))
283 missing = expected - actual 285 missing = expected - actual
284 self.assertFalse(missing) 286 self.assertFalse(missing)
285 287
286 def test_task_to_run_key_to_request_key(self): 288 def test_task_to_run_key_to_request_key(self):
287 request = mkreq(_gen_request()) 289 request = self.mkreq(_gen_request())
288 task_key = task_to_run.request_to_task_to_run_key(request) 290 task_key = task_to_run.request_to_task_to_run_key(request)
289 actual = task_to_run.task_to_run_key_to_request_key(task_key) 291 actual = task_to_run.task_to_run_key_to_request_key(task_key)
290 self.assertEqual(request.key, actual) 292 self.assertEqual(request.key, actual)
291 293
292 def test_request_to_task_to_run_key(self): 294 def test_request_to_task_to_run_key(self):
293 self.mock(random, 'getrandbits', lambda _: 0x88) 295 self.mock(random, 'getrandbits', lambda _: 0x88)
294 request = mkreq(_gen_request()) 296 request = self.mkreq(_gen_request())
295 # Ensures that the hash value is constant for the same input. 297 # Ensures that the hash value is constant for the same input.
296 self.assertEqual( 298 self.assertEqual(
297 ndb.Key('TaskRequest', 0x7e296460f77ff77e, 'TaskToRun', 3420117132), 299 ndb.Key('TaskRequest', 0x7e296460f77ff77e, 'TaskToRun', 3420117132),
298 task_to_run.request_to_task_to_run_key(request)) 300 task_to_run.request_to_task_to_run_key(request))
299 301
300 def test_validate_to_run_key(self): 302 def test_validate_to_run_key(self):
301 request = mkreq(_gen_request()) 303 request = self.mkreq(_gen_request())
302 task_key = task_to_run.request_to_task_to_run_key(request) 304 task_key = task_to_run.request_to_task_to_run_key(request)
303 task_to_run.validate_to_run_key(task_key) 305 task_to_run.validate_to_run_key(task_key)
304 with self.assertRaises(ValueError): 306 with self.assertRaises(ValueError):
305 task_to_run.validate_to_run_key(ndb.Key('TaskRequest', 1, 'TaskToRun', 1)) 307 task_to_run.validate_to_run_key(ndb.Key('TaskRequest', 1, 'TaskToRun', 1))
306 308
307 def test_gen_queue_number(self): 309 def test_gen_queue_number(self):
308 # tuples of (input, expected). 310 # tuples of (input, expected).
309 # 2**47 / 365 / 24 / 60 / 60 / 1000. = 4462.756 311 # 2**47 / 365 / 24 / 60 / 60 / 1000. = 4462.756
310 data = [ 312 data = [
311 (('1970-01-01 00:00:00.000', 0), '0x0000000000000000'), 313 (('1970-01-01 00:00:00.000', 0), '0x0000000000000000'),
(...skipping 24 matching lines...) Expand all
336 data = _gen_request( 338 data = _gen_request(
337 properties={ 339 properties={
338 'command': [u'command1', u'arg1'], 340 'command': [u'command1', u'arg1'],
339 'dimensions': request_dimensions, 341 'dimensions': request_dimensions,
340 'env': {u'foo': u'bar'}, 342 'env': {u'foo': u'bar'},
341 'execution_timeout_secs': 30, 343 'execution_timeout_secs': 30,
342 }, 344 },
343 priority=20, 345 priority=20,
344 created_ts=now, 346 created_ts=now,
345 expiration_ts=now+datetime.timedelta(seconds=31)) 347 expiration_ts=now+datetime.timedelta(seconds=31))
346 task_to_run.new_task_to_run(mkreq(data)).put() 348 task_to_run.new_task_to_run(self.mkreq(data)).put()
347 349
348 # Create a second with higher priority. 350 # Create a second with higher priority.
349 self.mock(random, 'getrandbits', lambda _: 0x23) 351 self.mock(random, 'getrandbits', lambda _: 0x23)
350 data = _gen_request( 352 data = _gen_request(
351 properties={ 353 properties={
352 'command': [u'command1', u'arg1'], 354 'command': [u'command1', u'arg1'],
353 'dimensions': request_dimensions, 355 'dimensions': request_dimensions,
354 'env': {u'foo': u'bar'}, 356 'env': {u'foo': u'bar'},
355 'execution_timeout_secs': 30, 357 'execution_timeout_secs': 30,
356 }, 358 },
357 priority=10, 359 priority=10,
358 created_ts=now, 360 created_ts=now,
359 expiration_ts=now+datetime.timedelta(seconds=31)) 361 expiration_ts=now+datetime.timedelta(seconds=31))
360 task_to_run.new_task_to_run(mkreq(data)).put() 362 task_to_run.new_task_to_run(self.mkreq(data, nb_tasks=0)).put()
361 363
362 expected = [ 364 expected = [
363 { 365 {
364 'dimensions_hash': _hash_dimensions(request_dimensions), 366 'dimensions_hash': _hash_dimensions(request_dimensions),
365 'expiration_ts': self.now + datetime.timedelta(seconds=31), 367 'expiration_ts': self.now + datetime.timedelta(seconds=31),
366 'request_key': '0x7e296460f77ffdce', 368 'request_key': '0x7e296460f77ffdce',
367 # Lower priority value means higher priority. 369 # Lower priority value means higher priority.
368 'queue_number': '0x00060dc5849f1346', 370 'queue_number': '0x00060dc5849f1346',
369 }, 371 },
370 { 372 {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 415
414 data_false = ( 416 data_false = (
415 ({'os': 'amiga'}, {'os': ['Win', 'Win-3.1']}), 417 ({'os': 'amiga'}, {'os': ['Win', 'Win-3.1']}),
416 ) 418 )
417 for request_dimensions, bot_dimensions in data_false: 419 for request_dimensions, bot_dimensions in data_false:
418 self.assertEqual( 420 self.assertEqual(
419 False, 421 False,
420 task_to_run.match_dimensions(request_dimensions, bot_dimensions)) 422 task_to_run.match_dimensions(request_dimensions, bot_dimensions))
421 423
422 def test_yield_next_available_task_to_dispatch_none(self): 424 def test_yield_next_available_task_to_dispatch_none(self):
423 _gen_new_task_to_run( 425 self._gen_new_task_to_run(
424 properties={ 426 properties={
425 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, 427 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'},
426 }) 428 })
427 # Bot declares no dimensions, so it will fail to match. 429 # Bot declares no dimensions, so it will fail to match.
428 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} 430 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']}
429 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 431 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
430 self.assertEqual([], actual) 432 self.assertEqual([], actual)
431 433
432 def test_yield_next_available_task_to_dispatch_none_mismatch(self): 434 def test_yield_next_available_task_to_dispatch_none_mismatch(self):
433 _gen_new_task_to_run( 435 self._gen_new_task_to_run(
434 properties={ 436 properties={
435 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, 437 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'},
436 }) 438 })
437 # Bot declares other dimensions, so it will fail to match. 439 # Bot declares other dimensions, so it will fail to match.
438 bot_dimensions = { 440 bot_dimensions = {
439 u'id': [u'bot1'], 441 u'id': [u'bot1'],
440 u'os': [u'Windows-3.0'], 442 u'os': [u'Windows-3.0'],
441 u'pool': [u'default'], 443 u'pool': [u'default'],
442 } 444 }
443 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 445 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
444 self.assertEqual([], actual) 446 self.assertEqual([], actual)
445 447
446 def test_yield_next_available_task_to_dispatch(self): 448 def test_yield_next_available_task_to_dispatch(self):
447 request_dimensions = { 449 request_dimensions = {
448 u'foo': u'bar', 450 u'foo': u'bar',
449 u'os': u'Windows-3.1.1', 451 u'os': u'Windows-3.1.1',
450 u'pool': u'default', 452 u'pool': u'default',
451 } 453 }
452 _gen_new_task_to_run( 454 self._gen_new_task_to_run(
453 properties=dict(dimensions=request_dimensions)) 455 properties=dict(dimensions=request_dimensions))
454 # Bot declares exactly same dimensions so it matches. 456 # Bot declares exactly same dimensions so it matches.
455 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} 457 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()}
456 bot_dimensions[u'id'] = [u'bot1'] 458 bot_dimensions[u'id'] = [u'bot1']
457 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 459 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
458 expected = [ 460 expected = [
459 { 461 {
460 'dimensions_hash': _hash_dimensions(request_dimensions), 462 'dimensions_hash': _hash_dimensions(request_dimensions),
461 'expiration_ts': self.expiration_ts, 463 'expiration_ts': self.expiration_ts,
462 'queue_number': '0x000a890b67ba1346', 464 'queue_number': '0x000a890b67ba1346',
463 }, 465 },
464 ] 466 ]
465 self.assertEqual(expected, actual) 467 self.assertEqual(expected, actual)
466 468
467 def test_yield_next_available_task_to_dispatch_subset(self): 469 def test_yield_next_available_task_to_dispatch_subset(self):
468 request_dimensions = { 470 request_dimensions = {
469 u'os': u'Windows-3.1.1', 471 u'os': u'Windows-3.1.1',
470 u'pool': u'default', 472 u'pool': u'default',
471 } 473 }
472 _gen_new_task_to_run( 474 self._gen_new_task_to_run(
473 properties=dict(dimensions=request_dimensions)) 475 properties=dict(dimensions=request_dimensions))
474 # Bot declares more dimensions than needed, this is fine and it matches. 476 # Bot declares more dimensions than needed, this is fine and it matches.
475 bot_dimensions = { 477 bot_dimensions = {
476 u'id': [u'localhost'], 478 u'id': [u'localhost'],
477 u'os': [u'Windows-3.1.1'], 479 u'os': [u'Windows-3.1.1'],
478 u'pool': [u'default'], 480 u'pool': [u'default'],
479 } 481 }
480 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 482 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
481 expected = [ 483 expected = [
482 { 484 {
483 'dimensions_hash': _hash_dimensions(request_dimensions), 485 'dimensions_hash': _hash_dimensions(request_dimensions),
484 'expiration_ts': self.expiration_ts, 486 'expiration_ts': self.expiration_ts,
485 'queue_number': '0x000a890b67ba1346', 487 'queue_number': '0x000a890b67ba1346',
486 }, 488 },
487 ] 489 ]
488 self.assertEqual(expected, actual) 490 self.assertEqual(expected, actual)
489 491
490 def test_yield_next_available_task_shard(self): 492 def test_yield_next_available_task_shard(self):
491 request_dimensions = { 493 request_dimensions = {
492 u'os': u'Windows-3.1.1', 494 u'os': u'Windows-3.1.1',
493 u'pool': u'default', 495 u'pool': u'default',
494 } 496 }
495 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions)) 497 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions))
496 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} 498 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()}
497 bot_dimensions[u'id'] = [u'bot1'] 499 bot_dimensions[u'id'] = [u'bot1']
498 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 500 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
499 expected = [ 501 expected = [
500 { 502 {
501 'dimensions_hash': _hash_dimensions(request_dimensions), 503 'dimensions_hash': _hash_dimensions(request_dimensions),
502 'expiration_ts': self.expiration_ts, 504 'expiration_ts': self.expiration_ts,
503 'queue_number': '0x000a890b67ba1346', 505 'queue_number': '0x000a890b67ba1346',
504 }, 506 },
505 ] 507 ]
506 self.assertEqual(expected, actual) 508 self.assertEqual(expected, actual)
507 509
508 def test_yield_next_available_task_to_dispatch_subset_multivalue(self): 510 def test_yield_next_available_task_to_dispatch_subset_multivalue(self):
509 request_dimensions = { 511 request_dimensions = {
510 u'os': u'Windows-3.1.1', 512 u'os': u'Windows-3.1.1',
511 u'pool': u'default', 513 u'pool': u'default',
512 } 514 }
513 _gen_new_task_to_run( 515 self._gen_new_task_to_run(
514 properties=dict(dimensions=request_dimensions)) 516 properties=dict(dimensions=request_dimensions))
515 # Bot declares more dimensions than needed. 517 # Bot declares more dimensions than needed.
516 bot_dimensions = { 518 bot_dimensions = {
517 u'id': [u'localhost'], 519 u'id': [u'localhost'],
518 u'os': [u'Windows', u'Windows-3.1.1'], 520 u'os': [u'Windows', u'Windows-3.1.1'],
519 u'pool': [u'default'], 521 u'pool': [u'default'],
520 } 522 }
521 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 523 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
522 expected = [ 524 expected = [
523 { 525 {
524 'dimensions_hash': _hash_dimensions(request_dimensions), 526 'dimensions_hash': _hash_dimensions(request_dimensions),
525 'expiration_ts': self.expiration_ts, 527 'expiration_ts': self.expiration_ts,
526 'queue_number': '0x000a890b67ba1346', 528 'queue_number': '0x000a890b67ba1346',
527 }, 529 },
528 ] 530 ]
529 self.assertEqual(expected, actual) 531 self.assertEqual(expected, actual)
530 532
531 def test_yield_next_available_task_to_dispatch_multi_normal(self): 533 def test_yield_next_available_task_to_dispatch_multi_normal(self):
532 # Task added one after the other, normal case. 534 # Task added one after the other, normal case.
533 request_dimensions_1 = { 535 request_dimensions_1 = {
534 u'foo': u'bar', 536 u'foo': u'bar',
535 u'os': u'Windows-3.1.1', 537 u'os': u'Windows-3.1.1',
536 u'pool': u'default', 538 u'pool': u'default',
537 } 539 }
538 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) 540 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1))
539 541
540 # It's normally time ordered. 542 # It's normally time ordered.
541 self.mock_now(self.now, 1) 543 self.mock_now(self.now, 1)
542 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'} 544 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'}
543 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions_2)) 545 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_2))
544 546
545 bot_dimensions = { 547 bot_dimensions = {
546 u'foo': [u'bar'], 548 u'foo': [u'bar'],
547 u'id': [u'localhost'], 549 u'id': [u'localhost'],
548 u'os': [u'Windows-3.1.1'], 550 u'os': [u'Windows-3.1.1'],
549 u'pool': [u'default'], 551 u'pool': [u'default'],
550 } 552 }
551 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 553 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
552 expected = [ 554 expected = [
553 { 555 {
(...skipping 12 matching lines...) Expand all
566 def test_yield_next_available_task_to_dispatch_clock_skew(self): 568 def test_yield_next_available_task_to_dispatch_clock_skew(self):
567 # Asserts that a TaskToRun added later in the DB (with a Key with an higher 569 # Asserts that a TaskToRun added later in the DB (with a Key with an higher
568 # value) but with a timestamp sooner (for example, time desynchronization 570 # value) but with a timestamp sooner (for example, time desynchronization
569 # between machines) is still returned in the timestamp order, e.g. priority 571 # between machines) is still returned in the timestamp order, e.g. priority
570 # is done based on timestamps and priority only. 572 # is done based on timestamps and priority only.
571 request_dimensions_1 = { 573 request_dimensions_1 = {
572 u'foo': u'bar', 574 u'foo': u'bar',
573 u'os': u'Windows-3.1.1', 575 u'os': u'Windows-3.1.1',
574 u'pool': u'default', 576 u'pool': u'default',
575 } 577 }
576 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) 578 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1))
577 579
578 # The second shard is added before the first, potentially because of a 580 # The second shard is added before the first, potentially because of a
579 # desynchronized clock. It'll have higher priority. 581 # desynchronized clock. It'll have higher priority.
580 self.mock_now(self.now, -1) 582 self.mock_now(self.now, -1)
581 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'} 583 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'}
582 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions_2)) 584 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_2))
583 585
584 bot_dimensions = { 586 bot_dimensions = {
585 u'foo': [u'bar'], 587 u'foo': [u'bar'],
586 u'id': [u'localhost'], 588 u'id': [u'localhost'],
587 u'os': [u'Windows-3.1.1'], 589 u'os': [u'Windows-3.1.1'],
588 u'pool': [u'default'], 590 u'pool': [u'default'],
589 } 591 }
590 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 592 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
591 expected = [ 593 expected = [
592 { 594 {
593 'dimensions_hash': _hash_dimensions(request_dimensions_2), 595 'dimensions_hash': _hash_dimensions(request_dimensions_2),
594 # Due to time being late on the second requester frontend. 596 # Due to time being late on the second requester frontend.
595 'expiration_ts': self.expiration_ts - datetime.timedelta(seconds=1), 597 'expiration_ts': self.expiration_ts - datetime.timedelta(seconds=1),
596 'queue_number': '0x000a890b67aad106', 598 'queue_number': '0x000a890b67aad106',
597 }, 599 },
598 { 600 {
599 'dimensions_hash': _hash_dimensions(request_dimensions_1), 601 'dimensions_hash': _hash_dimensions(request_dimensions_1),
600 'expiration_ts': self.expiration_ts, 602 'expiration_ts': self.expiration_ts,
601 'queue_number': '0x000a890b67ba1346', 603 'queue_number': '0x000a890b67ba1346',
602 }, 604 },
603 ] 605 ]
604 self.assertEqual(expected, actual) 606 self.assertEqual(expected, actual)
605 607
606 def test_yield_next_available_task_to_dispatch_priority(self): 608 def test_yield_next_available_task_to_dispatch_priority(self):
607 # Task added later but with higher priority are returned first. 609 # Task added later but with higher priority are returned first.
608 request_dimensions_1 = {u'os': u'Windows-3.1.1', u'pool': u'default'} 610 request_dimensions_1 = {u'os': u'Windows-3.1.1', u'pool': u'default'}
609 _gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) 611 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1))
610 612
611 # This one is later but has higher priority. 613 # This one is later but has higher priority.
612 self.mock_now(self.now, 60) 614 self.mock_now(self.now, 60)
613 request_dimensions_2 = {u'os': u'Windows-3.1.1', u'pool': u'default'} 615 request_dimensions_2 = {u'os': u'Windows-3.1.1', u'pool': u'default'}
614 _gen_new_task_to_run( 616 request = self.mkreq(
615 properties=dict(dimensions=request_dimensions_2), priority=10) 617 _gen_request(
618 properties=dict(dimensions=request_dimensions_2), priority=10),
619 nb_tasks=0)
620 task_to_run.new_task_to_run(request).put()
616 621
617 # It should return them all, in the expected order. 622 # It should return them all, in the expected order.
618 expected = [ 623 expected = [
619 { 624 {
620 'dimensions_hash': _hash_dimensions(request_dimensions_1), 625 'dimensions_hash': _hash_dimensions(request_dimensions_1),
621 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 6, 5, 6), 626 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 6, 5, 6),
622 'queue_number': '0x00060dc588329a46', 627 'queue_number': '0x00060dc588329a46',
623 }, 628 },
624 { 629 {
625 'dimensions_hash': _hash_dimensions(request_dimensions_2), 630 'dimensions_hash': _hash_dimensions(request_dimensions_2),
626 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 5, 5, 6), 631 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 5, 5, 6),
627 'queue_number': '0x000a890b67ba1346', 632 'queue_number': '0x000a890b67ba1346',
628 }, 633 },
629 ] 634 ]
630 bot_dimensions = { 635 bot_dimensions = {
631 u'id': [u'localhost'], 636 u'id': [u'localhost'],
632 u'os': [u'Windows-3.1.1'], 637 u'os': [u'Windows-3.1.1'],
633 u'pool': [u'default'], 638 u'pool': [u'default'],
634 } 639 }
635 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) 640 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None)
636 self.assertEqual(expected, actual) 641 self.assertEqual(expected, actual)
637 642
638 def test_yield_next_available_task_to_run_task_exceeds_deadline(self): 643 def test_yield_next_available_task_to_run_task_exceeds_deadline(self):
639 request_dimensions = { 644 request_dimensions = {
640 u'foo': u'bar', 645 u'foo': u'bar',
641 u'id': u'localhost', 646 u'id': u'localhost',
642 u'os': u'Windows-3.1.1', 647 u'os': u'Windows-3.1.1',
643 u'pool': u'default', 648 u'pool': u'default',
644 } 649 }
645 _gen_new_task_to_run( 650 self._gen_new_task_to_run(
646 properties=dict(dimensions=request_dimensions)) 651 properties=dict(dimensions=request_dimensions))
647 # Bot declares exactly same dimensions so it matches. 652 # Bot declares exactly same dimensions so it matches.
648 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} 653 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()}
649 actual = _yield_next_available_task_to_dispatch( 654 actual = _yield_next_available_task_to_dispatch(
650 bot_dimensions, datetime.datetime(1969, 1, 1)) 655 bot_dimensions, datetime.datetime(1969, 1, 1))
651 self.failIf(actual) 656 self.failIf(actual)
652 657
653 def test_yield_next_available_task_to_run_task_meets_deadline(self): 658 def test_yield_next_available_task_to_run_task_meets_deadline(self):
654 request_dimensions = { 659 request_dimensions = {
655 u'foo': u'bar', 660 u'foo': u'bar',
656 u'id': u'localhost', 661 u'id': u'localhost',
657 u'os': u'Windows-3.1.1', 662 u'os': u'Windows-3.1.1',
658 u'pool': u'default', 663 u'pool': u'default',
659 } 664 }
660 _gen_new_task_to_run( 665 self._gen_new_task_to_run(
661 properties=dict(dimensions=request_dimensions)) 666 properties=dict(dimensions=request_dimensions))
662 # Bot declares exactly same dimensions so it matches. 667 # Bot declares exactly same dimensions so it matches.
663 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} 668 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()}
664 actual = _yield_next_available_task_to_dispatch( 669 actual = _yield_next_available_task_to_dispatch(
665 bot_dimensions, datetime.datetime(3000, 1, 1)) 670 bot_dimensions, datetime.datetime(3000, 1, 1))
666 expected = [ 671 expected = [
667 { 672 {
668 'dimensions_hash': _hash_dimensions(request_dimensions), 673 'dimensions_hash': _hash_dimensions(request_dimensions),
669 'expiration_ts': self.expiration_ts, 674 'expiration_ts': self.expiration_ts,
670 'queue_number': '0x000a890b67ba1346', 675 'queue_number': '0x000a890b67ba1346',
671 }, 676 },
672 ] 677 ]
673 self.assertEqual(expected, actual) 678 self.assertEqual(expected, actual)
674 679
675 def test_yield_next_available_task_to_run_task_terminate(self): 680 def test_yield_next_available_task_to_run_task_terminate(self):
676 request_dimensions = { 681 request_dimensions = {
677 u'id': u'fake-id', 682 u'id': u'fake-id',
678 } 683 }
679 task = _gen_new_task_to_run( 684 task = self._gen_new_task_to_run(
680 priority=0, 685 priority=0,
681 properties=dict( 686 properties=dict(
682 command=[], dimensions=request_dimensions, execution_timeout_secs=0, 687 command=[], dimensions=request_dimensions, execution_timeout_secs=0,
683 grace_period_secs=0)) 688 grace_period_secs=0))
684 self.assertTrue(task.key.parent().get().properties.is_terminate) 689 self.assertTrue(task.key.parent().get().properties.is_terminate)
685 # Bot declares exactly same dimensions so it matches. 690 # Bot declares exactly same dimensions so it matches.
686 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} 691 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()}
687 bot_dimensions[u'pool'] = [u'default'] 692 bot_dimensions[u'pool'] = [u'default']
688 actual = _yield_next_available_task_to_dispatch(bot_dimensions, 0) 693 actual = _yield_next_available_task_to_dispatch(bot_dimensions, 0)
689 expected = [ 694 expected = [
690 { 695 {
691 'dimensions_hash': _hash_dimensions(request_dimensions), 696 'dimensions_hash': _hash_dimensions(request_dimensions),
692 'expiration_ts': self.expiration_ts, 697 'expiration_ts': self.expiration_ts,
693 'queue_number': '0x0004eef40bd85346', 698 'queue_number': '0x0004eef40bd85346',
694 }, 699 },
695 ] 700 ]
696 self.assertEqual(expected, actual) 701 self.assertEqual(expected, actual)
697 702
698 def test_yield_expired_task_to_run(self): 703 def test_yield_expired_task_to_run(self):
699 now = utils.utcnow() 704 now = utils.utcnow()
700 _gen_new_task_to_run( 705 self._gen_new_task_to_run(
701 created_ts=now, 706 created_ts=now,
702 expiration_ts=now+datetime.timedelta(seconds=60)) 707 expiration_ts=now+datetime.timedelta(seconds=60))
703 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} 708 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']}
704 self.assertEqual( 709 self.assertEqual(
705 1, 710 1,
706 len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) 711 len(_yield_next_available_task_to_dispatch(bot_dimensions, None)))
707 self.assertEqual( 712 self.assertEqual(
708 0, len(list(task_to_run.yield_expired_task_to_run()))) 713 0, len(list(task_to_run.yield_expired_task_to_run())))
709 714
710 # All tasks are now expired. Note that even if they still have .queue_number 715 # All tasks are now expired. Note that even if they still have .queue_number
711 # set because the cron job wasn't run, they are still not yielded by 716 # set because the cron job wasn't run, they are still not yielded by
712 # yield_next_available_task_to_dispatch() 717 # yield_next_available_task_to_dispatch()
713 self.mock_now(self.now, 61) 718 self.mock_now(self.now, 61)
714 self.assertEqual( 719 self.assertEqual(
715 0, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) 720 0, len(_yield_next_available_task_to_dispatch(bot_dimensions, None)))
716 self.assertEqual( 721 self.assertEqual(
717 1, len(list(task_to_run.yield_expired_task_to_run()))) 722 1, len(list(task_to_run.yield_expired_task_to_run())))
718 723
719 def test_is_reapable(self): 724 def test_is_reapable(self):
720 req_dimensions = {u'os': u'Windows-3.1.1', u'pool': u'default'} 725 req_dimensions = {u'os': u'Windows-3.1.1', u'pool': u'default'}
721 to_run = _gen_new_task_to_run(properties=dict(dimensions=req_dimensions)) 726 to_run = self._gen_new_task_to_run(
727 properties=dict(dimensions=req_dimensions))
722 bot_dimensions = { 728 bot_dimensions = {
723 u'id': [u'localhost'], 729 u'id': [u'localhost'],
724 u'os': [u'Windows-3.1.1'], 730 u'os': [u'Windows-3.1.1'],
725 u'pool': [u'default'], 731 u'pool': [u'default'],
726 } 732 }
727 self.assertEqual( 733 self.assertEqual(
728 1, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) 734 1, len(_yield_next_available_task_to_dispatch(bot_dimensions, None)))
729 735
730 self.assertEqual(True, to_run.is_reapable) 736 self.assertEqual(True, to_run.is_reapable)
731 to_run.queue_number = None 737 to_run.queue_number = None
732 to_run.put() 738 to_run.put()
733 self.assertEqual(False, to_run.is_reapable) 739 self.assertEqual(False, to_run.is_reapable)
734 740
735 def test_set_lookup_cache(self): 741 def test_set_lookup_cache(self):
736 to_run = _gen_new_task_to_run( 742 to_run = self._gen_new_task_to_run(
737 properties={ 743 properties={
738 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, 744 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'},
739 }) 745 })
740 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) 746 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key))
741 task_to_run.set_lookup_cache(to_run.key, True) 747 task_to_run.set_lookup_cache(to_run.key, True)
742 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) 748 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key))
743 task_to_run.set_lookup_cache(to_run.key, False) 749 task_to_run.set_lookup_cache(to_run.key, False)
744 self.assertEqual(True, task_to_run._lookup_cache_is_taken(to_run.key)) 750 self.assertEqual(True, task_to_run._lookup_cache_is_taken(to_run.key))
745 task_to_run.set_lookup_cache(to_run.key, True) 751 task_to_run.set_lookup_cache(to_run.key, True)
746 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) 752 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key))
747 753
748 754
749 if __name__ == '__main__': 755 if __name__ == '__main__':
750 if '-v' in sys.argv: 756 if '-v' in sys.argv:
751 unittest.TestCase.maxDiff = None 757 unittest.TestCase.maxDiff = None
752 logging.basicConfig( 758 logging.basicConfig(
753 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) 759 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
754 unittest.main() 760 unittest.main()
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_queues.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698