| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package engine | 5 package engine |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "math/rand" | 9 "math/rand" |
| 10 "sort" | 10 "sort" |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 115 }}), ShouldBeNil) | 115 }}), ShouldBeNil) |
| 116 So(allJobs(c), ShouldResemble, []Job{ | 116 So(allJobs(c), ShouldResemble, []Job{ |
| 117 { | 117 { |
| 118 JobID: "abc/1", | 118 JobID: "abc/1", |
| 119 ProjectID: "abc", | 119 ProjectID: "abc", |
| 120 Revision: "rev2", | 120 Revision: "rev2", |
| 121 Enabled: true, | 121 Enabled: true, |
| 122 Schedule: "*/1 * * * * * *", | 122 Schedule: "*/1 * * * * * *", |
| 123 State: JobState{ | 123 State: JobState{ |
| 124 State: "SCHEDULED", | 124 State: "SCHEDULED", |
| 125 » » » » » TickNonce: 9111178027324032851, | 125 » » » » » TickNonce: 6891407870632131044, |
| 126 TickTime: epoch.Add(1 * time.Second), | 126 TickTime: epoch.Add(1 * time.Second), |
| 127 }, | 127 }, |
| 128 }, | 128 }, |
| 129 }) | 129 }) |
| 130 // Enqueued timer task to launch it. | 130 // Enqueued timer task to launch it. |
| 131 task = ensureOneTask(c, "timers-q") | 131 task = ensureOneTask(c, "timers-q") |
| 132 So(task.Path, ShouldEqual, "/timers") | 132 So(task.Path, ShouldEqual, "/timers") |
| 133 So(task.ETA, ShouldResemble, epoch.Add(1*time.Second)) | 133 So(task.ETA, ShouldResemble, epoch.Add(1*time.Second)) |
| 134 tq.GetTestable(c).ResetTasks() | 134 tq.GetTestable(c).ResetTasks() |
| 135 | 135 |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 237 So(e.ResetAllJobsOnDevServer(c), ShouldBeNil) | 237 So(e.ResetAllJobsOnDevServer(c), ShouldBeNil) |
| 238 So(allJobs(c), ShouldResemble, []Job{ | 238 So(allJobs(c), ShouldResemble, []Job{ |
| 239 { | 239 { |
| 240 JobID: "abc/1", | 240 JobID: "abc/1", |
| 241 ProjectID: "abc", | 241 ProjectID: "abc", |
| 242 Revision: "rev1", | 242 Revision: "rev1", |
| 243 Enabled: true, | 243 Enabled: true, |
| 244 Schedule: "*/5 * * * * * *", | 244 Schedule: "*/5 * * * * * *", |
| 245 State: JobState{ | 245 State: JobState{ |
| 246 State: "SCHEDULED", | 246 State: "SCHEDULED", |
| 247 » » » » » TickNonce: 9111178027324032851, | 247 » » » » » TickNonce: 6891407870632131044, |
| 248 TickTime: epoch.Add(65 * time.Second), | 248 TickTime: epoch.Add(65 * time.Second), |
| 249 }, | 249 }, |
| 250 }, | 250 }, |
| 251 }) | 251 }) |
| 252 }) | 252 }) |
| 253 } | 253 } |
| 254 | 254 |
| 255 func TestFullFlow(t *testing.T) { | 255 func TestFullFlow(t *testing.T) { |
| 256 Convey("full flow", t, func() { | 256 Convey("full flow", t, func() { |
| 257 c := newTestContext(epoch) | 257 c := newTestContext(epoch) |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 295 So(allJobs(c), ShouldResemble, []Job{ | 295 So(allJobs(c), ShouldResemble, []Job{ |
| 296 { | 296 { |
| 297 JobID: "abc/1", | 297 JobID: "abc/1", |
| 298 ProjectID: "abc", | 298 ProjectID: "abc", |
| 299 Revision: "rev1", | 299 Revision: "rev1", |
| 300 Enabled: true, | 300 Enabled: true, |
| 301 Schedule: "*/5 * * * * * *", | 301 Schedule: "*/5 * * * * * *", |
| 302 Task: taskBytes, | 302 Task: taskBytes, |
| 303 State: JobState{ | 303 State: JobState{ |
| 304 State: "QUEUED", | 304 State: "QUEUED", |
| 305 » » » » » TickNonce: 9111178027324032851, | 305 » » » » » TickNonce: 6891407870632131044, |
| 306 TickTime: epoch.Add(10 * time.Sec
ond), | 306 TickTime: epoch.Add(10 * time.Sec
ond), |
| 307 » » » » » InvocationNonce: 631000787647335445, | 307 » » » » » InvocationNonce: 1907242367099883828, |
| 308 InvocationTime: epoch.Add(5 * time.Seco
nd), | 308 InvocationTime: epoch.Add(5 * time.Seco
nd), |
| 309 }, | 309 }, |
| 310 }, | 310 }, |
| 311 }) | 311 }) |
| 312 | 312 |
| 313 // Next tick task is added. | 313 // Next tick task is added. |
| 314 tickTask := ensureOneTask(c, "timers-q") | 314 tickTask := ensureOneTask(c, "timers-q") |
| 315 So(tickTask.Path, ShouldEqual, "/timers") | 315 So(tickTask.Path, ShouldEqual, "/timers") |
| 316 So(tickTask.ETA, ShouldResemble, epoch.Add(10*time.Second)) | 316 So(tickTask.ETA, ShouldResemble, epoch.Add(10*time.Second)) |
| 317 | 317 |
| 318 // Invocation task (ETA is 1 sec in the future). | 318 // Invocation task (ETA is 1 sec in the future). |
| 319 invTask := ensureOneTask(c, "invs-q") | 319 invTask := ensureOneTask(c, "invs-q") |
| 320 So(invTask.Path, ShouldEqual, "/invs") | 320 So(invTask.Path, ShouldEqual, "/invs") |
| 321 So(invTask.ETA, ShouldResemble, epoch.Add(6*time.Second)) | 321 So(invTask.ETA, ShouldResemble, epoch.Add(6*time.Second)) |
| 322 tq.GetTestable(c).ResetTasks() | 322 tq.GetTestable(c).ResetTasks() |
| 323 | 323 |
| 324 // Time to run the job and it fails to launch with a transient e
rror. | 324 // Time to run the job and it fails to launch with a transient e
rror. |
| 325 mgr.launchTask = func(ctl task.Controller) error { | 325 mgr.launchTask = func(ctl task.Controller) error { |
| 326 // Check data provided via the controller. | 326 // Check data provided via the controller. |
| 327 So(ctl.JobID(), ShouldEqual, "abc/1") | 327 So(ctl.JobID(), ShouldEqual, "abc/1") |
| 328 » » » So(ctl.InvocationID(), ShouldEqual, int64(92000935185826
66224)) | 328 » » » So(ctl.InvocationID(), ShouldEqual, int64(92000935185825
46608)) |
| 329 » » » So(ctl.InvocationNonce(), ShouldEqual, int64(63100078764
7335445)) | 329 » » » So(ctl.InvocationNonce(), ShouldEqual, int64(19072423670
99883828)) |
| 330 So(ctl.Task(), ShouldResemble, &messages.NoopTask{}) | 330 So(ctl.Task(), ShouldResemble, &messages.NoopTask{}) |
| 331 | 331 |
| 332 ctl.DebugLog("oops, fail") | 332 ctl.DebugLog("oops, fail") |
| 333 return errors.WrapTransient(errors.New("oops")) | 333 return errors.WrapTransient(errors.New("oops")) |
| 334 } | 334 } |
| 335 So(errors.IsTransient(e.ExecuteSerializedAction(c, invTask.Paylo
ad, 0)), ShouldBeTrue) | 335 So(errors.IsTransient(e.ExecuteSerializedAction(c, invTask.Paylo
ad, 0)), ShouldBeTrue) |
| 336 | 336 |
| 337 // Still in QUEUED state, but with InvocatioID assigned. | 337 // Still in QUEUED state, but with InvocatioID assigned. |
| 338 jobs := allJobs(c) | 338 jobs := allJobs(c) |
| 339 So(jobs, ShouldResemble, []Job{ | 339 So(jobs, ShouldResemble, []Job{ |
| 340 { | 340 { |
| 341 JobID: "abc/1", | 341 JobID: "abc/1", |
| 342 ProjectID: "abc", | 342 ProjectID: "abc", |
| 343 Revision: "rev1", | 343 Revision: "rev1", |
| 344 Enabled: true, | 344 Enabled: true, |
| 345 Schedule: "*/5 * * * * * *", | 345 Schedule: "*/5 * * * * * *", |
| 346 Task: taskBytes, | 346 Task: taskBytes, |
| 347 State: JobState{ | 347 State: JobState{ |
| 348 State: "QUEUED", | 348 State: "QUEUED", |
| 349 » » » » » TickNonce: 9111178027324032851, | 349 » » » » » TickNonce: 6891407870632131044, |
| 350 TickTime: epoch.Add(10 * time.Sec
ond), | 350 TickTime: epoch.Add(10 * time.Sec
ond), |
| 351 » » » » » InvocationNonce: 631000787647335445, | 351 » » » » » InvocationNonce: 1907242367099883828, |
| 352 InvocationTime: epoch.Add(5 * time.Seco
nd), | 352 InvocationTime: epoch.Add(5 * time.Seco
nd), |
| 353 » » » » » InvocationID: 9200093518582666224, | 353 » » » » » InvocationID: 9200093518582546608, |
| 354 }, | 354 }, |
| 355 }, | 355 }, |
| 356 }) | 356 }) |
| 357 jobKey := ds.KeyForObj(c, &jobs[0]) | 357 jobKey := ds.KeyForObj(c, &jobs[0]) |
| 358 | 358 |
| 359 // Check Invocation fields. | 359 // Check Invocation fields. |
| 360 » » inv := Invocation{ID: 9200093518582666224, JobKey: jobKey} | 360 » » inv := Invocation{ID: 9200093518582546608, JobKey: jobKey} |
| 361 So(ds.Get(c, &inv), ShouldBeNil) | 361 So(ds.Get(c, &inv), ShouldBeNil) |
| 362 inv.JobKey = nil // for easier ShouldResemble below | 362 inv.JobKey = nil // for easier ShouldResemble below |
| 363 debugLog := inv.DebugLog | 363 debugLog := inv.DebugLog |
| 364 inv.DebugLog = "" | 364 inv.DebugLog = "" |
| 365 So(inv, ShouldResemble, Invocation{ | 365 So(inv, ShouldResemble, Invocation{ |
| 366 » » » ID: 9200093518582666224, | 366 » » » ID: 9200093518582546608, |
| 367 » » » InvocationNonce: 631000787647335445, | 367 » » » InvocationNonce: 1907242367099883828, |
| 368 Revision: "rev1", | 368 Revision: "rev1", |
| 369 Started: epoch.Add(5 * time.Second), | 369 Started: epoch.Add(5 * time.Second), |
| 370 Finished: epoch.Add(5 * time.Second), | 370 Finished: epoch.Add(5 * time.Second), |
| 371 Task: taskBytes, | 371 Task: taskBytes, |
| 372 DebugLog: "", | 372 DebugLog: "", |
| 373 Status: task.StatusFailed, | 373 Status: task.StatusFailed, |
| 374 MutationsCount: 1, | 374 MutationsCount: 1, |
| 375 }) | 375 }) |
| 376 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 1)") | 376 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 1)") |
| 377 So(debugLog, ShouldContainSubstring, "[22:42:05.000] oops, fail"
) | 377 So(debugLog, ShouldContainSubstring, "[22:42:05.000] oops, fail"
) |
| (...skipping 11 matching lines...) Expand all Loading... |
| 389 So(allJobs(c), ShouldResemble, []Job{ | 389 So(allJobs(c), ShouldResemble, []Job{ |
| 390 { | 390 { |
| 391 JobID: "abc/1", | 391 JobID: "abc/1", |
| 392 ProjectID: "abc", | 392 ProjectID: "abc", |
| 393 Revision: "rev1", | 393 Revision: "rev1", |
| 394 Enabled: true, | 394 Enabled: true, |
| 395 Schedule: "*/5 * * * * * *", | 395 Schedule: "*/5 * * * * * *", |
| 396 Task: taskBytes, | 396 Task: taskBytes, |
| 397 State: JobState{ | 397 State: JobState{ |
| 398 State: "RUNNING", | 398 State: "RUNNING", |
| 399 » » » » » » TickNonce: 9111178027
324032851, | 399 » » » » » » TickNonce: 6891407870
632131044, |
| 400 TickTime: epoch.Add(
10 * time.Second), | 400 TickTime: epoch.Add(
10 * time.Second), |
| 401 » » » » » » InvocationNonce: 6310007876
47335445, | 401 » » » » » » InvocationNonce: 1907242367
099883828, |
| 402 InvocationRetryCount: 1, | 402 InvocationRetryCount: 1, |
| 403 InvocationTime: epoch.Add(
5 * time.Second), | 403 InvocationTime: epoch.Add(
5 * time.Second), |
| 404 » » » » » » InvocationID: 9200093518
581789696, | 404 » » » » » » InvocationID: 9200093518
581921600, |
| 405 }, | 405 }, |
| 406 }, | 406 }, |
| 407 }) | 407 }) |
| 408 » » » inv := Invocation{ID: 9200093518581789696, JobKey: jobKe
y} | 408 » » » inv := Invocation{ID: 9200093518581921600, JobKey: jobKe
y} |
| 409 So(ds.Get(c, &inv), ShouldBeNil) | 409 So(ds.Get(c, &inv), ShouldBeNil) |
| 410 inv.JobKey = nil // for easier ShouldResemble below | 410 inv.JobKey = nil // for easier ShouldResemble below |
| 411 So(inv, ShouldResemble, Invocation{ | 411 So(inv, ShouldResemble, Invocation{ |
| 412 » » » » ID: 9200093518581789696, | 412 » » » » ID: 9200093518581921600, |
| 413 » » » » InvocationNonce: 631000787647335445, | 413 » » » » InvocationNonce: 1907242367099883828, |
| 414 Revision: "rev1", | 414 Revision: "rev1", |
| 415 Started: epoch.Add(5 * time.Second), | 415 Started: epoch.Add(5 * time.Second), |
| 416 Task: taskBytes, | 416 Task: taskBytes, |
| 417 DebugLog: "[22:42:05.000] Invocation init
iated (attempt 2)\n[22:42:05.000] Starting\n", | 417 DebugLog: "[22:42:05.000] Invocation init
iated (attempt 2)\n[22:42:05.000] Starting\n", |
| 418 RetryCount: 1, | 418 RetryCount: 1, |
| 419 Status: task.StatusRunning, | 419 Status: task.StatusRunning, |
| 420 MutationsCount: 1, | 420 MutationsCount: 1, |
| 421 }) | 421 }) |
| 422 | 422 |
| 423 // Noop save, just for the code coverage. | 423 // Noop save, just for the code coverage. |
| 424 So(ctl.Save(), ShouldBeNil) | 424 So(ctl.Save(), ShouldBeNil) |
| 425 | 425 |
| 426 // Change state to the final one. | 426 // Change state to the final one. |
| 427 ctl.State().Status = task.StatusSucceeded | 427 ctl.State().Status = task.StatusSucceeded |
| 428 ctl.State().ViewURL = "http://view_url" | 428 ctl.State().ViewURL = "http://view_url" |
| 429 ctl.State().TaskData = []byte("blah") | 429 ctl.State().TaskData = []byte("blah") |
| 430 return nil | 430 return nil |
| 431 } | 431 } |
| 432 So(e.ExecuteSerializedAction(c, invTask.Payload, 1), ShouldBeNil
) | 432 So(e.ExecuteSerializedAction(c, invTask.Payload, 1), ShouldBeNil
) |
| 433 | 433 |
| 434 // After final save. | 434 // After final save. |
| 435 » » inv = Invocation{ID: 9200093518581789696, JobKey: jobKey} | 435 » » inv = Invocation{ID: 9200093518581921600, JobKey: jobKey} |
| 436 So(ds.Get(c, &inv), ShouldBeNil) | 436 So(ds.Get(c, &inv), ShouldBeNil) |
| 437 inv.JobKey = nil // for easier ShouldResemble below | 437 inv.JobKey = nil // for easier ShouldResemble below |
| 438 debugLog = inv.DebugLog | 438 debugLog = inv.DebugLog |
| 439 inv.DebugLog = "" | 439 inv.DebugLog = "" |
| 440 So(inv, ShouldResemble, Invocation{ | 440 So(inv, ShouldResemble, Invocation{ |
| 441 » » » ID: 9200093518581789696, | 441 » » » ID: 9200093518581921600, |
| 442 » » » InvocationNonce: 631000787647335445, | 442 » » » InvocationNonce: 1907242367099883828, |
| 443 Revision: "rev1", | 443 Revision: "rev1", |
| 444 Started: epoch.Add(5 * time.Second), | 444 Started: epoch.Add(5 * time.Second), |
| 445 Finished: epoch.Add(5 * time.Second), | 445 Finished: epoch.Add(5 * time.Second), |
| 446 Task: taskBytes, | 446 Task: taskBytes, |
| 447 DebugLog: "", | 447 DebugLog: "", |
| 448 RetryCount: 1, | 448 RetryCount: 1, |
| 449 Status: task.StatusSucceeded, | 449 Status: task.StatusSucceeded, |
| 450 ViewURL: "http://view_url", | 450 ViewURL: "http://view_url", |
| 451 TaskData: []byte("blah"), | 451 TaskData: []byte("blah"), |
| 452 MutationsCount: 2, | 452 MutationsCount: 2, |
| 453 }) | 453 }) |
| 454 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 2)") | 454 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 2)") |
| 455 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Starting") | 455 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Starting") |
| 456 So(debugLog, ShouldContainSubstring, "with status SUCCEEDED") | 456 So(debugLog, ShouldContainSubstring, "with status SUCCEEDED") |
| 457 | 457 |
| 458 // Previous invocation is canceled. | 458 // Previous invocation is canceled. |
| 459 » » inv = Invocation{ID: 9200093518582666224, JobKey: jobKey} | 459 » » inv = Invocation{ID: 9200093518582546608, JobKey: jobKey} |
| 460 So(ds.Get(c, &inv), ShouldBeNil) | 460 So(ds.Get(c, &inv), ShouldBeNil) |
| 461 inv.JobKey = nil // for easier ShouldResemble below | 461 inv.JobKey = nil // for easier ShouldResemble below |
| 462 debugLog = inv.DebugLog | 462 debugLog = inv.DebugLog |
| 463 inv.DebugLog = "" | 463 inv.DebugLog = "" |
| 464 So(inv, ShouldResemble, Invocation{ | 464 So(inv, ShouldResemble, Invocation{ |
| 465 » » » ID: 9200093518582666224, | 465 » » » ID: 9200093518582546608, |
| 466 » » » InvocationNonce: 631000787647335445, | 466 » » » InvocationNonce: 1907242367099883828, |
| 467 Revision: "rev1", | 467 Revision: "rev1", |
| 468 Started: epoch.Add(5 * time.Second), | 468 Started: epoch.Add(5 * time.Second), |
| 469 Finished: epoch.Add(5 * time.Second), | 469 Finished: epoch.Add(5 * time.Second), |
| 470 Task: taskBytes, | 470 Task: taskBytes, |
| 471 DebugLog: "", | 471 DebugLog: "", |
| 472 Status: task.StatusFailed, | 472 Status: task.StatusFailed, |
| 473 MutationsCount: 1, | 473 MutationsCount: 1, |
| 474 }) | 474 }) |
| 475 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 1)") | 475 So(debugLog, ShouldContainSubstring, "[22:42:05.000] Invocation
initiated (attempt 1)") |
| 476 So(debugLog, ShouldContainSubstring, "[22:42:05.000] oops, fail"
) | 476 So(debugLog, ShouldContainSubstring, "[22:42:05.000] oops, fail"
) |
| 477 So(debugLog, ShouldContainSubstring, "with status FAILED") | 477 So(debugLog, ShouldContainSubstring, "with status FAILED") |
| 478 So(debugLog, ShouldContainSubstring, "[22:42:05.000] It will pro
bably be retried") | 478 So(debugLog, ShouldContainSubstring, "[22:42:05.000] It will pro
bably be retried") |
| 479 | 479 |
| 480 // Job is in scheduled state again. | 480 // Job is in scheduled state again. |
| 481 So(allJobs(c), ShouldResemble, []Job{ | 481 So(allJobs(c), ShouldResemble, []Job{ |
| 482 { | 482 { |
| 483 JobID: "abc/1", | 483 JobID: "abc/1", |
| 484 ProjectID: "abc", | 484 ProjectID: "abc", |
| 485 Revision: "rev1", | 485 Revision: "rev1", |
| 486 Enabled: true, | 486 Enabled: true, |
| 487 Schedule: "*/5 * * * * * *", | 487 Schedule: "*/5 * * * * * *", |
| 488 Task: taskBytes, | 488 Task: taskBytes, |
| 489 State: JobState{ | 489 State: JobState{ |
| 490 State: "SCHEDULED", | 490 State: "SCHEDULED", |
| 491 » » » » » TickNonce: 9111178027324032851, | 491 » » » » » TickNonce: 6891407870632131044, |
| 492 TickTime: epoch.Add(10 * time.Second), | 492 TickTime: epoch.Add(10 * time.Second), |
| 493 PrevTime: epoch.Add(5 * time.Second), | 493 PrevTime: epoch.Add(5 * time.Second), |
| 494 }, | 494 }, |
| 495 }, | 495 }, |
| 496 }) | 496 }) |
| 497 }) | 497 }) |
| 498 } | 498 } |
| 499 | 499 |
| 500 func TestGenerateInvocationID(t *testing.T) { | 500 func TestGenerateInvocationID(t *testing.T) { |
| 501 Convey("generateInvocationID does not collide", t, func() { | 501 Convey("generateInvocationID does not collide", t, func() { |
| (...skipping 587 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1089 | 1089 |
| 1090 func ensureOneTask(c context.Context, q string) *tq.Task { | 1090 func ensureOneTask(c context.Context, q string) *tq.Task { |
| 1091 tqt := tq.GetTestable(c) | 1091 tqt := tq.GetTestable(c) |
| 1092 tasks := tqt.GetScheduledTasks()[q] | 1092 tasks := tqt.GetScheduledTasks()[q] |
| 1093 So(len(tasks), ShouldEqual, 1) | 1093 So(len(tasks), ShouldEqual, 1) |
| 1094 for _, t := range tasks { | 1094 for _, t := range tasks { |
| 1095 return t | 1095 return t |
| 1096 } | 1096 } |
| 1097 return nil | 1097 return nil |
| 1098 } | 1098 } |
| OLD | NEW |