Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(305)

Side by Side Diff: scheduler/appengine/engine/engine_test.go

Issue 2986033003: [scheduler]: ACLs phase 1 - per Job ACL specification and enforcement. (Closed)
Patch Set: Review. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « scheduler/appengine/engine/engine.go ('k') | scheduler/appengine/frontend/handler.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 18 matching lines...) Expand all
29 "github.com/luci/gae/impl/memory" 29 "github.com/luci/gae/impl/memory"
30 ds "github.com/luci/gae/service/datastore" 30 ds "github.com/luci/gae/service/datastore"
31 tq "github.com/luci/gae/service/taskqueue" 31 tq "github.com/luci/gae/service/taskqueue"
32 32
33 "github.com/luci/luci-go/common/clock" 33 "github.com/luci/luci-go/common/clock"
34 "github.com/luci/luci-go/common/clock/testclock" 34 "github.com/luci/luci-go/common/clock/testclock"
35 "github.com/luci/luci-go/common/data/rand/mathrand" 35 "github.com/luci/luci-go/common/data/rand/mathrand"
36 "github.com/luci/luci-go/common/data/stringset" 36 "github.com/luci/luci-go/common/data/stringset"
37 "github.com/luci/luci-go/common/errors" 37 "github.com/luci/luci-go/common/errors"
38 "github.com/luci/luci-go/common/retry/transient" 38 "github.com/luci/luci-go/common/retry/transient"
39 "github.com/luci/luci-go/server/auth"
40 "github.com/luci/luci-go/server/auth/authtest"
39 "github.com/luci/luci-go/server/secrets/testsecrets" 41 "github.com/luci/luci-go/server/secrets/testsecrets"
40 42
43 "github.com/luci/luci-go/scheduler/appengine/acl"
41 "github.com/luci/luci-go/scheduler/appengine/catalog" 44 "github.com/luci/luci-go/scheduler/appengine/catalog"
42 "github.com/luci/luci-go/scheduler/appengine/messages" 45 "github.com/luci/luci-go/scheduler/appengine/messages"
43 "github.com/luci/luci-go/scheduler/appengine/task" 46 "github.com/luci/luci-go/scheduler/appengine/task"
44 "github.com/luci/luci-go/scheduler/appengine/task/noop" 47 "github.com/luci/luci-go/scheduler/appengine/task/noop"
45 48
46 . "github.com/luci/luci-go/common/testing/assertions" 49 . "github.com/luci/luci-go/common/testing/assertions"
47 . "github.com/smartystreets/goconvey/convey" 50 . "github.com/smartystreets/goconvey/convey"
48 ) 51 )
49 52
50 func TestGetAllProjects(t *testing.T) { 53 func TestGetAllProjects(t *testing.T) {
(...skipping 485 matching lines...) Expand 10 before | Expand all | Expand 10 after
536 So(err, ShouldBeNil) 539 So(err, ShouldBeNil)
537 540
538 So(newer, ShouldBeLessThan, older) 541 So(newer, ShouldBeLessThan, older)
539 }) 542 })
540 } 543 }
541 544
542 func TestQueries(t *testing.T) { 545 func TestQueries(t *testing.T) {
543 Convey("with mock data", t, func() { 546 Convey("with mock data", t, func() {
544 c := newTestContext(epoch) 547 c := newTestContext(epoch)
545 e, _ := newTestEngine() 548 e, _ := newTestEngine()
549 // TODO(tandrii): remove aclDefault once all Jobs have ACLs.
550 aclDefault := acl.GrantsByRole{}
551 aclSome := acl.GrantsByRole{Readers: []string{"group:some"}}
552 aclOne := acl.GrantsByRole{Owners: []string{"one@example.com"}}
553 aclAdmin := acl.GrantsByRole{Readers: []string{"group:administra tors"}, Owners: []string{"group:administrators"}}
554
555 ctxAnon := auth.WithState(c, &authtest.FakeState{
556 Identity: "anonymous:anonymous",
557 })
558 ctxOne := auth.WithState(c, &authtest.FakeState{
559 Identity: "user:one@example.com",
560 IdentityGroups: []string{"all"},
561 })
562 ctxSome := auth.WithState(c, &authtest.FakeState{
563 Identity: "user:some@example.com",
564 IdentityGroups: []string{"some"},
565 })
566 ctxAdmin := auth.WithState(c, &authtest.FakeState{
567 Identity: "user:admin@example.com",
568 IdentityGroups: []string{"administrators"},
569 })
546 570
547 So(ds.Put(c, 571 So(ds.Put(c,
548 » » » &Job{JobID: "abc/1", ProjectID: "abc", Enabled: true}, 572 » » » &Job{JobID: "abc/1", ProjectID: "abc", Enabled: true, Ac ls: aclOne},
549 » » » &Job{JobID: "abc/2", ProjectID: "abc", Enabled: true}, 573 » » » &Job{JobID: "abc/2", ProjectID: "abc", Enabled: true, Ac ls: aclSome},
550 » » » &Job{JobID: "def/1", ProjectID: "def", Enabled: true}, 574 » » » &Job{JobID: "abc/3", ProjectID: "abc", Enabled: true, Ac ls: aclDefault},
551 » » » &Job{JobID: "def/2", ProjectID: "def", Enabled: false}, 575 » » » &Job{JobID: "def/1", ProjectID: "def", Enabled: true, Ac ls: aclDefault},
576 » » » &Job{JobID: "def/2", ProjectID: "def", Enabled: false, A cls: aclDefault},
577 » » » &Job{JobID: "secret/1", ProjectID: "secret", Enabled: tr ue, Acls: aclAdmin},
552 ), ShouldBeNil) 578 ), ShouldBeNil)
553 579
554 job1 := ds.NewKey(c, "Job", "abc/1", 0, nil) 580 job1 := ds.NewKey(c, "Job", "abc/1", 0, nil)
555 job2 := ds.NewKey(c, "Job", "abc/2", 0, nil) 581 job2 := ds.NewKey(c, "Job", "abc/2", 0, nil)
582 job3 := ds.NewKey(c, "Job", "abc/3", 0, nil)
556 So(ds.Put(c, 583 So(ds.Put(c,
557 &Invocation{ID: 1, JobKey: job1, InvocationNonce: 123}, 584 &Invocation{ID: 1, JobKey: job1, InvocationNonce: 123},
558 &Invocation{ID: 2, JobKey: job1, InvocationNonce: 123}, 585 &Invocation{ID: 2, JobKey: job1, InvocationNonce: 123},
559 &Invocation{ID: 3, JobKey: job1}, 586 &Invocation{ID: 3, JobKey: job1},
560 &Invocation{ID: 1, JobKey: job2}, 587 &Invocation{ID: 1, JobKey: job2},
561 &Invocation{ID: 2, JobKey: job2}, 588 &Invocation{ID: 2, JobKey: job2},
562 &Invocation{ID: 3, JobKey: job2}, 589 &Invocation{ID: 3, JobKey: job2},
590 &Invocation{ID: 1, JobKey: job3},
563 ), ShouldBeNil) 591 ), ShouldBeNil)
564 592
565 ds.GetTestable(c).CatchupIndexes() 593 ds.GetTestable(c).CatchupIndexes()
566 594
567 » » Convey("GetAllJobs works", func() { 595 » » Convey("GetAllProjects ignores ACLs and CurrentIdentity", func() {
568 » » » jobs, err := e.GetAllJobs(c) 596 » » » test := func(ctx context.Context) {
569 » » » So(err, ShouldBeNil) 597 » » » » r, err := e.GetAllProjects(c)
570 » » » ids := stringset.New(0) 598 » » » » So(err, ShouldBeNil)
571 » » » for _, j := range jobs { 599 » » » » So(r, ShouldResemble, []string{"abc", "def", "se cret"})
572 » » » » ids.Add(j.JobID)
573 } 600 }
574 » » » asSlice := ids.ToSlice() 601 » » » test(c)
575 » » » sort.Strings(asSlice) 602 » » » test(ctxAnon)
576 » » » So(asSlice, ShouldResemble, []string{"abc/1", "abc/2", " def/1"}) // only enabled 603 » » » test(ctxAdmin)
577 }) 604 })
578 605
579 » » Convey("GetProjectJobs works", func() { 606 » » Convey("GetVisibleJobs works", func() {
580 » » » jobs, err := e.GetProjectJobs(c, "def") 607 » » » get := func(ctx context.Context) []string {
581 » » » So(err, ShouldBeNil) 608 » » » » jobs, err := e.GetVisibleJobs(ctx)
582 » » » So(len(jobs), ShouldEqual, 1) 609 » » » » So(err, ShouldBeNil)
583 » » » So(jobs[0].JobID, ShouldEqual, "def/1") 610 » » » » return sortedJobIds(jobs)
611 » » » }
612
613 » » » Convey("Anonymous users see only public jobs", func() {
614 » » » » // Only 3 jobs with default ACLs granting READER access to everyone, but
615 » » » » // def/2 is disabled and so shouldn't be returne d.
616 » » » » So(get(ctxAnon), ShouldResemble, []string{"abc/3 ", "def/1"})
617 » » » })
618 » » » Convey("Owners can see their own jobs + public jobs", fu nc() {
619 » » » » // abc/1 is owned by one@example.com.
620 » » » » So(get(ctxOne), ShouldResemble, []string{"abc/1" , "abc/3", "def/1"})
621 » » » })
622 » » » Convey("Explicit readers", func() {
623 » » » » So(get(ctxSome), ShouldResemble, []string{"abc/2 ", "abc/3", "def/1"})
624 » » » })
625 » » » Convey("Admins have implicit READER access to all jobs", func() {
626 » » » » So(get(ctxAdmin), ShouldResemble, []string{"abc/ 1", "abc/2", "abc/3", "def/1", "secret/1"})
627 » » » })
584 }) 628 })
585 629
586 » » Convey("GetJob works", func() { 630 » » Convey("GetProjectJobsRA works", func() {
587 » » » job, err := e.GetJob(c, "missing/job") 631 » » » get := func(ctx context.Context, project string) []strin g {
588 » » » So(job, ShouldBeNil) 632 » » » » jobs, err := e.GetVisibleProjectJobs(ctx, projec t)
633 » » » » So(err, ShouldBeNil)
634 » » » » return sortedJobIds(jobs)
635 » » » }
636 » » » Convey("Anonymous can still see public jobs", func() {
637 » » » » So(get(ctxAnon, "def"), ShouldResemble, []string {"def/1"})
638 » » » })
639 » » » Convey("Admin have implicit READER access to all jobs", func() {
640 » » » » So(get(ctxAdmin, "abc"), ShouldResemble, []strin g{"abc/1", "abc/2", "abc/3"})
641 » » » })
642 » » » Convey("Owners can still see their jobs", func() {
643 » » » » So(get(ctxOne, "abc"), ShouldResemble, []string{ "abc/1", "abc/3"})
644 » » » })
645 » » » Convey("Readers can see their jobs", func() {
646 » » » » So(get(ctxSome, "abc"), ShouldResemble, []string {"abc/2", "abc/3"})
647 » » » })
648 » » })
649
650 » » Convey("GetVisibleJob works", func() {
651 » » » _, err := e.GetVisibleJob(ctxAdmin, "missing/job")
652 » » » So(err, ShouldEqual, ErrNoSuchJob)
653
654 » » » _, err = e.GetVisibleJob(ctxAnon, "abc/1") // no READER permission.
655 » » » So(err, ShouldEqual, ErrNoSuchJob)
656
657 » » » job, err := e.GetVisibleJob(ctxAnon, "def/1") // OK.
658 » » » So(job, ShouldNotBeNil)
589 So(err, ShouldBeNil) 659 So(err, ShouldBeNil)
590 660
591 » » » job, err = e.GetJob(c, "abc/1") 661 » » » job, err = e.GetVisibleJob(ctxAnon, "def/2") // OK, even though not enabled.
592 So(job, ShouldNotBeNil) 662 So(job, ShouldNotBeNil)
593 So(err, ShouldBeNil) 663 So(err, ShouldBeNil)
594 }) 664 })
595 665
596 » » Convey("ListInvocations works", func() { 666 » » Convey("ListVisibleInvocations works", func() {
597 » » » invs, cursor, err := e.ListInvocations(c, "abc/1", 2, "" ) 667 » » » Convey("Anonymous can't see non-public job invocations", func() {
598 » » » So(err, ShouldBeNil) 668 » » » » _, _, err := e.ListVisibleInvocations(ctxAnon, " abc/1", 2, "")
599 » » » So(len(invs), ShouldEqual, 2) 669 » » » » So(err, ShouldResemble, ErrNoSuchJob)
600 » » » So(invs[0].ID, ShouldEqual, 1) 670 » » » })
601 » » » So(invs[1].ID, ShouldEqual, 2)
602 » » » So(cursor, ShouldNotEqual, "")
603 671
604 » » » invs, cursor, err = e.ListInvocations(c, "abc/1", 2, cur sor) 672 » » » Convey("With paging", func() {
605 » » » So(err, ShouldBeNil) 673 » » » » invs, cursor, err := e.ListVisibleInvocations(ct xOne, "abc/1", 2, "")
606 » » » So(len(invs), ShouldEqual, 1) 674 » » » » So(err, ShouldBeNil)
607 » » » So(invs[0].ID, ShouldEqual, 3) 675 » » » » So(len(invs), ShouldEqual, 2)
608 » » » So(cursor, ShouldEqual, "") 676 » » » » So(invs[0].ID, ShouldEqual, 1)
677 » » » » So(invs[1].ID, ShouldEqual, 2)
678 » » » » So(cursor, ShouldNotEqual, "")
679
680 » » » » invs, cursor, err = e.ListVisibleInvocations(ctx One, "abc/1", 2, cursor)
681 » » » » So(err, ShouldBeNil)
682 » » » » So(len(invs), ShouldEqual, 1)
683 » » » » So(invs[0].ID, ShouldEqual, 3)
684 » » » » So(cursor, ShouldEqual, "")
685 » » » })
609 }) 686 })
610 687
611 Convey("GetInvocation works", func() { 688 Convey("GetInvocation works", func() {
612 » » » inv, err := e.GetInvocation(c, "missing/job", 1) 689 » » » Convey("Anonymous can't see non-public job invocation", func() {
613 » » » So(inv, ShouldBeNil) 690 » » » » _, err := e.GetVisibleInvocation(ctxAnon, "abc/1 ", 1)
614 » » » So(err, ShouldBeNil) 691 » » » » So(err, ShouldResemble, ErrNoSuchInvocation)
692 » » » })
615 693
616 » » » inv, err = e.GetInvocation(c, "abc/1", 1) 694 » » » Convey("NoSuchInvocation", func() {
617 » » » So(inv, ShouldNotBeNil) 695 » » » » _, err := e.GetVisibleInvocation(ctxAdmin, "miss ing/job", 1)
618 » » » So(err, ShouldBeNil) 696 » » » » So(err, ShouldResemble, ErrNoSuchInvocation)
697 » » » })
698
699 » » » Convey("Reader sees", func() {
700 » » » » inv, err := e.GetVisibleInvocation(ctxOne, "abc/ 1", 1)
701 » » » » So(inv, ShouldNotBeNil)
702 » » » » So(err, ShouldBeNil)
703 » » » })
619 }) 704 })
620 705
621 Convey("GetInvocationsByNonce works", func() { 706 Convey("GetInvocationsByNonce works", func() {
622 » » » inv, err := e.GetInvocationsByNonce(c, 11111) // unknown 707 » » » Convey("Anonymous can't see non-public job invocations", func() {
623 » » » So(len(inv), ShouldEqual, 0) 708 » » » » invs, err := e.GetVisibleInvocationsByNonce(ctxA non, 123)
624 » » » So(err, ShouldBeNil) 709 » » » » So(len(invs), ShouldEqual, 0)
710 » » » » So(err, ShouldBeNil)
711 » » » })
625 712
626 » » » inv, err = e.GetInvocationsByNonce(c, 123) 713 » » » Convey("NoSuchInvocation", func() {
627 » » » So(len(inv), ShouldEqual, 2) 714 » » » » invs, err := e.GetVisibleInvocationsByNonce(ctxA dmin, 11111) // unknown
628 » » » So(err, ShouldBeNil) 715 » » » » So(len(invs), ShouldEqual, 0)
716 » » » » So(err, ShouldBeNil)
717 » » » })
718
719 » » » Convey("Reader sees", func() {
720 » » » » invs, err := e.GetVisibleInvocationsByNonce(ctxO ne, 123)
721 » » » » So(len(invs), ShouldEqual, 2)
722 » » » » So(err, ShouldBeNil)
723 » » » })
629 }) 724 })
630 }) 725 })
631 } 726 }
632 727
633 func TestPrepareTopic(t *testing.T) { 728 func TestPrepareTopic(t *testing.T) {
634 Convey("PrepareTopic works", t, func(ctx C) { 729 Convey("PrepareTopic works", t, func(ctx C) {
635 c := newTestContext(epoch) 730 c := newTestContext(epoch)
636 731
637 e, _ := newTestEngine() 732 e, _ := newTestEngine()
638 733
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 So(err, ShouldBeNil) 853 So(err, ShouldBeNil)
759 So(transient.Tag.In(e.ProcessPubSubPush(c, blob)), Shoul dBeFalse) 854 So(transient.Tag.In(e.ProcessPubSubPush(c, blob)), Shoul dBeFalse)
760 }) 855 })
761 }) 856 })
762 } 857 }
763 858
764 func TestAborts(t *testing.T) { 859 func TestAborts(t *testing.T) {
765 Convey("with mock invocation", t, func() { 860 Convey("with mock invocation", t, func() {
766 c := newTestContext(epoch) 861 c := newTestContext(epoch)
767 e, mgr := newTestEngine() 862 e, mgr := newTestEngine()
863 ctxAnon := auth.WithState(c, &authtest.FakeState{
864 Identity: "anonymous:anonymous",
865 })
866 ctxReader := auth.WithState(c, &authtest.FakeState{
867 Identity: "user:reader@example.com",
868 IdentityGroups: []string{"readers"},
869 })
870 ctxOwner := auth.WithState(c, &authtest.FakeState{
871 Identity: "user:owner@example.com",
872 IdentityGroups: []string{"owners"},
873 })
768 874
769 // A job in "QUEUED" state (about to run an invocation). 875 // A job in "QUEUED" state (about to run an invocation).
770 const jobID = "abc/1" 876 const jobID = "abc/1"
771 const invNonce = int64(12345) 877 const invNonce = int64(12345)
772 prepareQueuedJob(c, jobID, invNonce) 878 prepareQueuedJob(c, jobID, invNonce)
773 879
774 launchInv := func() int64 { 880 launchInv := func() int64 {
775 var invID int64 881 var invID int64
776 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error { 882 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error {
777 invID = ctl.InvocationID() 883 invID = ctl.InvocationID()
778 ctl.State().Status = task.StatusRunning 884 ctl.State().Status = task.StatusRunning
779 So(ctl.Save(ctx), ShouldBeNil) 885 So(ctl.Save(ctx), ShouldBeNil)
780 return nil 886 return nil
781 } 887 }
782 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil) 888 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil)
783 889
784 // It is alive and the job entity tracks it. 890 // It is alive and the job entity tracks it.
785 » » » inv, err := e.GetInvocation(c, jobID, invID) 891 » » » inv, err := e.getInvocation(c, jobID, invID)
786 So(err, ShouldBeNil) 892 So(err, ShouldBeNil)
787 So(inv.Status, ShouldEqual, task.StatusRunning) 893 So(inv.Status, ShouldEqual, task.StatusRunning)
788 » » » job, err := e.GetJob(c, jobID) 894 » » » job, err := e.getJob(c, jobID)
789 So(err, ShouldBeNil) 895 So(err, ShouldBeNil)
790 So(job.State.State, ShouldEqual, JobStateRunning) 896 So(job.State.State, ShouldEqual, JobStateRunning)
791 So(job.State.InvocationID, ShouldEqual, invID) 897 So(job.State.InvocationID, ShouldEqual, invID)
792 898
793 return invID 899 return invID
794 } 900 }
795 901
796 Convey("AbortInvocation works", func() { 902 Convey("AbortInvocation works", func() {
797 // Actually launch the queued invocation. 903 // Actually launch the queued invocation.
798 invID := launchInv() 904 invID := launchInv()
799 905
800 » » » // Kill it. 906 » » » // Try to kill it w/o permission.
801 » » » So(e.AbortInvocation(c, jobID, invID, ""), ShouldBeNil) 907 » » » So(e.AbortInvocation(c, jobID, invID), ShouldNotBeNil) / / No current identity.
908 » » » So(e.AbortInvocation(ctxAnon, jobID, invID), ShouldResem ble, ErrNoSuchJob)
909 » » » So(e.AbortInvocation(ctxReader, jobID, invID), ShouldRes emble, ErrNoOwnerPermission)
910 » » » // Now kill it.
911 » » » So(e.AbortInvocation(ctxOwner, jobID, invID), ShouldBeNi l)
802 912
803 // It is dead. 913 // It is dead.
804 » » » inv, err := e.GetInvocation(c, jobID, invID) 914 » » » inv, err := e.getInvocation(c, jobID, invID)
805 So(err, ShouldBeNil) 915 So(err, ShouldBeNil)
806 So(inv.Status, ShouldEqual, task.StatusAborted) 916 So(inv.Status, ShouldEqual, task.StatusAborted)
807 917
808 // The job moved on with its life. 918 // The job moved on with its life.
809 » » » job, err := e.GetJob(c, jobID) 919 » » » job, err := e.getJob(c, jobID)
810 So(err, ShouldBeNil) 920 So(err, ShouldBeNil)
811 So(job.State.State, ShouldEqual, JobStateSuspended) 921 So(job.State.State, ShouldEqual, JobStateSuspended)
812 So(job.State.InvocationID, ShouldEqual, 0) 922 So(job.State.InvocationID, ShouldEqual, 0)
813 }) 923 })
814 924
815 Convey("AbortJob kills running invocation", func() { 925 Convey("AbortJob kills running invocation", func() {
816 // Actually launch the queued invocation. 926 // Actually launch the queued invocation.
817 invID := launchInv() 927 invID := launchInv()
818 928
929 // Try to kill it w/o permission.
930 So(e.AbortJob(c, jobID), ShouldNotBeNil) // No current i dentity.
931 So(e.AbortJob(ctxAnon, jobID), ShouldResemble, ErrNoSuch Job)
932 So(e.AbortJob(ctxReader, jobID), ShouldResemble, ErrNoOw nerPermission)
819 // Kill it. 933 // Kill it.
820 » » » So(e.AbortJob(c, jobID, ""), ShouldBeNil) 934 » » » So(e.AbortJob(ctxOwner, jobID), ShouldBeNil)
821 935
822 // It is dead. 936 // It is dead.
823 » » » inv, err := e.GetInvocation(c, jobID, invID) 937 » » » inv, err := e.getInvocation(c, jobID, invID)
824 So(err, ShouldBeNil) 938 So(err, ShouldBeNil)
825 So(inv.Status, ShouldEqual, task.StatusAborted) 939 So(inv.Status, ShouldEqual, task.StatusAborted)
826 940
827 // The job moved on with its life. 941 // The job moved on with its life.
828 » » » job, err := e.GetJob(c, jobID) 942 » » » job, err := e.getJob(c, jobID)
829 So(err, ShouldBeNil) 943 So(err, ShouldBeNil)
830 So(job.State.State, ShouldEqual, JobStateSuspended) 944 So(job.State.State, ShouldEqual, JobStateSuspended)
831 So(job.State.InvocationID, ShouldEqual, 0) 945 So(job.State.InvocationID, ShouldEqual, 0)
832 }) 946 })
833 947
834 Convey("AbortJob kills queued invocation", func() { 948 Convey("AbortJob kills queued invocation", func() {
835 » » » So(e.AbortJob(c, jobID, ""), ShouldBeNil) 949 » » » So(e.AbortJob(ctxOwner, jobID), ShouldBeNil)
836 950
837 // The job moved on with its life. 951 // The job moved on with its life.
838 » » » job, err := e.GetJob(c, jobID) 952 » » » job, err := e.getJob(c, jobID)
839 So(err, ShouldBeNil) 953 So(err, ShouldBeNil)
840 So(job.State.State, ShouldEqual, JobStateSuspended) 954 So(job.State.State, ShouldEqual, JobStateSuspended)
841 So(job.State.InvocationID, ShouldEqual, 0) 955 So(job.State.InvocationID, ShouldEqual, 0)
842 }) 956 })
957
958 Convey("AbortJob fails on non-existing job", func() {
959 So(e.AbortJob(ctxOwner, "not/exists"), ShouldResemble, E rrNoSuchJob)
960 })
843 }) 961 })
844 } 962 }
845 963
846 func TestAddTimer(t *testing.T) { 964 func TestAddTimer(t *testing.T) {
847 Convey("with mock job", t, func() { 965 Convey("with mock job", t, func() {
848 c := newTestContext(epoch) 966 c := newTestContext(epoch)
849 e, mgr := newTestEngine() 967 e, mgr := newTestEngine()
850 968
851 // A job in "QUEUED" state (about to run an invocation). 969 // A job in "QUEUED" state (about to run an invocation).
852 const jobID = "abc/1" 970 const jobID = "abc/1"
853 const invNonce = int64(12345) 971 const invNonce = int64(12345)
854 prepareQueuedJob(c, jobID, invNonce) 972 prepareQueuedJob(c, jobID, invNonce)
855 973
856 Convey("AddTimer works", func() { 974 Convey("AddTimer works", func() {
857 // Start an invocation that adds a timer. 975 // Start an invocation that adds a timer.
858 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error { 976 mgr.launchTask = func(ctx context.Context, ctl task.Cont roller) error {
859 ctl.AddTimer(ctx, time.Minute, "timer-name", []b yte{1, 2, 3}) 977 ctl.AddTimer(ctx, time.Minute, "timer-name", []b yte{1, 2, 3})
860 ctl.State().Status = task.StatusRunning 978 ctl.State().Status = task.StatusRunning
861 return nil 979 return nil
862 } 980 }
863 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil) 981 So(e.startInvocation(c, jobID, invNonce, "", 0), ShouldB eNil)
864 982
865 // The job is running. 983 // The job is running.
866 » » » job, err := e.GetJob(c, jobID) 984 » » » job, err := e.getJob(c, jobID)
867 So(err, ShouldBeNil) 985 So(err, ShouldBeNil)
868 So(job.State.State, ShouldEqual, JobStateRunning) 986 So(job.State.State, ShouldEqual, JobStateRunning)
869 987
870 // Added a task to the timers task queue. 988 // Added a task to the timers task queue.
871 tasks := tq.GetTestable(c).GetScheduledTasks()["timers-q "] 989 tasks := tq.GetTestable(c).GetScheduledTasks()["timers-q "]
872 So(len(tasks), ShouldEqual, 1) 990 So(len(tasks), ShouldEqual, 1)
873 var tqt *tq.Task 991 var tqt *tq.Task
874 for _, tqt = range tasks { 992 for _, tqt = range tasks {
875 } 993 }
876 So(tqt.ETA, ShouldResemble, clock.Now(c).Add(time.Minute )) 994 So(tqt.ETA, ShouldResemble, clock.Now(c).Add(time.Minute ))
(...skipping 19 matching lines...) Expand all
896 So(name, ShouldEqual, "timer-name") 1014 So(name, ShouldEqual, "timer-name")
897 So(payload, ShouldResemble, []byte{1, 2, 3}) 1015 So(payload, ShouldResemble, []byte{1, 2, 3})
898 ctl.AddTimer(ctx, time.Minute, "ignored-timer", nil) 1016 ctl.AddTimer(ctx, time.Minute, "ignored-timer", nil)
899 ctl.State().Status = task.StatusSucceeded 1017 ctl.State().Status = task.StatusSucceeded
900 return nil 1018 return nil
901 } 1019 }
902 clock.Get(c).(testclock.TestClock).Add(time.Minute) 1020 clock.Get(c).(testclock.TestClock).Add(time.Minute)
903 So(e.ExecuteSerializedAction(c, tqt.Payload, 0), ShouldB eNil) 1021 So(e.ExecuteSerializedAction(c, tqt.Payload, 0), ShouldB eNil)
904 1022
905 // The job has finished (by timer handler). Moves back t o SUSPENDED state. 1023 // The job has finished (by timer handler). Moves back t o SUSPENDED state.
906 » » » job, err = e.GetJob(c, jobID) 1024 » » » job, err = e.getJob(c, jobID)
907 So(err, ShouldBeNil) 1025 So(err, ShouldBeNil)
908 So(job.State.State, ShouldEqual, JobStateSuspended) 1026 So(job.State.State, ShouldEqual, JobStateSuspended)
909 1027
910 // No new timers added for finished job. 1028 // No new timers added for finished job.
911 tasks = tq.GetTestable(c).GetScheduledTasks()["timers-q" ] 1029 tasks = tq.GetTestable(c).GetScheduledTasks()["timers-q" ]
912 So(len(tasks), ShouldEqual, 0) 1030 So(len(tasks), ShouldEqual, 0)
913 }) 1031 })
914 }) 1032 })
915 } 1033 }
916 1034
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
1039 func (m *fakeTaskManager) HandleNotification(c context.Context, ctl task.Control ler, msg *pubsub.PubsubMessage) error { 1157 func (m *fakeTaskManager) HandleNotification(c context.Context, ctl task.Control ler, msg *pubsub.PubsubMessage) error {
1040 return m.handleNotification(c, msg) 1158 return m.handleNotification(c, msg)
1041 } 1159 }
1042 1160
1043 func (m fakeTaskManager) HandleTimer(c context.Context, ctl task.Controller, nam e string, payload []byte) error { 1161 func (m fakeTaskManager) HandleTimer(c context.Context, ctl task.Controller, nam e string, payload []byte) error {
1044 return m.handleTimer(c, ctl, name, payload) 1162 return m.handleTimer(c, ctl, name, payload)
1045 } 1163 }
1046 1164
1047 //// 1165 ////
1048 1166
1167 func sortedJobIds(jobs []*Job) []string {
1168 ids := stringset.New(len(jobs))
1169 for _, j := range jobs {
1170 ids.Add(j.JobID)
1171 }
1172 asSlice := ids.ToSlice()
1173 sort.Strings(asSlice)
1174 return asSlice
1175 }
1176
1049 // prepareQueuedJob makes datastore entries for a job in QUEUED state. 1177 // prepareQueuedJob makes datastore entries for a job in QUEUED state.
1050 func prepareQueuedJob(c context.Context, jobID string, invNonce int64) { 1178 func prepareQueuedJob(c context.Context, jobID string, invNonce int64) {
1051 taskBlob, err := proto.Marshal(&messages.TaskDefWrapper{ 1179 taskBlob, err := proto.Marshal(&messages.TaskDefWrapper{
1052 Noop: &messages.NoopTask{}, 1180 Noop: &messages.NoopTask{},
1053 }) 1181 })
1054 if err != nil { 1182 if err != nil {
1055 panic(err) 1183 panic(err)
1056 } 1184 }
1057 chunks := strings.Split(jobID, "/") 1185 chunks := strings.Split(jobID, "/")
1058 err = ds.Put(c, &Job{ 1186 err = ds.Put(c, &Job{
1059 JobID: jobID, 1187 JobID: jobID,
1060 ProjectID: chunks[0], 1188 ProjectID: chunks[0],
1061 Enabled: true, 1189 Enabled: true,
1190 Acls: acl.GrantsByRole{Owners: []string{"group:owners"}, Re aders: []string{"group:readers"}},
1062 Task: taskBlob, 1191 Task: taskBlob,
1063 Schedule: "triggered", 1192 Schedule: "triggered",
1064 State: JobState{ 1193 State: JobState{
1065 State: JobStateQueued, 1194 State: JobStateQueued,
1066 InvocationNonce: invNonce, 1195 InvocationNonce: invNonce,
1067 }, 1196 },
1068 }) 1197 })
1069 if err != nil { 1198 if err != nil {
1070 panic(err) 1199 panic(err)
1071 } 1200 }
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
1104 1233
1105 func ensureOneTask(c context.Context, q string) *tq.Task { 1234 func ensureOneTask(c context.Context, q string) *tq.Task {
1106 tqt := tq.GetTestable(c) 1235 tqt := tq.GetTestable(c)
1107 tasks := tqt.GetScheduledTasks()[q] 1236 tasks := tqt.GetScheduledTasks()[q]
1108 So(len(tasks), ShouldEqual, 1) 1237 So(len(tasks), ShouldEqual, 1)
1109 for _, t := range tasks { 1238 for _, t := range tasks {
1110 return t 1239 return t
1111 } 1240 }
1112 return nil 1241 return nil
1113 } 1242 }
OLDNEW
« no previous file with comments | « scheduler/appengine/engine/engine.go ('k') | scheduler/appengine/frontend/handler.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698