Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: scheduler/appengine/engine/engine_test.go

Issue 2986033003: [scheduler]: ACLs phase 1 - per Job ACL specification and enforcement. (Closed)
Patch Set: [WIP] ACLs into engine public API. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 18 matching lines...) Expand all
29 "github.com/luci/gae/impl/memory" 29 "github.com/luci/gae/impl/memory"
30 ds "github.com/luci/gae/service/datastore" 30 ds "github.com/luci/gae/service/datastore"
31 tq "github.com/luci/gae/service/taskqueue" 31 tq "github.com/luci/gae/service/taskqueue"
32 32
33 "github.com/luci/luci-go/common/clock" 33 "github.com/luci/luci-go/common/clock"
34 "github.com/luci/luci-go/common/clock/testclock" 34 "github.com/luci/luci-go/common/clock/testclock"
35 "github.com/luci/luci-go/common/data/rand/mathrand" 35 "github.com/luci/luci-go/common/data/rand/mathrand"
36 "github.com/luci/luci-go/common/data/stringset" 36 "github.com/luci/luci-go/common/data/stringset"
37 "github.com/luci/luci-go/common/errors" 37 "github.com/luci/luci-go/common/errors"
38 "github.com/luci/luci-go/common/retry/transient" 38 "github.com/luci/luci-go/common/retry/transient"
39 "github.com/luci/luci-go/server/auth"
40 "github.com/luci/luci-go/server/auth/authtest"
39 "github.com/luci/luci-go/server/secrets/testsecrets" 41 "github.com/luci/luci-go/server/secrets/testsecrets"
40 42
43 "github.com/luci/luci-go/scheduler/appengine/acl"
41 "github.com/luci/luci-go/scheduler/appengine/catalog" 44 "github.com/luci/luci-go/scheduler/appengine/catalog"
42 "github.com/luci/luci-go/scheduler/appengine/messages" 45 "github.com/luci/luci-go/scheduler/appengine/messages"
43 "github.com/luci/luci-go/scheduler/appengine/task" 46 "github.com/luci/luci-go/scheduler/appengine/task"
44 "github.com/luci/luci-go/scheduler/appengine/task/noop" 47 "github.com/luci/luci-go/scheduler/appengine/task/noop"
45 48
46 . "github.com/luci/luci-go/common/testing/assertions" 49 . "github.com/luci/luci-go/common/testing/assertions"
47 . "github.com/smartystreets/goconvey/convey" 50 . "github.com/smartystreets/goconvey/convey"
48 ) 51 )
49 52
50 func TestGetAllProjects(t *testing.T) { 53 func TestGetAllProjects(t *testing.T) {
(...skipping 485 matching lines...) Expand 10 before | Expand all | Expand 10 after
536 So(err, ShouldBeNil) 539 So(err, ShouldBeNil)
537 540
538 So(newer, ShouldBeLessThan, older) 541 So(newer, ShouldBeLessThan, older)
539 }) 542 })
540 } 543 }
541 544
542 func TestQueries(t *testing.T) { 545 func TestQueries(t *testing.T) {
543 Convey("with mock data", t, func() { 546 Convey("with mock data", t, func() {
544 c := newTestContext(epoch) 547 c := newTestContext(epoch)
545 e, _ := newTestEngine() 548 e, _ := newTestEngine()
549 // TODO(tandrii): remove aclDefault once all Jobs have ACLs.
550 aclDefault := acl.GrantsByRole{}
551 aclSome := acl.GrantsByRole{Readers: []string{"group:some"}}
552 aclOne := acl.GrantsByRole{Owners: []string{"one@example.com"}}
553
554 ctxAnon := auth.WithState(c, &authtest.FakeState{
555 Identity: "anonymous:anonymous",
556 })
557 ctxOne := auth.WithState(c, &authtest.FakeState{
558 Identity: "user:one@example.com",
559 IdentityGroups: []string{"all"},
560 })
561 ctxSome := auth.WithState(c, &authtest.FakeState{
562 Identity: "user:some@example.com",
563 IdentityGroups: []string{"some"},
564 })
565 ctxAdmin := auth.WithState(c, &authtest.FakeState{
566 Identity: "user:admin@example.com",
567 IdentityGroups: []string{"administrators"},
568 })
546 569
547 So(ds.Put(c, 570 So(ds.Put(c,
548 » » » &Job{JobID: "abc/1", ProjectID: "abc", Enabled: true}, 571 » » » &Job{JobID: "abc/1", ProjectID: "abc", Enabled: true, Ac ls: aclOne},
549 » » » &Job{JobID: "abc/2", ProjectID: "abc", Enabled: true}, 572 » » » &Job{JobID: "abc/2", ProjectID: "abc", Enabled: true, Ac ls: aclSome},
550 » » » &Job{JobID: "def/1", ProjectID: "def", Enabled: true}, 573 » » » &Job{JobID: "abc/3", ProjectID: "abc", Enabled: true, Ac ls: aclDefault},
551 » » » &Job{JobID: "def/2", ProjectID: "def", Enabled: false}, 574 » » » &Job{JobID: "def/1", ProjectID: "def", Enabled: true, Ac ls: aclDefault},
575 » » » &Job{JobID: "def/2", ProjectID: "def", Enabled: false, A cls: aclDefault},
552 ), ShouldBeNil) 576 ), ShouldBeNil)
553 577
554 job1 := ds.NewKey(c, "Job", "abc/1", 0, nil) 578 job1 := ds.NewKey(c, "Job", "abc/1", 0, nil)
555 job2 := ds.NewKey(c, "Job", "abc/2", 0, nil) 579 job2 := ds.NewKey(c, "Job", "abc/2", 0, nil)
580 job3 := ds.NewKey(c, "Job", "abc/3", 0, nil)
556 So(ds.Put(c, 581 So(ds.Put(c,
557 &Invocation{ID: 1, JobKey: job1, InvocationNonce: 123}, 582 &Invocation{ID: 1, JobKey: job1, InvocationNonce: 123},
558 &Invocation{ID: 2, JobKey: job1, InvocationNonce: 123}, 583 &Invocation{ID: 2, JobKey: job1, InvocationNonce: 123},
559 &Invocation{ID: 3, JobKey: job1}, 584 &Invocation{ID: 3, JobKey: job1},
560 &Invocation{ID: 1, JobKey: job2}, 585 &Invocation{ID: 1, JobKey: job2},
561 &Invocation{ID: 2, JobKey: job2}, 586 &Invocation{ID: 2, JobKey: job2},
562 &Invocation{ID: 3, JobKey: job2}, 587 &Invocation{ID: 3, JobKey: job2},
588 &Invocation{ID: 1, JobKey: job3},
563 ), ShouldBeNil) 589 ), ShouldBeNil)
564 590
565 ds.GetTestable(c).CatchupIndexes() 591 ds.GetTestable(c).CatchupIndexes()
566 592
567 » » Convey("GetAllJobs works", func() { 593 » » Convey("GetAllJobsRA works", func() {
568 » » » jobs, err := e.GetAllJobs(c) 594 » » » getAllJobsRA := func(ctx context.Context) []string {
569 » » » So(err, ShouldBeNil) 595 » » » » jobs, err := e.GetAllJobsRA(ctx)
570 » » » ids := stringset.New(0) 596 » » » » So(err, ShouldBeNil)
571 » » » for _, j := range jobs { 597 » » » » return sortedJobIds(jobs)
572 » » » » ids.Add(j.JobID)
573 } 598 }
574 » » » asSlice := ids.ToSlice() 599
575 » » » sort.Strings(asSlice) 600 » » » Convey("Anonymous users see only public jobs", func() {
576 » » » So(asSlice, ShouldResemble, []string{"abc/1", "abc/2", " def/1"}) // only enabled 601 » » » » // Only 3 jobs with default ACLs granting READER access to everyone, but
602 » » » » // def/2 is disabled and so shouldn't be returne d.
603 » » » » So(getAllJobsRA(ctxAnon), ShouldResemble, []stri ng{"abc/3", "def/1"})
604 » » » })
605 » » » Convey("Owners can see their own jobs + public jobs", fu nc() {
606 » » » » // abc/1 is owned by one@example.com.
607 » » » » So(getAllJobsRA(ctxOne), ShouldResemble, []strin g{"abc/1", "abc/3", "def/1"})
608 » » » })
609 » » » Convey("Explicit readers", func() {
610 » » » » So(getAllJobsRA(ctxSome), ShouldResemble, []stri ng{"abc/2", "abc/3", "def/1"})
611 » » » })
612 » » » Convey("Admins have implicit READER access to all jobs", func() {
613 » » » » So(getAllJobsRA(ctxAdmin), ShouldResemble, []str ing{"abc/1", "abc/2", "abc/3", "def/1"})
614 » » » })
577 }) 615 })
578 616
579 » » Convey("GetProjectJobs works", func() { 617 » » Convey("GetProjectJobsRA works", func() {
580 » » » jobs, err := e.GetProjectJobs(c, "def") 618 » » » getProjectJobsRA := func(ctx context.Context, project st ring) []string {
581 » » » So(err, ShouldBeNil) 619 » » » » jobs, err := e.GetProjectJobsRA(ctx, project)
582 » » » So(len(jobs), ShouldEqual, 1) 620 » » » » So(err, ShouldBeNil)
583 » » » So(jobs[0].JobID, ShouldEqual, "def/1") 621 » » » » return sortedJobIds(jobs)
622 » » » }
623 » » » Convey("Anonymous can still see public jobs", func() {
624 » » » » So(getProjectJobsRA(ctxAnon, "def"), ShouldResem ble, []string{"def/1"})
625 » » » })
626 » » » Convey("Admin have implicit READER access to all jobs", func() {
627 » » » » So(getProjectJobsRA(ctxAdmin, "abc"), ShouldRese mble, []string{"abc/1", "abc/2", "abc/3"})
628 » » » })
629 » » » Convey("Owners can still see their jobs", func() {
630 » » » » So(getProjectJobsRA(ctxOne, "abc"), ShouldResemb le, []string{"abc/1", "abc/3"})
631 » » » })
632 » » » Convey("Readers can see their jobs", func() {
633 » » » » So(getProjectJobsRA(ctxSome, "abc"), ShouldResem ble, []string{"abc/2", "abc/3"})
634 » » » })
584 }) 635 })
585 636
586 » » Convey("GetJob works", func() { 637 » » Convey("GetJobRA works", func() {
587 » » » job, err := e.GetJob(c, "missing/job") 638 » » » job, err := e.GetJobRA(ctxAdmin, "missing/job")
588 So(job, ShouldBeNil) 639 So(job, ShouldBeNil)
589 So(err, ShouldBeNil) 640 So(err, ShouldBeNil)
590 641
591 » » » job, err = e.GetJob(c, "abc/1") 642 » » » job, err = e.GetJobRA(ctxAnon, "abc/1") // no READER per mission.
643 » » » So(job, ShouldBeNil)
644 » » » So(err, ShouldBeNil)
645
646 » » » job, err = e.GetJobRA(ctxAnon, "def/1") // OK.
647 » » » So(job, ShouldNotBeNil)
648 » » » So(err, ShouldBeNil)
649
650 » » » job, err = e.GetJobRA(ctxAnon, "def/2") // OK, even thou gh not enabled.
592 So(job, ShouldNotBeNil) 651 So(job, ShouldNotBeNil)
593 So(err, ShouldBeNil) 652 So(err, ShouldBeNil)
594 }) 653 })
595 654
596 » » Convey("ListInvocations works", func() { 655 » » Convey("ListInvocations works w/o ACL enforcement", func() {
597 » » » invs, cursor, err := e.ListInvocations(c, "abc/1", 2, "" ) 656 » » » invs, cursor, err := e.ListInvocations(ctxAnon, "abc/1", 2, "")
598 So(err, ShouldBeNil) 657 So(err, ShouldBeNil)
599 So(len(invs), ShouldEqual, 2) 658 So(len(invs), ShouldEqual, 2)
600 So(invs[0].ID, ShouldEqual, 1) 659 So(invs[0].ID, ShouldEqual, 1)
601 So(invs[1].ID, ShouldEqual, 2) 660 So(invs[1].ID, ShouldEqual, 2)
602 So(cursor, ShouldNotEqual, "") 661 So(cursor, ShouldNotEqual, "")
603 662
604 » » » invs, cursor, err = e.ListInvocations(c, "abc/1", 2, cur sor) 663 » » » invs, cursor, err = e.ListInvocations(ctxAdmin, "abc/1", 2, cursor)
605 So(err, ShouldBeNil) 664 So(err, ShouldBeNil)
606 So(len(invs), ShouldEqual, 1) 665 So(len(invs), ShouldEqual, 1)
607 So(invs[0].ID, ShouldEqual, 3) 666 So(invs[0].ID, ShouldEqual, 3)
608 So(cursor, ShouldEqual, "") 667 So(cursor, ShouldEqual, "")
609 }) 668 })
610 669
611 Convey("GetInvocation works", func() { 670 Convey("GetInvocation works", func() {
612 inv, err := e.GetInvocation(c, "missing/job", 1) 671 inv, err := e.GetInvocation(c, "missing/job", 1)
613 So(inv, ShouldBeNil) 672 So(inv, ShouldBeNil)
614 So(err, ShouldBeNil) 673 So(err, ShouldBeNil)
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
778 ctl.State().Status = task.StatusRunning 837 ctl.State().Status = task.StatusRunning
779 So(ctl.Save(ctx), ShouldBeNil) 838 So(ctl.Save(ctx), ShouldBeNil)
780 return nil 839 return nil
781 } 840 }
782 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil) 841 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil)
783 842
784 // It is alive and the job entity tracks it. 843 // It is alive and the job entity tracks it.
785 inv, err := e.GetInvocation(c, jobID, invID) 844 inv, err := e.GetInvocation(c, jobID, invID)
786 So(err, ShouldBeNil) 845 So(err, ShouldBeNil)
787 So(inv.Status, ShouldEqual, task.StatusRunning) 846 So(inv.Status, ShouldEqual, task.StatusRunning)
788 » » » job, err := e.GetJob(c, jobID) 847 » » » job, err := e.GetJobRA(c, jobID)
789 So(err, ShouldBeNil) 848 So(err, ShouldBeNil)
790 So(job.State.State, ShouldEqual, JobStateRunning) 849 So(job.State.State, ShouldEqual, JobStateRunning)
791 So(job.State.InvocationID, ShouldEqual, invID) 850 So(job.State.InvocationID, ShouldEqual, invID)
792 851
793 return invID 852 return invID
794 } 853 }
795 854
796 Convey("AbortInvocation works", func() { 855 Convey("AbortInvocation works", func() {
797 // Actually launch the queued invocation. 856 // Actually launch the queued invocation.
798 invID := launchInv() 857 invID := launchInv()
799 858
800 // Kill it. 859 // Kill it.
801 So(e.AbortInvocation(c, jobID, invID, ""), ShouldBeNil) 860 So(e.AbortInvocation(c, jobID, invID, ""), ShouldBeNil)
802 861
803 // It is dead. 862 // It is dead.
804 inv, err := e.GetInvocation(c, jobID, invID) 863 inv, err := e.GetInvocation(c, jobID, invID)
805 So(err, ShouldBeNil) 864 So(err, ShouldBeNil)
806 So(inv.Status, ShouldEqual, task.StatusAborted) 865 So(inv.Status, ShouldEqual, task.StatusAborted)
807 866
808 // The job moved on with its life. 867 // The job moved on with its life.
809 » » » job, err := e.GetJob(c, jobID) 868 » » » job, err := e.GetJobRA(c, jobID)
810 So(err, ShouldBeNil) 869 So(err, ShouldBeNil)
811 So(job.State.State, ShouldEqual, JobStateSuspended) 870 So(job.State.State, ShouldEqual, JobStateSuspended)
812 So(job.State.InvocationID, ShouldEqual, 0) 871 So(job.State.InvocationID, ShouldEqual, 0)
813 }) 872 })
814 873
815 Convey("AbortJob kills running invocation", func() { 874 Convey("AbortJob kills running invocation", func() {
816 // Actually launch the queued invocation. 875 // Actually launch the queued invocation.
817 invID := launchInv() 876 invID := launchInv()
818 877
819 // Kill it. 878 // Kill it.
820 So(e.AbortJob(c, jobID, ""), ShouldBeNil) 879 So(e.AbortJob(c, jobID, ""), ShouldBeNil)
821 880
822 // It is dead. 881 // It is dead.
823 inv, err := e.GetInvocation(c, jobID, invID) 882 inv, err := e.GetInvocation(c, jobID, invID)
824 So(err, ShouldBeNil) 883 So(err, ShouldBeNil)
825 So(inv.Status, ShouldEqual, task.StatusAborted) 884 So(inv.Status, ShouldEqual, task.StatusAborted)
826 885
827 // The job moved on with its life. 886 // The job moved on with its life.
828 » » » job, err := e.GetJob(c, jobID) 887 » » » job, err := e.GetJobRA(c, jobID)
829 So(err, ShouldBeNil) 888 So(err, ShouldBeNil)
830 So(job.State.State, ShouldEqual, JobStateSuspended) 889 So(job.State.State, ShouldEqual, JobStateSuspended)
831 So(job.State.InvocationID, ShouldEqual, 0) 890 So(job.State.InvocationID, ShouldEqual, 0)
832 }) 891 })
833 892
834 Convey("AbortJob kills queued invocation", func() { 893 Convey("AbortJob kills queued invocation", func() {
835 So(e.AbortJob(c, jobID, ""), ShouldBeNil) 894 So(e.AbortJob(c, jobID, ""), ShouldBeNil)
836 895
837 // The job moved on with its life. 896 // The job moved on with its life.
838 » » » job, err := e.GetJob(c, jobID) 897 » » » job, err := e.GetJobRA(c, jobID)
839 So(err, ShouldBeNil) 898 So(err, ShouldBeNil)
840 So(job.State.State, ShouldEqual, JobStateSuspended) 899 So(job.State.State, ShouldEqual, JobStateSuspended)
841 So(job.State.InvocationID, ShouldEqual, 0) 900 So(job.State.InvocationID, ShouldEqual, 0)
842 }) 901 })
843 }) 902 })
844 } 903 }
845 904
846 func TestAddTimer(t *testing.T) { 905 func TestAddTimer(t *testing.T) {
847 Convey("with mock job", t, func() { 906 Convey("with mock job", t, func() {
848 c := newTestContext(epoch) 907 c := newTestContext(epoch)
849 e, mgr := newTestEngine() 908 e, mgr := newTestEngine()
850 909
851 // A job in "QUEUED" state (about to run an invocation). 910 // A job in "QUEUED" state (about to run an invocation).
852 const jobID = "abc/1" 911 const jobID = "abc/1"
853 const invNonce = int64(12345) 912 const invNonce = int64(12345)
854 prepareQueuedJob(c, jobID, invNonce) 913 prepareQueuedJob(c, jobID, invNonce)
855 914
856 Convey("AddTimer works", func() { 915 Convey("AddTimer works", func() {
857 // Start an invocation that adds a timer. 916 // Start an invocation that adds a timer.
858 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error { 917 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error {
859 ctl.AddTimer(ctx, time.Minute, "timer-name", []b yte{1, 2, 3}) 918 ctl.AddTimer(ctx, time.Minute, "timer-name", []b yte{1, 2, 3})
860 ctl.State().Status = task.StatusRunning 919 ctl.State().Status = task.StatusRunning
861 return nil 920 return nil
862 } 921 }
863 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil) 922 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil)
864 923
865 // The job is running. 924 // The job is running.
866 » » » job, err := e.GetJob(c, jobID) 925 » » » job, err := e.GetJobRA(c, jobID)
867 So(err, ShouldBeNil) 926 So(err, ShouldBeNil)
868 So(job.State.State, ShouldEqual, JobStateRunning) 927 So(job.State.State, ShouldEqual, JobStateRunning)
869 928
870 // Added a task to the timers task queue. 929 // Added a task to the timers task queue.
871 tasks := tq.GetTestable(c).GetScheduledTasks()["timers-q "] 930 tasks := tq.GetTestable(c).GetScheduledTasks()["timers-q "]
872 So(len(tasks), ShouldEqual, 1) 931 So(len(tasks), ShouldEqual, 1)
873 var tqt *tq.Task 932 var tqt *tq.Task
874 for _, tqt = range tasks { 933 for _, tqt = range tasks {
875 } 934 }
876 So(tqt.ETA, ShouldResemble, clock.Now(c).Add(time.Minute )) 935 So(tqt.ETA, ShouldResemble, clock.Now(c).Add(time.Minute ))
(...skipping 19 matching lines...) Expand all
896 So(name, ShouldEqual, "timer-name") 955 So(name, ShouldEqual, "timer-name")
897 So(payload, ShouldResemble, []byte{1, 2, 3}) 956 So(payload, ShouldResemble, []byte{1, 2, 3})
898 ctl.AddTimer(ctx, time.Minute, "ignored-timer", nil) 957 ctl.AddTimer(ctx, time.Minute, "ignored-timer", nil)
899 ctl.State().Status = task.StatusSucceeded 958 ctl.State().Status = task.StatusSucceeded
900 return nil 959 return nil
901 } 960 }
902 clock.Get(c).(testclock.TestClock).Add(time.Minute) 961 clock.Get(c).(testclock.TestClock).Add(time.Minute)
903 So(e.ExecuteSerializedAction(c, tqt.Payload, 0), ShouldB eNil) 962 So(e.ExecuteSerializedAction(c, tqt.Payload, 0), ShouldB eNil)
904 963
905 // The job has finished (by timer handler). Moves back t o SUSPENDED state. 964 // The job has finished (by timer handler). Moves back t o SUSPENDED state.
906 » » » job, err = e.GetJob(c, jobID) 965 » » » job, err = e.GetJobRA(c, jobID)
907 So(err, ShouldBeNil) 966 So(err, ShouldBeNil)
908 So(job.State.State, ShouldEqual, JobStateSuspended) 967 So(job.State.State, ShouldEqual, JobStateSuspended)
909 968
910 // No new timers added for finished job. 969 // No new timers added for finished job.
911 tasks = tq.GetTestable(c).GetScheduledTasks()["timers-q" ] 970 tasks = tq.GetTestable(c).GetScheduledTasks()["timers-q" ]
912 So(len(tasks), ShouldEqual, 0) 971 So(len(tasks), ShouldEqual, 0)
913 }) 972 })
914 }) 973 })
915 } 974 }
916 975
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
1039 func (m *fakeTaskManager) HandleNotification(c context.Context, ctl task.Control ler, msg *pubsub.PubsubMessage) error { 1098 func (m *fakeTaskManager) HandleNotification(c context.Context, ctl task.Control ler, msg *pubsub.PubsubMessage) error {
1040 return m.handleNotification(c, msg) 1099 return m.handleNotification(c, msg)
1041 } 1100 }
1042 1101
1043 func (m fakeTaskManager) HandleTimer(c context.Context, ctl task.Controller, nam e string, payload []byte) error { 1102 func (m fakeTaskManager) HandleTimer(c context.Context, ctl task.Controller, nam e string, payload []byte) error {
1044 return m.handleTimer(c, ctl, name, payload) 1103 return m.handleTimer(c, ctl, name, payload)
1045 } 1104 }
1046 1105
1047 //// 1106 ////
1048 1107
1108 func sortedJobIds(jobs []*Job) []string {
1109 ids := stringset.New(len(jobs))
1110 for _, j := range jobs {
1111 ids.Add(j.JobID)
1112 }
1113 asSlice := ids.ToSlice()
1114 sort.Strings(asSlice)
1115 return asSlice
1116 }
1117
1049 // prepareQueuedJob makes datastore entries for a job in QUEUED state. 1118 // prepareQueuedJob makes datastore entries for a job in QUEUED state.
1050 func prepareQueuedJob(c context.Context, jobID string, invNonce int64) { 1119 func prepareQueuedJob(c context.Context, jobID string, invNonce int64) {
1051 taskBlob, err := proto.Marshal(&messages.TaskDefWrapper{ 1120 taskBlob, err := proto.Marshal(&messages.TaskDefWrapper{
1052 Noop: &messages.NoopTask{}, 1121 Noop: &messages.NoopTask{},
1053 }) 1122 })
1054 if err != nil { 1123 if err != nil {
1055 panic(err) 1124 panic(err)
1056 } 1125 }
1057 chunks := strings.Split(jobID, "/") 1126 chunks := strings.Split(jobID, "/")
1058 err = ds.Put(c, &Job{ 1127 err = ds.Put(c, &Job{
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
1104 1173
1105 func ensureOneTask(c context.Context, q string) *tq.Task { 1174 func ensureOneTask(c context.Context, q string) *tq.Task {
1106 tqt := tq.GetTestable(c) 1175 tqt := tq.GetTestable(c)
1107 tasks := tqt.GetScheduledTasks()[q] 1176 tasks := tqt.GetScheduledTasks()[q]
1108 So(len(tasks), ShouldEqual, 1) 1177 So(len(tasks), ShouldEqual, 1)
1109 for _, t := range tasks { 1178 for _, t := range tasks {
1110 return t 1179 return t
1111 } 1180 }
1112 return nil 1181 return nil
1113 } 1182 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698