Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(247)

Side by Side Diff: scheduler/appengine/engine/engine.go

Issue 2986033003: [scheduler]: ACLs phase 1 - per Job ACL specification and enforcement. (Closed)
Patch Set: Review. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 30 matching lines...) Expand all
41 "github.com/luci/luci-go/common/data/rand/mathrand" 41 "github.com/luci/luci-go/common/data/rand/mathrand"
42 "github.com/luci/luci-go/common/data/stringset" 42 "github.com/luci/luci-go/common/data/stringset"
43 "github.com/luci/luci-go/common/errors" 43 "github.com/luci/luci-go/common/errors"
44 "github.com/luci/luci-go/common/logging" 44 "github.com/luci/luci-go/common/logging"
45 "github.com/luci/luci-go/common/retry/transient" 45 "github.com/luci/luci-go/common/retry/transient"
46 "github.com/luci/luci-go/server/auth" 46 "github.com/luci/luci-go/server/auth"
47 "github.com/luci/luci-go/server/auth/identity" 47 "github.com/luci/luci-go/server/auth/identity"
48 "github.com/luci/luci-go/server/auth/signing" 48 "github.com/luci/luci-go/server/auth/signing"
49 "github.com/luci/luci-go/server/tokens" 49 "github.com/luci/luci-go/server/tokens"
50 50
51 "github.com/luci/luci-go/scheduler/appengine/acl"
51 "github.com/luci/luci-go/scheduler/appengine/catalog" 52 "github.com/luci/luci-go/scheduler/appengine/catalog"
52 "github.com/luci/luci-go/scheduler/appengine/schedule" 53 "github.com/luci/luci-go/scheduler/appengine/schedule"
53 "github.com/luci/luci-go/scheduler/appengine/task" 54 "github.com/luci/luci-go/scheduler/appengine/task"
54 ) 55 )
55 56
56 var ( 57 var (
57 » ErrNoSuchJob = errors.New("no such job") 58 » ErrNoOwnerPermission = errors.New("no OWNER permission on a job")
58 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") 59 » ErrNoSuchJob = errors.New("no such job")
60 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
59 ) 61 )
60 62
61 // Engine manages all scheduler jobs: keeps track of their state, runs state 63 // Engine manages all scheduler jobs: keeps track of their state, runs state
62 // machine transactions, starts new invocations, etc. A method returns 64 // machine transactions, starts new invocations, etc. A method returns
63 // errors.Transient if the error is non-fatal and the call should be retried 65 // errors.Transient if the error is non-fatal and the call should be retried
64 // later. Any other error means that retry won't help. 66 // later. Any other error means that retry won't help.
67 // ACLs are enforced unlike EngineInternal with the following implications:
68 // * if caller lacks READER access to Jobs, methods behave as if Jobs do not
69 // exist.
70 // * if caller lacks OWNER access, calling mutating methods will result in
71 // ErrNoOwnerPermission (assuming caller has READER access, else see above).
65 type Engine interface { 72 type Engine interface {
73 // GetVisibleJobs returns a list of all enabled scheduler jobs in no
74 // particular order.
75 GetVisibleJobs(c context.Context) ([]*Job, error)
76
77 // GetVisibleProjectJobs returns a list of enabled scheduler jobs of som e
78 // project in no particular order.
79 GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, erro r)
80
81 // GetVisibleJob returns single scheduler job given its full ID or nil i f no such
82 // job or if not visible.
83 GetVisibleJob(c context.Context, jobID string) (*Job, error)
84
85 // ListVisibleInvocations returns invocations of a visible job, most rec ent first.
86 // Returns fetched invocations and cursor string if there's more.
87 ListVisibleInvocations(c context.Context, jobID string, pageSize int, cu rsor string) ([]*Invocation, string, error)
88
89 // GetVisibleInvocation returns single invocation of some job given its ID.
90 GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Inv ocation, error)
91
92 // GetVisibleInvocationsByNonce returns a list of Invocations with given nonce.
93 //
94 // Invocation nonce is a random number that identifies an intent to star t
95 // an invocation. Normally one nonce corresponds to one Invocation entit y,
96 // but there can be more if job fails to start with a transient error.
97 GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invo cation, error)
98
99 // PauseJob replaces job's schedule with "triggered", effectively preven ting
100 // it from running automatically (until it is resumed). Manual invocatio ns are
101 // still allowed. Does nothing if job is already paused. Any pending or
102 // running invocations are still executed.
103 PauseJob(c context.Context, jobID string) error
104
105 // ResumeJob resumes paused job. Does nothing if the job is not paused.
106 ResumeJob(c context.Context, jobID string) error
107
108 // AbortJob resets the job to scheduled state, aborting a currently pend ing or
109 // running invocation (if any).
110 //
111 // Returns nil if the job is not currently running.
112 AbortJob(c context.Context, jobID string) error
113
114 // AbortInvocation forcefully moves the invocation to failed state.
115 //
116 // It opportunistically tries to send "abort" signal to a job runner if it
117 // supports cancellation, but it doesn't wait for reply. It proceeds to
118 // modifying local state in the scheduler service datastore immediately.
119 //
120 // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
121 // to missing PubSub notifications or other kinds of unexpected conditio ns.
122 //
123 // Does nothing if invocation is already in some final state.
124 AbortInvocation(c context.Context, jobID string, invID int64) error
125
126 // TriggerInvocation launches job invocation right now if job isn't runn ing
127 // now. Used by "Run now" UI button.
128 //
129 // Returns new invocation nonce (a random number that identifies an inte nt to
130 // start an invocation). Normally one nonce corresponds to one Invocatio n
131 // entity, but there can be more if job fails to start with a transient error.
132 TriggerInvocation(c context.Context, jobID string) (int64, error)
133 }
134
135 // EngineInternal is to be used by frontend initialization code only.
136 type EngineInternal interface {
66 // GetAllProjects returns a list of all projects that have at least one 137 // GetAllProjects returns a list of all projects that have at least one
67 // enabled scheduler job. 138 // enabled scheduler job.
68 GetAllProjects(c context.Context) ([]string, error) 139 GetAllProjects(c context.Context) ([]string, error)
69 140
70 // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar
71 // order.
72 GetAllJobs(c context.Context) ([]*Job, error)
73
74 // GetProjectJobs returns a list of enabled scheduler jobs of some proje ct
75 // in no particular order.
76 GetProjectJobs(c context.Context, projectID string) ([]*Job, error)
77
78 // GetJob returns single scheduler job given its full ID or nil if no su ch
79 // job.
80 GetJob(c context.Context, jobID string) (*Job, error)
81
82 // ListInvocations returns invocations of a job, most recent first.
83 // Returns fetched invocations and cursor string if there's more.
84 ListInvocations(c context.Context, jobID string, pageSize int, cursor st ring) ([]*Invocation, string, error)
85
86 // GetInvocation returns single invocation of some job given its ID.
87 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation , error)
88
89 // GetInvocationsByNonce returns a list of Invocations with given nonce.
90 //
91 // Invocation nonce is a random number that identifies an intent to star t
92 // an invocation. Normally one nonce corresponds to one Invocation entit y,
93 // but there can be more if job fails to start with a transient error.
94 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
95
96 // UpdateProjectJobs adds new, removes old and updates existing jobs. 141 // UpdateProjectJobs adds new, removes old and updates existing jobs.
97 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error 142 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error
98 143
99 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. 144 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs.
100 // Supposed to be used only on devserver, where task queue stub state is not 145 // Supposed to be used only on devserver, where task queue stub state is not
101 // preserved between appserver restarts and it messes everything. 146 // preserved between appserver restarts and it messes everything.
102 ResetAllJobsOnDevServer(c context.Context) error 147 ResetAllJobsOnDevServer(c context.Context) error
103 148
104 // ExecuteSerializedAction is called via a task queue to execute an acti on 149 // ExecuteSerializedAction is called via a task queue to execute an acti on
105 // produced by job state machine transition. These actions are POSTed 150 // produced by job state machine transition. These actions are POSTed
106 // to TimersQueue and InvocationsQueue defined in Config by Engine. 151 // to TimersQueue and InvocationsQueue defined in Config by Engine.
107 // 'retryCount' is 0 on first attempt, 1 if task queue service retries 152 // 'retryCount' is 0 on first attempt, 1 if task queue service retries
108 // request once, 2 - if twice, and so on. Returning transient errors her e 153 // request once, 2 - if twice, and so on. Returning transient errors her e
109 // causes the task queue to retry the task. 154 // causes the task queue to retry the task.
110 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error 155 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error
111 156
112 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved. 157 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved.
113 ProcessPubSubPush(c context.Context, body []byte) error 158 ProcessPubSubPush(c context.Context, body []byte) error
114 159
115 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub 160 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub
116 // subscription associated with given publisher. 161 // subscription associated with given publisher.
117 // 162 //
118 // It is needed to be able to manually tests PubSub related workflows on dev 163 // It is needed to be able to manually tests PubSub related workflows on dev
119 // server, since dev server can't accept PubSub push messages. 164 // server, since dev server can't accept PubSub push messages.
120 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error 165 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error
121 166
122 » // TriggerInvocation launches job invocation right now if job isn't runn ing 167 » // PublicAPI returns ACL-enforced API.
123 » // now. Used by "Run now" UI button. 168 » PublicAPI() Engine
124 » //
125 » // Returns new invocation nonce (a random number that identifies an inte nt to
126 » // start an invocation). Normally one nonce corresponds to one Invocatio n
127 » // entity, but there can be more if job fails to start with a transient error.
128 » TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error)
129
130 » // PauseJob replaces job's schedule with "triggered", effectively preven ting
131 » // it from running automatically (until it is resumed). Manual invocatio ns are
132 » // still allowed. Does nothing if job is already paused. Any pending or
133 » // running invocations are still executed.
134 » PauseJob(c context.Context, jobID string, who identity.Identity) error
135
136 » // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d.
137 » ResumeJob(c context.Context, jobID string, who identity.Identity) error
138
139 » // AbortInvocation forcefully moves the invocation to failed state.
140 » //
141 » // It opportunistically tries to send "abort" signal to a job runner if it
142 » // supports cancellation, but it doesn't wait for reply. It proceeds to
143 » // modifying local state in the scheduler service datastore immediately.
144 » //
145 » // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
146 » // to missing PubSub notifications or other kinds of unexpected conditio ns.
147 » //
148 » // Does nothing if invocation is already in some final state.
149 » AbortInvocation(c context.Context, jobID string, invID int64, who identi ty.Identity) error
150
151 » // AbortJob resets the job to scheduled state, aborting a currently pend ing or
152 » // running invocation (if any).
153 » //
154 » // Returns nil if the job is not currently running.
155 » AbortJob(c context.Context, jobID string, who identity.Identity) error
156 } 169 }
157 170
158 // Config contains parameters for the engine. 171 // Config contains parameters for the engine.
159 type Config struct { 172 type Config struct {
160 Catalog catalog.Catalog // provides task.Manager's to run t asks 173 Catalog catalog.Catalog // provides task.Manager's to run t asks
161 TimersQueuePath string // URL of a task queue handler for timer ticks 174 TimersQueuePath string // URL of a task queue handler for timer ticks
162 TimersQueueName string // queue name for timer ticks 175 TimersQueueName string // queue name for timer ticks
163 InvocationsQueuePath string // URL of a task queue handler that starts jobs 176 InvocationsQueuePath string // URL of a task queue handler that starts jobs
164 InvocationsQueueName string // queue name for job starts 177 InvocationsQueueName string // queue name for job starts
165 PubSubPushPath string // URL to use in PubSub push config 178 PubSubPushPath string // URL to use in PubSub push config
166 } 179 }
167 180
168 // NewEngine returns default implementation of Engine. 181 // NewEngine returns default implementation of EngineInternal.
169 func NewEngine(conf Config) Engine { 182 func NewEngine(conf Config) EngineInternal {
170 return &engineImpl{ 183 return &engineImpl{
171 Config: conf, 184 Config: conf,
172 doneFlags: make(map[string]bool), 185 doneFlags: make(map[string]bool),
173 } 186 }
174 } 187 }
175 188
176 //// Implementation. 189 //// Implementation.
177 190
178 const ( 191 const (
179 // invocationRetryLimit is how many times to retry an invocation before giving 192 // invocationRetryLimit is how many times to retry an invocation before giving
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 // an appropriate revision. 275 // an appropriate revision.
263 RevisionURL string `gae:",noindex"` 276 RevisionURL string `gae:",noindex"`
264 277
265 // Schedule is the job's schedule in regular cron expression format. 278 // Schedule is the job's schedule in regular cron expression format.
266 Schedule string `gae:",noindex"` 279 Schedule string `gae:",noindex"`
267 280
268 // Task is the job's payload in serialized form. Opaque from the point o f view 281 // Task is the job's payload in serialized form. Opaque from the point o f view
269 // of the engine. See Catalog.UnmarshalTask(). 282 // of the engine. See Catalog.UnmarshalTask().
270 Task []byte `gae:",noindex"` 283 Task []byte `gae:",noindex"`
271 284
285 // ACLs are the latest ACLs applied to Job and all its invocations.
286 Acls acl.GrantsByRole `gae:",noindex"`
287
272 // State is the job's state machine state, see StateMachine. 288 // State is the job's state machine state, see StateMachine.
273 State JobState 289 State JobState
274 } 290 }
275 291
276 // GetJobName returns name of this Job as defined its project's config. 292 // GetJobName returns name of this Job as defined its project's config.
277 func (e *Job) GetJobName() string { 293 func (e *Job) GetJobName() string {
278 // JobID has form <project>/<id>. Split it into components. 294 // JobID has form <project>/<id>. Split it into components.
279 chunks := strings.Split(e.JobID, "/") 295 chunks := strings.Split(e.JobID, "/")
280 return chunks[1] 296 return chunks[1]
281 } 297 }
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
321 e.State == other.State) 337 e.State == other.State)
322 } 338 }
323 339
324 // matches returns true if job definition in the entity matches the one 340 // matches returns true if job definition in the entity matches the one
325 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for 341 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for
326 // such jobs (assuming they are up-to-date). 342 // such jobs (assuming they are up-to-date).
327 func (e *Job) matches(def catalog.Definition) bool { 343 func (e *Job) matches(def catalog.Definition) bool {
328 return e.JobID == def.JobID && 344 return e.JobID == def.JobID &&
329 e.Flavor == def.Flavor && 345 e.Flavor == def.Flavor &&
330 e.Schedule == def.Schedule && 346 e.Schedule == def.Schedule &&
347 e.Acls.Equal(&def.Acls) &&
331 bytes.Equal(e.Task, def.Task) 348 bytes.Equal(e.Task, def.Task)
332 } 349 }
333 350
334 // Invocation entity stores single attempt to run a job. Its parent entity 351 // Invocation entity stores single attempt to run a job. Its parent entity
335 // is corresponding Job, its ID is generated based on time. 352 // is corresponding Job, its ID is generated based on time.
336 type Invocation struct { 353 type Invocation struct {
337 _kind string `gae:"$kind,Invocation"` 354 _kind string `gae:"$kind,Invocation"`
338 _extra ds.PropertyMap `gae:"-,extra"` 355 _extra ds.PropertyMap `gae:"-,extra"`
339 356
340 // ID is identifier of this particular attempt to run a job. Multiple at tempts 357 // ID is identifier of this particular attempt to run a job. Multiple at tempts
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
531 548
532 return 0, errors.New("could not find available invocationID after 10 att empts") 549 return 0, errors.New("could not find available invocationID after 10 att empts")
533 } 550 }
534 551
535 // debugLog mutates a string by appending a line to it. 552 // debugLog mutates a string by appending a line to it.
536 func debugLog(c context.Context, str *string, format string, args ...interface{} ) { 553 func debugLog(c context.Context, str *string, format string, args ...interface{} ) {
537 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") 554 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ")
538 *str += prefix + fmt.Sprintf(format+"\n", args...) 555 *str += prefix + fmt.Sprintf(format+"\n", args...)
539 } 556 }
540 557
541 //// 558 ////////////////////////////////////////////////////////////////////////////////
559 // engineImpl.
542 560
543 type engineImpl struct { 561 type engineImpl struct {
544 Config 562 Config
545 563
546 lock sync.Mutex 564 lock sync.Mutex
547 doneFlags map[string]bool // see doIfNotDone 565 doneFlags map[string]bool // see doIfNotDone
548 566
549 // configureTopic is used by prepareTopic, mocked in tests. 567 // configureTopic is used by prepareTopic, mocked in tests.
550 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error 568 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error
551 } 569 }
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
612 // Filter out duplicates, sort. 630 // Filter out duplicates, sort.
613 projects := stringset.New(len(entities)) 631 projects := stringset.New(len(entities))
614 for _, ent := range entities { 632 for _, ent := range entities {
615 projects.Add(ent.ProjectID) 633 projects.Add(ent.ProjectID)
616 } 634 }
617 out := projects.ToSlice() 635 out := projects.ToSlice()
618 sort.Strings(out) 636 sort.Strings(out)
619 return out, nil 637 return out, nil
620 } 638 }
621 639
622 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { 640 func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) {
623 q := ds.NewQuery("Job").Eq("Enabled", true) 641 q := ds.NewQuery("Job").Eq("Enabled", true)
624 » return e.queryEnabledJobs(c, q) 642 » return e.queryEnabledVisibleJobs(c, q)
625 } 643 }
626 644
627 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) { 645 func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) {
628 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) 646 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID)
629 » return e.queryEnabledJobs(c, q) 647 » return e.queryEnabledVisibleJobs(c, q)
630 } 648 }
631 649
632 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) { 650 func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([] *Job, error) {
633 entities := []*Job{} 651 entities := []*Job{}
634 if err := ds.GetAll(c, q, &entities); err != nil { 652 if err := ds.GetAll(c, q, &entities); err != nil {
635 return nil, transient.Tag.Apply(err) 653 return nil, transient.Tag.Apply(err)
636 } 654 }
637 // Non-ancestor query used, need to recheck filters. 655 // Non-ancestor query used, need to recheck filters.
638 filtered := make([]*Job, 0, len(entities)) 656 filtered := make([]*Job, 0, len(entities))
639 for _, job := range entities { 657 for _, job := range entities {
640 » » if job.Enabled { 658 » » if !job.Enabled {
659 » » » continue
660 » » }
661 » » // TODO(tandrii): improve batch ACLs check here to take advantag e of likely
662 » » // shared ACLs between most jobs of the same project.
663 » » if ok, err := job.Acls.IsReader(c); err != nil {
664 » » » return nil, transient.Tag.Apply(err)
665 » » } else if ok {
641 filtered = append(filtered, job) 666 filtered = append(filtered, job)
642 } 667 }
643 } 668 }
644 return filtered, nil 669 return filtered, nil
645 } 670 }
646 671
647 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { 672 func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error ) {
673 » job, err := e.getJob(c, jobID)
674 » if err != nil {
675 » » return nil, err
676 » } else if job == nil {
677 » » return nil, ErrNoSuchJob
678 » }
679 » if ok, err := job.Acls.IsReader(c); err != nil {
680 » » return nil, err
681 » } else if !ok {
682 » » return nil, ErrNoSuchJob
683 » }
684 » return job, nil
685 }
686
687 func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) {
688 » job, err := e.getJob(c, jobID)
689 » if err != nil {
690 » » return nil, err
691 » } else if job == nil {
692 » » return nil, ErrNoSuchJob
693 » }
694
695 » switch owner, err := job.Acls.IsOwner(c); {
696 » case err != nil:
697 » » return nil, err
698 » case owner:
699 » » return job, nil
700 » }
701
702 » // Not owner, but maybe reader? Give nicer error in such case.
703 » switch reader, err := job.Acls.IsReader(c); {
704 » case err != nil:
705 » » return nil, err
706 » case reader:
707 » » return nil, ErrNoOwnerPermission
708 » default:
709 » » return nil, ErrNoSuchJob
710 » }
711 }
712
713 func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) {
648 job := &Job{JobID: jobID} 714 job := &Job{JobID: jobID}
649 switch err := ds.Get(c, job); { 715 switch err := ds.Get(c, job); {
650 case err == nil: 716 case err == nil:
651 return job, nil 717 return job, nil
652 case err == ds.ErrNoSuchEntity: 718 case err == ds.ErrNoSuchEntity:
653 return nil, nil 719 return nil, nil
654 default: 720 default:
655 return nil, transient.Tag.Apply(err) 721 return nil, transient.Tag.Apply(err)
656 } 722 }
657 } 723 }
658 724
659 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { 725 func (e *engineImpl) PublicAPI() Engine {
726 » return e
727 }
728
729 func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pag eSize int, cursor string) ([]*Invocation, string, error) {
730 » if job, err := e.GetVisibleJob(c, jobID); err != nil {
731 » » return nil, "", err
732 » } else if job == nil {
733 » » // Either no Job or no access, both imply returning no invocatio ns.
734 » » return nil, "", ErrNoSuchInvocation
735 » }
736
660 if pageSize == 0 || pageSize > 500 { 737 if pageSize == 0 || pageSize > 500 {
661 pageSize = 500 738 pageSize = 500
662 } 739 }
663 740
664 // Deserialize the cursor. 741 // Deserialize the cursor.
665 var cursorObj ds.Cursor 742 var cursorObj ds.Cursor
666 if cursor != "" { 743 if cursor != "" {
667 var err error 744 var err error
668 cursorObj, err = ds.DecodeCursor(c, cursor) 745 cursorObj, err = ds.DecodeCursor(c, cursor)
669 if err != nil { 746 if err != nil {
(...skipping 11 matching lines...) Expand all
681 } 758 }
682 759
683 // Fetch pageSize worth of invocations, then grab the cursor. 760 // Fetch pageSize worth of invocations, then grab the cursor.
684 out := make([]*Invocation, 0, pageSize) 761 out := make([]*Invocation, 0, pageSize)
685 var newCursor string 762 var newCursor string
686 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { 763 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error {
687 out = append(out, obj) 764 out = append(out, obj)
688 if len(out) < pageSize { 765 if len(out) < pageSize {
689 return nil 766 return nil
690 } 767 }
691 » » c, err := getCursor() 768 » » cur, err := getCursor()
692 if err != nil { 769 if err != nil {
693 return err 770 return err
694 } 771 }
695 » » newCursor = c.String() 772 » » newCursor = cur.String()
696 return ds.Stop 773 return ds.Stop
697 }) 774 })
698 if err != nil { 775 if err != nil {
699 return nil, "", transient.Tag.Apply(err) 776 return nil, "", transient.Tag.Apply(err)
700 } 777 }
701 return out, newCursor, nil 778 return out, newCursor, nil
702 } 779 }
703 780
704 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { 781 func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
782 » switch _, err := e.GetVisibleJob(c, jobID); {
783 » case err == ErrNoSuchJob:
784 » » return nil, ErrNoSuchInvocation
785 » case err != nil:
786 » » return nil, err
787 » default:
788 » » return e.getInvocation(c, jobID, invID)
789 » }
790 }
791
792 func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
705 inv := &Invocation{ 793 inv := &Invocation{
706 ID: invID, 794 ID: invID,
707 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), 795 JobKey: ds.NewKey(c, "Job", jobID, 0, nil),
708 } 796 }
709 switch err := ds.Get(c, inv); { 797 switch err := ds.Get(c, inv); {
710 case err == nil: 798 case err == nil:
711 return inv, nil 799 return inv, nil
712 case err == ds.ErrNoSuchEntity: 800 case err == ds.ErrNoSuchEntity:
713 return nil, nil 801 return nil, nil
714 default: 802 default:
715 return nil, transient.Tag.Apply(err) 803 return nil, transient.Tag.Apply(err)
716 } 804 }
717 } 805 }
718 806
719 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ ]*Invocation, error) { 807 func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce in t64) ([]*Invocation, error) {
720 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) 808 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
721 entities := []*Invocation{} 809 entities := []*Invocation{}
722 if err := ds.GetAll(c, q, &entities); err != nil { 810 if err := ds.GetAll(c, q, &entities); err != nil {
723 return nil, transient.Tag.Apply(err) 811 return nil, transient.Tag.Apply(err)
724 } 812 }
813 if len(entities) > 0 {
814 // All Invocations with the same nonce must belong to the same J ob.
815 switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID( )); {
816 case err == ErrNoSuchJob:
817 return []*Invocation{}, nil
818 case err != nil:
819 return nil, err
820 }
821 }
725 return entities, nil 822 return entities, nil
726 } 823 }
727 824
728 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error { 825 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error {
729 // JobID -> *Job map. 826 // JobID -> *Job map.
730 existing, err := e.getProjectJobs(c, projectID) 827 existing, err := e.getProjectJobs(c, projectID)
731 if err != nil { 828 if err != nil {
732 return err 829 return err
733 } 830 }
734 // JobID -> new definition revision map. 831 // JobID -> new definition revision map.
(...skipping 309 matching lines...) Expand 10 before | Expand all | Expand 10 after
1044 1141
1045 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error { 1142 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error {
1046 switch { 1143 switch {
1047 case payload.InvTimer != nil: 1144 case payload.InvTimer != nil:
1048 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer) 1145 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer)
1049 default: 1146 default:
1050 return fmt.Errorf("unexpected invocation action kind %q", payloa d) 1147 return fmt.Errorf("unexpected invocation action kind %q", payloa d)
1051 } 1148 }
1052 } 1149 }
1053 1150
1054 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere dBy identity.Identity) (int64, error) { 1151 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) {
1152 » // First, we check ACLs.
1153 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1154 » » return 0, err
1155 » }
1156
1055 var err error 1157 var err error
1056 var invNonce int64 1158 var invNonce int64
1057 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror { 1159 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror {
1058 if isNew { 1160 if isNew {
1059 err = ErrNoSuchJob 1161 err = ErrNoSuchJob
1060 return errSkipPut 1162 return errSkipPut
1061 } 1163 }
1062 if !job.Enabled { 1164 if !job.Enabled {
1063 err = errors.New("the job is disabled") 1165 err = errors.New("the job is disabled")
1064 return errSkipPut 1166 return errSkipPut
1065 } 1167 }
1066 invNonce = 0 1168 invNonce = 0
1067 return e.rollSM(c, job, func(sm *StateMachine) error { 1169 return e.rollSM(c, job, func(sm *StateMachine) error {
1068 » » » if err := sm.OnManualInvocation(triggeredBy); err != nil { 1170 » » » if err := sm.OnManualInvocation(auth.CurrentIdentity(c)) ; err != nil {
1069 return err 1171 return err
1070 } 1172 }
1071 invNonce = sm.State.InvocationNonce 1173 invNonce = sm.State.InvocationNonce
1072 return nil 1174 return nil
1073 }) 1175 })
1074 }) 1176 })
1075 if err == nil { 1177 if err == nil {
1076 err = err2 1178 err = err2
1077 } 1179 }
1078 return invNonce, err 1180 return invNonce, err
1079 } 1181 }
1080 1182
1081 func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Iden tity) error { 1183 func (e *engineImpl) PauseJob(c context.Context, jobID string) error {
1082 » return e.setPausedFlag(c, jobID, true, who) 1184 » return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c))
1083 } 1185 }
1084 1186
1085 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide ntity) error { 1187 func (e *engineImpl) ResumeJob(c context.Context, jobID string) error {
1086 » return e.setPausedFlag(c, jobID, false, who) 1188 » return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c))
1087 } 1189 }
1088 1190
1089 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { 1191 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error {
1192 // First, we check ACLs outside of transaction. Yes, this allows for rac es
1193 // between (un)pausing and ACLs changes but these races have no impact.
1194 if _, err := e.getOwnedJob(c, jobID); err != nil {
1195 return err
1196 }
1090 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { 1197 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or {
1091 if isNew || !job.Enabled { 1198 if isNew || !job.Enabled {
1092 return ErrNoSuchJob 1199 return ErrNoSuchJob
1093 } 1200 }
1094 if job.Paused == paused { 1201 if job.Paused == paused {
1095 return errSkipPut 1202 return errSkipPut
1096 } 1203 }
1097 if paused { 1204 if paused {
1098 logging.Warningf(c, "Job is paused by %s", who) 1205 logging.Warningf(c, "Job is paused by %s", who)
1099 } else { 1206 } else {
1100 logging.Warningf(c, "Job is resumed by %s", who) 1207 logging.Warningf(c, "Job is resumed by %s", who)
1101 } 1208 }
1102 job.Paused = paused 1209 job.Paused = paused
1103 return e.rollSM(c, job, func(sm *StateMachine) error { 1210 return e.rollSM(c, job, func(sm *StateMachine) error {
1104 sm.OnScheduleChange() 1211 sm.OnScheduleChange()
1105 return nil 1212 return nil
1106 }) 1213 })
1107 }) 1214 })
1108 } 1215 }
1109 1216
1110 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4, who identity.Identity) error { 1217 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4) error {
1218 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1219 » » return err
1220 » }
1221 » return e.abortInvocation(c, jobID, invID)
1222 }
1223
1224 func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int6 4) error {
1111 c = logging.SetField(c, "JobID", jobID) 1225 c = logging.SetField(c, "JobID", jobID)
1112 c = logging.SetField(c, "InvID", invID) 1226 c = logging.SetField(c, "InvID", invID)
1113 1227
1114 var inv *Invocation 1228 var inv *Invocation
1115 var err error 1229 var err error
1116 » switch inv, err = e.GetInvocation(c, jobID, invID); { 1230 » switch inv, err = e.getInvocation(c, jobID, invID); {
1117 case err != nil: 1231 case err != nil:
1118 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1232 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1119 return err 1233 return err
1120 case inv == nil: 1234 case inv == nil:
1121 logging.Errorf(c, "The invocation doesn't exist") 1235 logging.Errorf(c, "The invocation doesn't exist")
1122 return ErrNoSuchInvocation 1236 return ErrNoSuchInvocation
1123 case inv.Status.Final(): 1237 case inv.Status.Final():
1124 return nil 1238 return nil
1125 } 1239 }
1126 1240
1127 ctl, err := e.controllerForInvocation(c, inv) 1241 ctl, err := e.controllerForInvocation(c, inv)
1128 if err != nil { 1242 if err != nil {
1129 logging.Errorf(c, "Cannot get controller - %s", err) 1243 logging.Errorf(c, "Cannot get controller - %s", err)
1130 return err 1244 return err
1131 } 1245 }
1132 1246
1133 » ctl.DebugLog("Invocation is manually aborted by %q", who) 1247 » ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c) )
1134 if err = ctl.manager.AbortTask(c, ctl); err != nil { 1248 if err = ctl.manager.AbortTask(c, ctl); err != nil {
1135 logging.Errorf(c, "Failed to abort the task - %s", err) 1249 logging.Errorf(c, "Failed to abort the task - %s", err)
1136 return err 1250 return err
1137 } 1251 }
1138 1252
1139 ctl.State().Status = task.StatusAborted 1253 ctl.State().Status = task.StatusAborted
1140 if err = ctl.Save(c); err != nil { 1254 if err = ctl.Save(c); err != nil {
1141 logging.Errorf(c, "Failed to save the invocation - %s", err) 1255 logging.Errorf(c, "Failed to save the invocation - %s", err)
1142 return err 1256 return err
1143 } 1257 }
1144 return nil 1258 return nil
1145 } 1259 }
1146 1260
1147 // AbortJob resets the job to scheduled state, aborting a currently pending or 1261 // AbortJob resets the job to scheduled state, aborting a currently pending or
1148 // running invocation (if any). 1262 // running invocation (if any).
1149 // 1263 //
1150 // Returns nil if the job is not currently running. 1264 // Returns nil if the job is not currently running.
1151 func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden tity) error { 1265 func (e *engineImpl) AbortJob(c context.Context, jobID string) error {
1152 » // First we switch the job to the default state and disassociate the run ning 1266 » // First, we check ACLs.
1267 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1268 » » return err
1269 » }
1270 » // Second, we switch the job to the default state and disassociate the r unning
1153 // invocation (if any) from the job entity. 1271 // invocation (if any) from the job entity.
1154 var invID int64 1272 var invID int64
1155 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { 1273 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or {
1156 if isNew { 1274 if isNew {
1157 return errSkipPut // the job was removed, nothing to abo rt 1275 return errSkipPut // the job was removed, nothing to abo rt
1158 } 1276 }
1159 invID = job.State.InvocationID 1277 invID = job.State.InvocationID
1160 return e.rollSM(c, job, func(sm *StateMachine) error { 1278 return e.rollSM(c, job, func(sm *StateMachine) error {
1161 sm.OnManualAbort() 1279 sm.OnManualAbort()
1162 return nil 1280 return nil
1163 }) 1281 })
1164 }) 1282 })
1165 if err != nil { 1283 if err != nil {
1166 return err 1284 return err
1167 } 1285 }
1168 1286
1169 // Now we kill the invocation. We do it separately because it may involv e 1287 // Now we kill the invocation. We do it separately because it may involv e
1170 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom 1288 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom
1171 // the transaction. 1289 // the transaction.
1172 if invID != 0 { 1290 if invID != 0 {
1173 » » return e.AbortInvocation(c, jobID, invID, who) 1291 » » return e.abortInvocation(c, jobID, invID)
1174 } 1292 }
1175 return nil 1293 return nil
1176 } 1294 }
1177 1295
1178 // updateJob updates an existing job if its definition has changed, adds 1296 // updateJob updates an existing job if its definition has changed, adds
1179 // a completely new job or enables a previously disabled job. 1297 // a completely new job or enables a previously disabled job.
1180 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error { 1298 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error {
1181 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error { 1299 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error {
1182 if !isNew && job.Enabled && job.matches(def) { 1300 if !isNew && job.Enabled && job.matches(def) {
1183 return errSkipPut 1301 return errSkipPut
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
1310 } 1428 }
1311 1429
1312 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. 1430 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks.
1313 // 1431 //
1314 // See also handlePubSubMessage, it is quite similar. 1432 // See also handlePubSubMessage, it is quite similar.
1315 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error { 1433 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error {
1316 c = logging.SetField(c, "JobID", jobID) 1434 c = logging.SetField(c, "JobID", jobID)
1317 c = logging.SetField(c, "InvID", invID) 1435 c = logging.SetField(c, "InvID", invID)
1318 1436
1319 logging.Infof(c, "Handling invocation timer %q", timer.Name) 1437 logging.Infof(c, "Handling invocation timer %q", timer.Name)
1320 » inv, err := e.GetInvocation(c, jobID, invID) 1438 » inv, err := e.getInvocation(c, jobID, invID)
1321 if err != nil { 1439 if err != nil {
1322 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1440 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1323 return err 1441 return err
1324 } 1442 }
1325 if inv == nil { 1443 if inv == nil {
1326 return ErrNoSuchInvocation 1444 return ErrNoSuchInvocation
1327 } 1445 }
1328 1446
1329 // Finished invocations are immutable, skip the message. 1447 // Finished invocations are immutable, skip the message.
1330 if inv.Status.Final() { 1448 if inv.Status.Final() {
(...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after
1695 return err 1813 return err
1696 } 1814 }
1697 jobID = data["job"] 1815 jobID = data["job"]
1698 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { 1816 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil {
1699 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err) 1817 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err)
1700 return err 1818 return err
1701 } 1819 }
1702 1820
1703 c = logging.SetField(c, "JobID", jobID) 1821 c = logging.SetField(c, "JobID", jobID)
1704 c = logging.SetField(c, "InvID", invID) 1822 c = logging.SetField(c, "InvID", invID)
1705 » inv, err := e.GetInvocation(c, jobID, invID) 1823 » inv, err := e.getInvocation(c, jobID, invID)
1706 if err != nil { 1824 if err != nil {
1707 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1825 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1708 return err 1826 return err
1709 } 1827 }
1710 if inv == nil { 1828 if inv == nil {
1711 return ErrNoSuchInvocation 1829 return ErrNoSuchInvocation
1712 } 1830 }
1713 1831
1714 // Finished invocations are immutable, skip the message. 1832 // Finished invocations are immutable, skip the message.
1715 if inv.Status.Final() { 1833 if inv.Status.Final() {
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after
1952 } 2070 }
1953 if hasFinished { 2071 if hasFinished {
1954 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { 2072 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or {
1955 sm.OnInvocationDone(saving.ID) 2073 sm.OnInvocationDone(saving.ID)
1956 return nil 2074 return nil
1957 }) 2075 })
1958 } 2076 }
1959 return nil 2077 return nil
1960 }) 2078 })
1961 } 2079 }
OLDNEW
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698