| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2014 The LUCI Authors. All rights reserved. | 2 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 import datetime | 6 import datetime |
| 7 import logging | 7 import logging |
| 8 import os | 8 import os |
| 9 import random | 9 import random |
| 10 import sys | 10 import sys |
| (...skipping 17 matching lines...) Expand all Loading... |
| 28 from server import bot_management | 28 from server import bot_management |
| 29 from server import task_queues | 29 from server import task_queues |
| 30 from server import task_request | 30 from server import task_request |
| 31 from server import task_to_run | 31 from server import task_to_run |
| 32 | 32 |
| 33 | 33 |
| 34 # pylint: disable=W0212 | 34 # pylint: disable=W0212 |
| 35 # Method could be a function - pylint: disable=R0201 | 35 # Method could be a function - pylint: disable=R0201 |
| 36 | 36 |
| 37 | 37 |
| 38 def _flatten(request_dimensions): |
| 39 return [u'%s:%s' % (k, v) for k, v in request_dimensions] |
| 40 |
| 41 |
| 38 def _gen_request(properties=None, **kwargs): | 42 def _gen_request(properties=None, **kwargs): |
| 39 """Creates a TaskRequest.""" | 43 """Creates a TaskRequest.""" |
| 40 props = { | 44 props = { |
| 41 'command': [u'command1'], | 45 'command': [u'command1'], |
| 42 'dimensions': {u'pool': u'default'}, | 46 'dimensions_flat': [u'pool:default'], |
| 43 'env': {}, | 47 'env': {}, |
| 44 'execution_timeout_secs': 24*60*60, | 48 'execution_timeout_secs': 24*60*60, |
| 45 'io_timeout_secs': None, | 49 'io_timeout_secs': None, |
| 46 } | 50 } |
| 47 props.update(properties or {}) | 51 props.update(properties or {}) |
| 48 props['dimensions_dict'] = props.pop('dimensions') | |
| 49 now = utils.utcnow() | 52 now = utils.utcnow() |
| 50 args = { | 53 args = { |
| 51 'created_ts': now, | 54 'created_ts': now, |
| 52 'name': 'Request name', | 55 'name': 'Request name', |
| 53 'priority': 50, | 56 'priority': 50, |
| 54 'properties': task_request.TaskProperties(**props), | 57 'properties': task_request.TaskProperties(**props), |
| 55 'expiration_ts': now + datetime.timedelta(seconds=60), | 58 'expiration_ts': now + datetime.timedelta(seconds=60), |
| 56 'tags': [u'tag:1'], | 59 'tags': [u'tag:1'], |
| 57 'user': 'Jesus', | 60 'user': 'Jesus', |
| 58 } | 61 } |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 182 ] | 185 ] |
| 183 for i, ((dimensions_hash, timestamp, priority), expected) in enumerate( | 186 for i, ((dimensions_hash, timestamp, priority), expected) in enumerate( |
| 184 data): | 187 data): |
| 185 d = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f') | 188 d = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f') |
| 186 actual = '0x%016x' % task_to_run._gen_queue_number( | 189 actual = '0x%016x' % task_to_run._gen_queue_number( |
| 187 dimensions_hash, d, priority) | 190 dimensions_hash, d, priority) |
| 188 self.assertEquals((i, expected), (i, actual)) | 191 self.assertEquals((i, expected), (i, actual)) |
| 189 | 192 |
| 190 def test_new_task_to_run(self): | 193 def test_new_task_to_run(self): |
| 191 self.mock(random, 'getrandbits', lambda _: 0x12) | 194 self.mock(random, 'getrandbits', lambda _: 0x12) |
| 192 request_dimensions = {u'os': u'Windows-3.1.1', u'pool': u'default'} | 195 request_dimensions = [(u'os', u'Windows-3.1.1'), (u'pool', u'default')] |
| 193 now = utils.utcnow() | 196 now = utils.utcnow() |
| 194 data = _gen_request( | 197 data = _gen_request( |
| 195 properties={ | 198 properties={ |
| 196 'command': [u'command1', u'arg1'], | 199 'command': [u'command1', u'arg1'], |
| 197 'dimensions': request_dimensions, | 200 'dimensions_flat': _flatten(request_dimensions), |
| 198 'env': {u'foo': u'bar'}, | 201 'env': {u'foo': u'bar'}, |
| 199 'execution_timeout_secs': 30, | 202 'execution_timeout_secs': 30, |
| 200 }, | 203 }, |
| 201 priority=20, | 204 priority=20, |
| 202 created_ts=now, | 205 created_ts=now, |
| 203 expiration_ts=now+datetime.timedelta(seconds=31)) | 206 expiration_ts=now+datetime.timedelta(seconds=31)) |
| 204 task_to_run.new_task_to_run(self.mkreq(data)).put() | 207 task_to_run.new_task_to_run(self.mkreq(data)).put() |
| 205 | 208 |
| 206 # Create a second with higher priority. | 209 # Create a second with higher priority. |
| 207 self.mock(random, 'getrandbits', lambda _: 0x23) | 210 self.mock(random, 'getrandbits', lambda _: 0x23) |
| 208 data = _gen_request( | 211 data = _gen_request( |
| 209 properties={ | 212 properties={ |
| 210 'command': [u'command1', u'arg1'], | 213 'command': [u'command1', u'arg1'], |
| 211 'dimensions': request_dimensions, | 214 'dimensions_flat': _flatten(request_dimensions), |
| 212 'env': {u'foo': u'bar'}, | 215 'env': {u'foo': u'bar'}, |
| 213 'execution_timeout_secs': 30, | 216 'execution_timeout_secs': 30, |
| 214 }, | 217 }, |
| 215 priority=10, | 218 priority=10, |
| 216 created_ts=now, | 219 created_ts=now, |
| 217 expiration_ts=now+datetime.timedelta(seconds=31)) | 220 expiration_ts=now+datetime.timedelta(seconds=31)) |
| 218 task_to_run.new_task_to_run(self.mkreq(data, nb_task=0)).put() | 221 task_to_run.new_task_to_run(self.mkreq(data, nb_task=0)).put() |
| 219 | 222 |
| 220 expected = [ | 223 expected = [ |
| 221 { | 224 { |
| (...skipping 17 matching lines...) Expand all Loading... |
| 239 return out | 242 return out |
| 240 | 243 |
| 241 # Warning: Ordering by key doesn't work because of TaskToRunShard; e.g. | 244 # Warning: Ordering by key doesn't work because of TaskToRunShard; e.g. |
| 242 # the entity key ordering DOES NOT correlate with .queue_number | 245 # the entity key ordering DOES NOT correlate with .queue_number |
| 243 # Ensure they come out in expected order. | 246 # Ensure they come out in expected order. |
| 244 q = task_to_run.TaskToRun.query().order(task_to_run.TaskToRun.queue_number) | 247 q = task_to_run.TaskToRun.query().order(task_to_run.TaskToRun.queue_number) |
| 245 self.assertEqual(expected, map(flatten, q.fetch())) | 248 self.assertEqual(expected, map(flatten, q.fetch())) |
| 246 | 249 |
| 247 def test_match_dimensions(self): | 250 def test_match_dimensions(self): |
| 248 data_true = ( | 251 data_true = ( |
| 249 ({}, {}), | 252 ([], {}), |
| 250 ({}, {'a': 'b'}), | 253 ([], {'a': 'b'}), |
| 251 ({'a': 'b'}, {'a': 'b'}), | 254 ([('a', 'b')], {'a': 'b'}), |
| 252 ({'os': 'amiga'}, {'os': ['amiga', 'amiga-3.1']}), | 255 ([('os', 'amiga')], {'os': ['amiga', 'amiga-3.1']}), |
| 253 ( {'os': 'amiga', 'foo': 'bar'}, | 256 ( [('os', 'amiga'), ('foo', 'bar')], |
| 254 {'os': ['amiga', 'amiga-3.1'], 'a': 'b', 'foo': 'bar'}), | 257 {'os': ['amiga', 'amiga-3.1'], 'a': 'b', 'foo': 'bar'}), |
| 255 ) | 258 ) |
| 256 | 259 |
| 257 for request_dimensions, bot_dimensions in data_true: | 260 for request_dimensions, bot_dimensions in data_true: |
| 258 self.assertEqual( | 261 self.assertEqual( |
| 259 True, | 262 True, |
| 260 task_to_run.match_dimensions(request_dimensions, bot_dimensions)) | 263 task_to_run.match_dimensions(request_dimensions, bot_dimensions)) |
| 261 | 264 |
| 262 data_false = ( | 265 data_false = ( |
| 263 ({'os': 'amiga'}, {'os': ['Win', 'Win-3.1']}), | 266 ([('os', 'amiga')], {'os': ['Win', 'Win-3.1']}), |
| 264 ) | 267 ) |
| 265 for request_dimensions, bot_dimensions in data_false: | 268 for request_dimensions, bot_dimensions in data_false: |
| 266 self.assertEqual( | 269 self.assertEqual( |
| 267 False, | 270 False, |
| 268 task_to_run.match_dimensions(request_dimensions, bot_dimensions)) | 271 task_to_run.match_dimensions(request_dimensions, bot_dimensions)) |
| 269 | 272 |
| 270 def test_yield_next_available_task_to_dispatch_none(self): | 273 def test_yield_next_available_task_to_dispatch_none(self): |
| 274 request_dimensions = [ |
| 275 (u'os', u'Windows-3.1.1'), |
| 276 (u'pool', u'default'), |
| 277 ] |
| 271 self._gen_new_task_to_run( | 278 self._gen_new_task_to_run( |
| 272 properties={ | 279 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 273 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, | |
| 274 }) | |
| 275 # Bot declares no dimensions, so it will fail to match. | 280 # Bot declares no dimensions, so it will fail to match. |
| 276 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} | 281 bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} |
| 277 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 282 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 278 self.assertEqual([], actual) | 283 self.assertEqual([], actual) |
| 279 | 284 |
| 280 def test_yield_next_available_task_to_dispatch_none_mismatch(self): | 285 def test_yield_next_available_task_to_dispatch_none_mismatch(self): |
| 286 request_dimensions = [ |
| 287 (u'os', u'Windows-3.1.1'), |
| 288 (u'pool', u'default'), |
| 289 ] |
| 281 self._gen_new_task_to_run( | 290 self._gen_new_task_to_run( |
| 282 properties={ | 291 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 283 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, | |
| 284 }) | |
| 285 # Bot declares other dimensions, so it will fail to match. | 292 # Bot declares other dimensions, so it will fail to match. |
| 286 bot_dimensions = { | 293 bot_dimensions = { |
| 287 u'id': [u'bot1'], | 294 u'id': [u'bot1'], |
| 288 u'os': [u'Windows-3.0'], | 295 u'os': [u'Windows-3.0'], |
| 289 u'pool': [u'default'], | 296 u'pool': [u'default'], |
| 290 } | 297 } |
| 291 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 298 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 292 self.assertEqual([], actual) | 299 self.assertEqual([], actual) |
| 293 | 300 |
| 294 def test_yield_next_available_task_to_dispatch(self): | 301 def test_yield_next_available_task_to_dispatch(self): |
| 295 request_dimensions = { | 302 request_dimensions = [ |
| 296 u'foo': u'bar', | 303 (u'foo', u'bar'), |
| 297 u'os': u'Windows-3.1.1', | 304 (u'os', u'Windows-3.1.1'), |
| 298 u'pool': u'default', | 305 (u'pool', u'default'), |
| 299 } | 306 ] |
| 300 self._gen_new_task_to_run( | 307 self._gen_new_task_to_run( |
| 301 properties=dict(dimensions=request_dimensions)) | 308 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 302 # Bot declares exactly same dimensions so it matches. | 309 # Bot declares exactly same dimensions so it matches. |
| 303 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} | 310 bot_dimensions = {k: [v] for k, v in request_dimensions} |
| 304 bot_dimensions[u'id'] = [u'bot1'] | 311 bot_dimensions[u'id'] = [u'bot1'] |
| 305 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 312 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 306 expected = [ | 313 expected = [ |
| 307 { | 314 { |
| 308 'dimensions_hash': _hash_dimensions(request_dimensions), | 315 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 309 'expiration_ts': self.expiration_ts, | 316 'expiration_ts': self.expiration_ts, |
| 310 'queue_number': '0x613fbb330c8ede72', | 317 'queue_number': '0x613fbb330c8ede72', |
| 311 }, | 318 }, |
| 312 ] | 319 ] |
| 313 self.assertEqual(expected, actual) | 320 self.assertEqual(expected, actual) |
| 314 | 321 |
| 315 def test_yield_next_available_task_to_dispatch_subset(self): | 322 def test_yield_next_available_task_to_dispatch_subset(self): |
| 316 request_dimensions = { | 323 request_dimensions = [ |
| 317 u'os': u'Windows-3.1.1', | 324 (u'os', u'Windows-3.1.1'), |
| 318 u'pool': u'default', | 325 (u'pool', u'default'), |
| 319 } | 326 ] |
| 320 self._gen_new_task_to_run( | 327 self._gen_new_task_to_run( |
| 321 properties=dict(dimensions=request_dimensions)) | 328 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 322 # Bot declares more dimensions than needed, this is fine and it matches. | 329 # Bot declares more dimensions than needed, this is fine and it matches. |
| 323 bot_dimensions = { | 330 bot_dimensions = { |
| 324 u'id': [u'localhost'], | 331 u'id': [u'localhost'], |
| 325 u'os': [u'Windows-3.1.1'], | 332 u'os': [u'Windows-3.1.1'], |
| 326 u'pool': [u'default'], | 333 u'pool': [u'default'], |
| 327 } | 334 } |
| 328 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 335 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 329 expected = [ | 336 expected = [ |
| 330 { | 337 { |
| 331 'dimensions_hash': _hash_dimensions(request_dimensions), | 338 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 332 'expiration_ts': self.expiration_ts, | 339 'expiration_ts': self.expiration_ts, |
| 333 'queue_number': '0x1a3aa6630c8ede72', | 340 'queue_number': '0x1a3aa6630c8ede72', |
| 334 }, | 341 }, |
| 335 ] | 342 ] |
| 336 self.assertEqual(expected, actual) | 343 self.assertEqual(expected, actual) |
| 337 | 344 |
| 338 def test_yield_next_available_task_shard(self): | 345 def test_yield_next_available_task_shard(self): |
| 339 request_dimensions = { | 346 request_dimensions = [ |
| 340 u'os': u'Windows-3.1.1', | 347 (u'os', u'Windows-3.1.1'), |
| 341 u'pool': u'default', | 348 (u'pool', u'default'), |
| 342 } | 349 ] |
| 343 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions)) | 350 self._gen_new_task_to_run( |
| 344 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} | 351 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 352 bot_dimensions = {k: [v] for k, v in request_dimensions} |
| 345 bot_dimensions[u'id'] = [u'bot1'] | 353 bot_dimensions[u'id'] = [u'bot1'] |
| 346 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 354 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 347 expected = [ | 355 expected = [ |
| 348 { | 356 { |
| 349 'dimensions_hash': _hash_dimensions(request_dimensions), | 357 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 350 'expiration_ts': self.expiration_ts, | 358 'expiration_ts': self.expiration_ts, |
| 351 'queue_number': '0x1a3aa6630c8ede72', | 359 'queue_number': '0x1a3aa6630c8ede72', |
| 352 }, | 360 }, |
| 353 ] | 361 ] |
| 354 self.assertEqual(expected, actual) | 362 self.assertEqual(expected, actual) |
| 355 | 363 |
| 356 def test_yield_next_available_task_to_dispatch_subset_multivalue(self): | 364 def test_yield_next_available_task_to_dispatch_subset_multivalue(self): |
| 357 request_dimensions = { | 365 request_dimensions = [ |
| 358 u'os': u'Windows-3.1.1', | 366 (u'os', u'Windows-3.1.1'), |
| 359 u'pool': u'default', | 367 (u'pool', u'default'), |
| 360 } | 368 ] |
| 361 self._gen_new_task_to_run( | 369 self._gen_new_task_to_run( |
| 362 properties=dict(dimensions=request_dimensions)) | 370 properties=dict(dimensions_flat=_flatten(request_dimensions))) |
| 363 # Bot declares more dimensions than needed. | 371 # Bot declares more dimensions than needed. |
| 364 bot_dimensions = { | 372 bot_dimensions = { |
| 365 u'id': [u'localhost'], | 373 u'id': [u'localhost'], |
| 366 u'os': [u'Windows', u'Windows-3.1.1'], | 374 u'os': [u'Windows', u'Windows-3.1.1'], |
| 367 u'pool': [u'default'], | 375 u'pool': [u'default'], |
| 368 } | 376 } |
| 369 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 377 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 370 expected = [ | 378 expected = [ |
| 371 { | 379 { |
| 372 'dimensions_hash': _hash_dimensions(request_dimensions), | 380 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 373 'expiration_ts': self.expiration_ts, | 381 'expiration_ts': self.expiration_ts, |
| 374 'queue_number': '0x1a3aa6630c8ede72', | 382 'queue_number': '0x1a3aa6630c8ede72', |
| 375 }, | 383 }, |
| 376 ] | 384 ] |
| 377 self.assertEqual(expected, actual) | 385 self.assertEqual(expected, actual) |
| 378 | 386 |
| 379 def test_yield_next_available_task_to_dispatch_multi_normal(self): | 387 def test_yield_next_available_task_to_dispatch_multi_normal(self): |
| 380 # Task added one after the other, normal case. | 388 # Task added one after the other, normal case. |
| 381 request_dimensions_1 = { | 389 request_dimensions_1 = [ |
| 382 u'foo': u'bar', | 390 (u'foo', u'bar'), |
| 383 u'os': u'Windows-3.1.1', | 391 (u'os', u'Windows-3.1.1'), |
| 384 u'pool': u'default', | 392 (u'pool', u'default'), |
| 385 } | 393 ] |
| 386 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) | 394 self._gen_new_task_to_run( |
| 395 properties=dict(dimensions_flat=_flatten(request_dimensions_1))) |
| 387 | 396 |
| 388 # It's normally time ordered. | 397 # It's normally time ordered. |
| 389 self.mock_now(self.now, 1) | 398 self.mock_now(self.now, 1) |
| 390 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'} | 399 request_dimensions_2 = [(u'id', u'localhost'), (u'pool', u'default')] |
| 391 self._gen_new_task_to_run( | 400 self._gen_new_task_to_run( |
| 392 properties=dict(dimensions=request_dimensions_2), nb_task=0) | 401 properties=dict(dimensions_flat=_flatten(request_dimensions_2)), |
| 402 nb_task=0) |
| 393 | 403 |
| 394 bot_dimensions = { | 404 bot_dimensions = { |
| 395 u'foo': [u'bar'], | 405 u'foo': [u'bar'], |
| 396 u'id': [u'localhost'], | 406 u'id': [u'localhost'], |
| 397 u'os': [u'Windows-3.1.1'], | 407 u'os': [u'Windows-3.1.1'], |
| 398 u'pool': [u'default'], | 408 u'pool': [u'default'], |
| 399 } | 409 } |
| 400 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 410 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 401 expected = [ | 411 expected = [ |
| 402 { | 412 { |
| 403 'dimensions_hash': _hash_dimensions(request_dimensions_1), | 413 'dimensions_hash': _hash_dimensions(request_dimensions_1), |
| 404 'expiration_ts': self.expiration_ts, | 414 'expiration_ts': self.expiration_ts, |
| 405 'queue_number': '0x613fbb330c8ede72', | 415 'queue_number': '0x613fbb330c8ede72', |
| 406 }, | 416 }, |
| 407 { | 417 { |
| 408 'dimensions_hash': _hash_dimensions(request_dimensions_2), | 418 'dimensions_hash': _hash_dimensions(request_dimensions_2), |
| 409 'expiration_ts': self.expiration_ts + datetime.timedelta(seconds=1), | 419 'expiration_ts': self.expiration_ts + datetime.timedelta(seconds=1), |
| 410 'queue_number': '0x5385bf748c8ede7c', | 420 'queue_number': '0x5385bf748c8ede7c', |
| 411 }, | 421 }, |
| 412 ] | 422 ] |
| 413 # There is a significant risk of non-determinism. | 423 # There is a significant risk of non-determinism. |
| 414 self.assertEqual(sorted(expected), sorted(actual)) | 424 self.assertEqual(sorted(expected), sorted(actual)) |
| 415 | 425 |
| 416 def test_yield_next_available_task_to_dispatch_clock_skew(self): | 426 def test_yield_next_available_task_to_dispatch_clock_skew(self): |
| 417 # Asserts that a TaskToRun added later in the DB (with a Key with an higher | 427 # Asserts that a TaskToRun added later in the DB (with a Key with an higher |
| 418 # value) but with a timestamp sooner (for example, time desynchronization | 428 # value) but with a timestamp sooner (for example, time desynchronization |
| 419 # between machines) is still returned in the timestamp order, e.g. priority | 429 # between machines) is still returned in the timestamp order, e.g. priority |
| 420 # is done based on timestamps and priority only. | 430 # is done based on timestamps and priority only. |
| 421 request_dimensions_1 = { | 431 request_dimensions_1 = [ |
| 422 u'foo': u'bar', | 432 (u'foo', u'bar'), |
| 423 u'os': u'Windows-3.1.1', | 433 (u'os', u'Windows-3.1.1'), |
| 424 u'pool': u'default', | 434 (u'pool', u'default'), |
| 425 } | 435 ] |
| 426 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) | 436 self._gen_new_task_to_run( |
| 437 properties=dict(dimensions_flat=_flatten(request_dimensions_1))) |
| 427 | 438 |
| 428 # The second shard is added before the first, potentially because of a | 439 # The second shard is added before the first, potentially because of a |
| 429 # desynchronized clock. It'll have higher priority. | 440 # desynchronized clock. It'll have higher priority. |
| 430 self.mock_now(self.now, -1) | 441 self.mock_now(self.now, -1) |
| 431 request_dimensions_2 = {u'id': u'localhost', u'pool': u'default'} | 442 request_dimensions_2 = [(u'id', u'localhost'), (u'pool', u'default')] |
| 432 self._gen_new_task_to_run( | 443 self._gen_new_task_to_run( |
| 433 properties=dict(dimensions=request_dimensions_2), nb_task=0) | 444 properties=dict(dimensions_flat=_flatten(request_dimensions_2)), |
| 445 nb_task=0) |
| 434 | 446 |
| 435 bot_dimensions = { | 447 bot_dimensions = { |
| 436 u'foo': [u'bar'], | 448 u'foo': [u'bar'], |
| 437 u'id': [u'localhost'], | 449 u'id': [u'localhost'], |
| 438 u'os': [u'Windows-3.1.1'], | 450 u'os': [u'Windows-3.1.1'], |
| 439 u'pool': [u'default'], | 451 u'pool': [u'default'], |
| 440 } | 452 } |
| 441 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 453 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 442 expected = [ | 454 expected = [ |
| 443 { | 455 { |
| 444 'dimensions_hash': _hash_dimensions(request_dimensions_1), | 456 'dimensions_hash': _hash_dimensions(request_dimensions_1), |
| 445 'expiration_ts': self.expiration_ts, | 457 'expiration_ts': self.expiration_ts, |
| 446 'queue_number': '0x613fbb330c8ede72', | 458 'queue_number': '0x613fbb330c8ede72', |
| 447 }, | 459 }, |
| 448 { | 460 { |
| 449 'dimensions_hash': _hash_dimensions(request_dimensions_2), | 461 'dimensions_hash': _hash_dimensions(request_dimensions_2), |
| 450 # Due to time being late on the second requester frontend. | 462 # Due to time being late on the second requester frontend. |
| 451 'expiration_ts': self.expiration_ts - datetime.timedelta(seconds=1), | 463 'expiration_ts': self.expiration_ts - datetime.timedelta(seconds=1), |
| 452 'queue_number': '0x5385bf748c8ede68', | 464 'queue_number': '0x5385bf748c8ede68', |
| 453 }, | 465 }, |
| 454 ] | 466 ] |
| 455 # There is a significant risk of non-determinism. | 467 # There is a significant risk of non-determinism. |
| 456 self.assertEqual(sorted(expected), sorted(actual)) | 468 self.assertEqual(sorted(expected), sorted(actual)) |
| 457 | 469 |
| 458 def test_yield_next_available_task_to_dispatch_priority(self): | 470 def test_yield_next_available_task_to_dispatch_priority(self): |
| 459 # Task added later but with higher priority are returned first. | 471 # Task added later but with higher priority are returned first. |
| 460 request_dimensions_1 = {u'os': u'Windows-3.1.1', u'pool': u'default'} | 472 request_dimensions_1 = [(u'os', u'Windows-3.1.1'), (u'pool', u'default')] |
| 461 self._gen_new_task_to_run(properties=dict(dimensions=request_dimensions_1)) | 473 self._gen_new_task_to_run( |
| 474 properties=dict(dimensions_flat=_flatten(request_dimensions_1))) |
| 462 | 475 |
| 463 # This one is later but has higher priority. | 476 # This one is later but has higher priority. |
| 464 self.mock_now(self.now, 60) | 477 self.mock_now(self.now, 60) |
| 465 request_dimensions_2 = {u'os': u'Windows-3.1.1', u'pool': u'default'} | 478 request_dimensions_2 = [(u'os', u'Windows-3.1.1'), (u'pool', u'default')] |
| 466 request = self.mkreq( | 479 request = self.mkreq( |
| 467 _gen_request( | 480 _gen_request( |
| 468 properties=dict(dimensions=request_dimensions_2), priority=10), | 481 properties=dict(dimensions_flat=_flatten(request_dimensions_2)), |
| 482 priority=10), |
| 469 nb_task=0) | 483 nb_task=0) |
| 470 task_to_run.new_task_to_run(request).put() | 484 task_to_run.new_task_to_run(request).put() |
| 471 | 485 |
| 472 # It should return them all, in the expected order. | 486 # It should return them all, in the expected order. |
| 473 expected = [ | 487 expected = [ |
| 474 { | 488 { |
| 475 'dimensions_hash': _hash_dimensions(request_dimensions_1), | 489 'dimensions_hash': _hash_dimensions(request_dimensions_1), |
| 476 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 6, 5, 6), | 490 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 6, 5, 6), |
| 477 'queue_number': '0x1a3aa663028ee0ca', | 491 'queue_number': '0x1a3aa663028ee0ca', |
| 478 }, | 492 }, |
| 479 { | 493 { |
| 480 'dimensions_hash': _hash_dimensions(request_dimensions_2), | 494 'dimensions_hash': _hash_dimensions(request_dimensions_2), |
| 481 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 5, 5, 6), | 495 'expiration_ts': datetime.datetime(2014, 1, 2, 3, 5, 5, 6), |
| 482 'queue_number': '0x1a3aa6630c8ede72', | 496 'queue_number': '0x1a3aa6630c8ede72', |
| 483 }, | 497 }, |
| 484 ] | 498 ] |
| 485 bot_dimensions = { | 499 bot_dimensions = { |
| 486 u'id': [u'localhost'], | 500 u'id': [u'localhost'], |
| 487 u'os': [u'Windows-3.1.1'], | 501 u'os': [u'Windows-3.1.1'], |
| 488 u'pool': [u'default'], | 502 u'pool': [u'default'], |
| 489 } | 503 } |
| 490 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) | 504 actual = _yield_next_available_task_to_dispatch(bot_dimensions, None) |
| 491 self.assertEqual(expected, actual) | 505 self.assertEqual(expected, actual) |
| 492 | 506 |
| 493 def test_yield_next_available_task_to_run_task_exceeds_deadline(self): | 507 def test_yield_next_available_task_to_run_task_exceeds_deadline(self): |
| 494 request_dimensions = { | 508 request_dimensions = [ |
| 495 u'foo': u'bar', | 509 (u'foo', u'bar'), |
| 496 u'id': u'localhost', | 510 (u'id', u'localhost'), |
| 497 u'os': u'Windows-3.1.1', | 511 (u'os', u'Windows-3.1.1'), |
| 498 u'pool': u'default', | 512 (u'pool', u'default'), |
| 499 } | 513 ] |
| 500 self._gen_new_task_to_run( | 514 self._gen_new_task_to_run( |
| 501 properties=dict(dimensions=request_dimensions), nb_task=0) | 515 properties={ |
| 516 'dimensions_flat': [u'%s:%s' % (k, v) for k, v in request_dimensions], |
| 517 }, |
| 518 nb_task=0) |
| 502 # Bot declares exactly same dimensions so it matches. | 519 # Bot declares exactly same dimensions so it matches. |
| 503 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} | 520 bot_dimensions = {k: [v] for k, v in request_dimensions} |
| 504 actual = _yield_next_available_task_to_dispatch( | 521 actual = _yield_next_available_task_to_dispatch( |
| 505 bot_dimensions, datetime.datetime(1969, 1, 1)) | 522 bot_dimensions, datetime.datetime(1969, 1, 1)) |
| 506 self.failIf(actual) | 523 self.failIf(actual) |
| 507 | 524 |
| 508 def test_yield_next_available_task_to_run_task_meets_deadline(self): | 525 def test_yield_next_available_task_to_run_task_meets_deadline(self): |
| 509 request_dimensions = { | 526 request_dimensions = [ |
| 510 u'foo': u'bar', | 527 (u'foo', u'bar'), |
| 511 u'id': u'localhost', | 528 (u'id', u'localhost'), |
| 512 u'os': u'Windows-3.1.1', | 529 (u'os', u'Windows-3.1.1'), |
| 513 u'pool': u'default', | 530 (u'pool', u'default'), |
| 514 } | 531 ] |
| 515 self._gen_new_task_to_run( | 532 self._gen_new_task_to_run( |
| 516 properties=dict(dimensions=request_dimensions), nb_task=0) | 533 properties=dict(dimensions_flat=_flatten(request_dimensions)), |
| 534 nb_task=0) |
| 517 # Bot declares exactly same dimensions so it matches. | 535 # Bot declares exactly same dimensions so it matches. |
| 518 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} | 536 bot_dimensions = {k: [v] for k, v in request_dimensions} |
| 519 actual = _yield_next_available_task_to_dispatch( | 537 actual = _yield_next_available_task_to_dispatch( |
| 520 bot_dimensions, datetime.datetime(3000, 1, 1)) | 538 bot_dimensions, datetime.datetime(3000, 1, 1)) |
| 521 expected = [ | 539 expected = [ |
| 522 { | 540 { |
| 523 'dimensions_hash': _hash_dimensions(request_dimensions), | 541 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 524 'expiration_ts': self.expiration_ts, | 542 'expiration_ts': self.expiration_ts, |
| 525 'queue_number': '0x3f6b0f050c8ede72', | 543 'queue_number': '0x3f6b0f050c8ede72', |
| 526 }, | 544 }, |
| 527 ] | 545 ] |
| 528 self.assertEqual(expected, actual) | 546 self.assertEqual(expected, actual) |
| 529 | 547 |
| 530 def test_yield_next_available_task_to_run_task_terminate(self): | 548 def test_yield_next_available_task_to_run_task_terminate(self): |
| 531 request_dimensions = { | 549 request_dimensions = [ |
| 532 u'id': u'fake-id', | 550 (u'id', u'fake-id'), |
| 533 } | 551 ] |
| 534 task = self._gen_new_task_to_run( | 552 task = self._gen_new_task_to_run( |
| 535 priority=0, | 553 priority=0, |
| 536 properties=dict( | 554 properties=dict( |
| 537 command=[], dimensions=request_dimensions, execution_timeout_secs=0, | 555 command=[], dimensions_flat=_flatten(request_dimensions), |
| 538 grace_period_secs=0), | 556 execution_timeout_secs=0, grace_period_secs=0), |
| 539 nb_task=0) | 557 nb_task=0) |
| 540 self.assertTrue(task.key.parent().get().properties.is_terminate) | 558 self.assertTrue(task.key.parent().get().properties.is_terminate) |
| 541 # Bot declares exactly same dimensions so it matches. | 559 # Bot declares exactly same dimensions so it matches. |
| 542 bot_dimensions = {k: [v] for k, v in request_dimensions.iteritems()} | 560 bot_dimensions = {k: [v] for k, v in request_dimensions} |
| 543 bot_dimensions[u'pool'] = [u'default'] | 561 bot_dimensions[u'pool'] = [u'default'] |
| 544 actual = _yield_next_available_task_to_dispatch(bot_dimensions, 0) | 562 actual = _yield_next_available_task_to_dispatch(bot_dimensions, 0) |
| 545 expected = [ | 563 expected = [ |
| 546 { | 564 { |
| 547 'dimensions_hash': _hash_dimensions(request_dimensions), | 565 'dimensions_hash': _hash_dimensions(request_dimensions), |
| 548 'expiration_ts': self.expiration_ts, | 566 'expiration_ts': self.expiration_ts, |
| 549 'queue_number': '0x54795e3c800ede72', | 567 'queue_number': '0x54795e3c800ede72', |
| 550 }, | 568 }, |
| 551 ] | 569 ] |
| 552 self.assertEqual(expected, actual) | 570 self.assertEqual(expected, actual) |
| (...skipping 13 matching lines...) Expand all Loading... |
| 566 # All tasks are now expired. Note that even if they still have .queue_number | 584 # All tasks are now expired. Note that even if they still have .queue_number |
| 567 # set because the cron job wasn't run, they are still not yielded by | 585 # set because the cron job wasn't run, they are still not yielded by |
| 568 # yield_next_available_task_to_dispatch() | 586 # yield_next_available_task_to_dispatch() |
| 569 self.mock_now(self.now, 61) | 587 self.mock_now(self.now, 61) |
| 570 self.assertEqual( | 588 self.assertEqual( |
| 571 0, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) | 589 0, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) |
| 572 self.assertEqual( | 590 self.assertEqual( |
| 573 1, len(list(task_to_run.yield_expired_task_to_run()))) | 591 1, len(list(task_to_run.yield_expired_task_to_run()))) |
| 574 | 592 |
| 575 def test_is_reapable(self): | 593 def test_is_reapable(self): |
| 576 req_dimensions = {u'os': u'Windows-3.1.1', u'pool': u'default'} | 594 req_dimensions = [(u'os', u'Windows-3.1.1'), (u'pool', u'default')] |
| 577 to_run = self._gen_new_task_to_run( | 595 to_run = self._gen_new_task_to_run( |
| 578 properties=dict(dimensions=req_dimensions)) | 596 properties=dict(dimensions_flat=_flatten(req_dimensions))) |
| 579 bot_dimensions = { | 597 bot_dimensions = { |
| 580 u'id': [u'localhost'], | 598 u'id': [u'localhost'], |
| 581 u'os': [u'Windows-3.1.1'], | 599 u'os': [u'Windows-3.1.1'], |
| 582 u'pool': [u'default'], | 600 u'pool': [u'default'], |
| 583 } | 601 } |
| 584 self.assertEqual( | 602 self.assertEqual( |
| 585 1, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) | 603 1, len(_yield_next_available_task_to_dispatch(bot_dimensions, None))) |
| 586 | 604 |
| 587 self.assertEqual(True, to_run.is_reapable) | 605 self.assertEqual(True, to_run.is_reapable) |
| 588 to_run.queue_number = None | 606 to_run.queue_number = None |
| 589 to_run.put() | 607 to_run.put() |
| 590 self.assertEqual(False, to_run.is_reapable) | 608 self.assertEqual(False, to_run.is_reapable) |
| 591 | 609 |
| 592 def test_set_lookup_cache(self): | 610 def test_set_lookup_cache(self): |
| 611 req_dimensions = [(u'os', u'Windows-3.1.1'), (u'pool', u'default')] |
| 593 to_run = self._gen_new_task_to_run( | 612 to_run = self._gen_new_task_to_run( |
| 594 properties={ | 613 properties=dict(dimensions_flat=_flatten(req_dimensions))) |
| 595 'dimensions': {u'os': u'Windows-3.1.1', u'pool': u'default'}, | |
| 596 }) | |
| 597 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) | 614 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) |
| 598 task_to_run.set_lookup_cache(to_run.key, True) | 615 task_to_run.set_lookup_cache(to_run.key, True) |
| 599 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) | 616 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) |
| 600 task_to_run.set_lookup_cache(to_run.key, False) | 617 task_to_run.set_lookup_cache(to_run.key, False) |
| 601 self.assertEqual(True, task_to_run._lookup_cache_is_taken(to_run.key)) | 618 self.assertEqual(True, task_to_run._lookup_cache_is_taken(to_run.key)) |
| 602 task_to_run.set_lookup_cache(to_run.key, True) | 619 task_to_run.set_lookup_cache(to_run.key, True) |
| 603 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) | 620 self.assertEqual(False, task_to_run._lookup_cache_is_taken(to_run.key)) |
| 604 | 621 |
| 605 | 622 |
| 606 if __name__ == '__main__': | 623 if __name__ == '__main__': |
| 607 if '-v' in sys.argv: | 624 if '-v' in sys.argv: |
| 608 unittest.TestCase.maxDiff = None | 625 unittest.TestCase.maxDiff = None |
| 609 logging.basicConfig( | 626 logging.basicConfig( |
| 610 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) | 627 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) |
| 611 unittest.main() | 628 unittest.main() |
| OLD | NEW |