OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package engine | 5 package engine |
6 | 6 |
7 import ( | 7 import ( |
8 "encoding/json" | 8 "encoding/json" |
9 "math/rand" | 9 "math/rand" |
10 "sort" | 10 "sort" |
11 "strings" | 11 "strings" |
12 "testing" | 12 "testing" |
13 "time" | 13 "time" |
14 | 14 |
15 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 "google.golang.org/api/pubsub/v1" | 17 "google.golang.org/api/pubsub/v1" |
18 | 18 |
19 "github.com/luci/gae/impl/memory" | 19 "github.com/luci/gae/impl/memory" |
20 ds "github.com/luci/gae/service/datastore" | 20 ds "github.com/luci/gae/service/datastore" |
21 tq "github.com/luci/gae/service/taskqueue" | 21 tq "github.com/luci/gae/service/taskqueue" |
22 | 22 |
23 "github.com/luci/luci-go/common/clock" | 23 "github.com/luci/luci-go/common/clock" |
24 "github.com/luci/luci-go/common/clock/testclock" | 24 "github.com/luci/luci-go/common/clock/testclock" |
25 "github.com/luci/luci-go/common/data/rand/mathrand" | 25 "github.com/luci/luci-go/common/data/rand/mathrand" |
26 "github.com/luci/luci-go/common/data/stringset" | 26 "github.com/luci/luci-go/common/data/stringset" |
27 "github.com/luci/luci-go/common/errors" | 27 "github.com/luci/luci-go/common/errors" |
| 28 "github.com/luci/luci-go/common/retry/transient" |
28 "github.com/luci/luci-go/server/secrets/testsecrets" | 29 "github.com/luci/luci-go/server/secrets/testsecrets" |
29 | 30 |
30 "github.com/luci/luci-go/scheduler/appengine/catalog" | 31 "github.com/luci/luci-go/scheduler/appengine/catalog" |
31 "github.com/luci/luci-go/scheduler/appengine/messages" | 32 "github.com/luci/luci-go/scheduler/appengine/messages" |
32 "github.com/luci/luci-go/scheduler/appengine/task" | 33 "github.com/luci/luci-go/scheduler/appengine/task" |
33 "github.com/luci/luci-go/scheduler/appengine/task/noop" | 34 "github.com/luci/luci-go/scheduler/appengine/task/noop" |
34 | 35 |
35 . "github.com/luci/luci-go/common/testing/assertions" | 36 . "github.com/luci/luci-go/common/testing/assertions" |
36 . "github.com/smartystreets/goconvey/convey" | 37 . "github.com/smartystreets/goconvey/convey" |
37 ) | 38 ) |
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
191 e, _ := newTestEngine() | 192 e, _ := newTestEngine() |
192 | 193 |
193 // Pretend collision happened in all retries. | 194 // Pretend collision happened in all retries. |
194 ds.GetTestable(c).SetTransactionRetryCount(15) | 195 ds.GetTestable(c).SetTransactionRetryCount(15) |
195 err := e.UpdateProjectJobs(c, "abc", []catalog.Definition{ | 196 err := e.UpdateProjectJobs(c, "abc", []catalog.Definition{ |
196 { | 197 { |
197 JobID: "abc/1", | 198 JobID: "abc/1", |
198 Revision: "rev1", | 199 Revision: "rev1", |
199 Schedule: "*/5 * * * * * *", | 200 Schedule: "*/5 * * * * * *", |
200 }}) | 201 }}) |
201 » » So(errors.IsTransient(err), ShouldBeTrue) | 202 » » So(transient.Tag.In(err), ShouldBeTrue) |
202 So(allJobs(c), ShouldResemble, []Job{}) | 203 So(allJobs(c), ShouldResemble, []Job{}) |
203 ensureZeroTasks(c, "timers-q") | 204 ensureZeroTasks(c, "timers-q") |
204 ensureZeroTasks(c, "invs-q") | 205 ensureZeroTasks(c, "invs-q") |
205 }) | 206 }) |
206 } | 207 } |
207 | 208 |
208 func TestResetAllJobsOnDevServer(t *testing.T) { | 209 func TestResetAllJobsOnDevServer(t *testing.T) { |
209 Convey("works", t, func() { | 210 Convey("works", t, func() { |
210 c := newTestContext(epoch) | 211 c := newTestContext(epoch) |
211 e, _ := newTestEngine() | 212 e, _ := newTestEngine() |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
323 | 324 |
324 // Time to run the job and it fails to launch with a transient e
rror. | 325 // Time to run the job and it fails to launch with a transient e
rror. |
325 mgr.launchTask = func(ctx context.Context, ctl task.Controller)
error { | 326 mgr.launchTask = func(ctx context.Context, ctl task.Controller)
error { |
326 // Check data provided via the controller. | 327 // Check data provided via the controller. |
327 So(ctl.JobID(), ShouldEqual, "abc/1") | 328 So(ctl.JobID(), ShouldEqual, "abc/1") |
328 So(ctl.InvocationID(), ShouldEqual, int64(92000935185821
98800)) | 329 So(ctl.InvocationID(), ShouldEqual, int64(92000935185821
98800)) |
329 So(ctl.InvocationNonce(), ShouldEqual, int64(92895361673
2700780)) | 330 So(ctl.InvocationNonce(), ShouldEqual, int64(92895361673
2700780)) |
330 So(ctl.Task(), ShouldResemble, &messages.NoopTask{}) | 331 So(ctl.Task(), ShouldResemble, &messages.NoopTask{}) |
331 | 332 |
332 ctl.DebugLog("oops, fail") | 333 ctl.DebugLog("oops, fail") |
333 » » » return errors.WrapTransient(errors.New("oops")) | 334 » » » return errors.New("oops", transient.Tag) |
334 } | 335 } |
335 » » So(errors.IsTransient(e.ExecuteSerializedAction(c, invTask.Paylo
ad, 0)), ShouldBeTrue) | 336 » » So(transient.Tag.In(e.ExecuteSerializedAction(c, invTask.Payload
, 0)), ShouldBeTrue) |
336 | 337 |
337 // Still in QUEUED state, but with InvocatioID assigned. | 338 // Still in QUEUED state, but with InvocatioID assigned. |
338 jobs := allJobs(c) | 339 jobs := allJobs(c) |
339 So(jobs, ShouldResemble, []Job{ | 340 So(jobs, ShouldResemble, []Job{ |
340 { | 341 { |
341 JobID: "abc/1", | 342 JobID: "abc/1", |
342 ProjectID: "abc", | 343 ProjectID: "abc", |
343 Revision: "rev1", | 344 Revision: "rev1", |
344 Enabled: true, | 345 Enabled: true, |
345 Schedule: "*/5 * * * * * *", | 346 Schedule: "*/5 * * * * * *", |
(...skipping 392 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
738 So(e.ProcessPubSubPush(c, blob), ShouldErrLike, "bad tok
en") | 739 So(e.ProcessPubSubPush(c, blob), ShouldErrLike, "bad tok
en") |
739 }) | 740 }) |
740 | 741 |
741 Convey("ProcessPubSubPush handles missing invocation", func() { | 742 Convey("ProcessPubSubPush handles missing invocation", func() { |
742 ds.Delete(c, ds.KeyForObj(c, &inv)) | 743 ds.Delete(c, ds.KeyForObj(c, &inv)) |
743 msg := pubsub.PubsubMessage{ | 744 msg := pubsub.PubsubMessage{ |
744 Attributes: map[string]string{"auth_token": toke
n}, | 745 Attributes: map[string]string{"auth_token": toke
n}, |
745 } | 746 } |
746 blob, err := json.Marshal(&msg) | 747 blob, err := json.Marshal(&msg) |
747 So(err, ShouldBeNil) | 748 So(err, ShouldBeNil) |
748 » » » So(errors.IsTransient(e.ProcessPubSubPush(c, blob)), Sho
uldBeFalse) | 749 » » » So(transient.Tag.In(e.ProcessPubSubPush(c, blob)), Shoul
dBeFalse) |
749 }) | 750 }) |
750 }) | 751 }) |
751 } | 752 } |
752 | 753 |
753 func TestAborts(t *testing.T) { | 754 func TestAborts(t *testing.T) { |
754 Convey("with mock invocation", t, func() { | 755 Convey("with mock invocation", t, func() { |
755 c := newTestContext(epoch) | 756 c := newTestContext(epoch) |
756 e, mgr := newTestEngine() | 757 e, mgr := newTestEngine() |
757 | 758 |
758 // A job in "QUEUED" state (about to run an invocation). | 759 // A job in "QUEUED" state (about to run an invocation). |
(...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1093 | 1094 |
1094 func ensureOneTask(c context.Context, q string) *tq.Task { | 1095 func ensureOneTask(c context.Context, q string) *tq.Task { |
1095 tqt := tq.GetTestable(c) | 1096 tqt := tq.GetTestable(c) |
1096 tasks := tqt.GetScheduledTasks()[q] | 1097 tasks := tqt.GetScheduledTasks()[q] |
1097 So(len(tasks), ShouldEqual, 1) | 1098 So(len(tasks), ShouldEqual, 1) |
1098 for _, t := range tasks { | 1099 for _, t := range tasks { |
1099 return t | 1100 return t |
1100 } | 1101 } |
1101 return nil | 1102 return nil |
1102 } | 1103 } |
OLD | NEW |