Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(797)

Side by Side Diff: scheduler/appengine/engine/engine.go

Issue 2986033003: [scheduler]: ACLs phase 1 - per Job ACL specification and enforcement. (Closed)
Patch Set: rework Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 30 matching lines...) Expand all
41 "github.com/luci/luci-go/common/data/rand/mathrand" 41 "github.com/luci/luci-go/common/data/rand/mathrand"
42 "github.com/luci/luci-go/common/data/stringset" 42 "github.com/luci/luci-go/common/data/stringset"
43 "github.com/luci/luci-go/common/errors" 43 "github.com/luci/luci-go/common/errors"
44 "github.com/luci/luci-go/common/logging" 44 "github.com/luci/luci-go/common/logging"
45 "github.com/luci/luci-go/common/retry/transient" 45 "github.com/luci/luci-go/common/retry/transient"
46 "github.com/luci/luci-go/server/auth" 46 "github.com/luci/luci-go/server/auth"
47 "github.com/luci/luci-go/server/auth/identity" 47 "github.com/luci/luci-go/server/auth/identity"
48 "github.com/luci/luci-go/server/auth/signing" 48 "github.com/luci/luci-go/server/auth/signing"
49 "github.com/luci/luci-go/server/tokens" 49 "github.com/luci/luci-go/server/tokens"
50 50
51 "github.com/luci/luci-go/scheduler/appengine/acl"
51 "github.com/luci/luci-go/scheduler/appengine/catalog" 52 "github.com/luci/luci-go/scheduler/appengine/catalog"
52 "github.com/luci/luci-go/scheduler/appengine/schedule" 53 "github.com/luci/luci-go/scheduler/appengine/schedule"
53 "github.com/luci/luci-go/scheduler/appengine/task" 54 "github.com/luci/luci-go/scheduler/appengine/task"
54 ) 55 )
55 56
56 var ( 57 var (
57 » ErrNoSuchJob = errors.New("no such job") 58 » ErrNoOwnerPermission = errors.New("no OWNER permission on a job")
58 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") 59 » ErrNoSuchJob = errors.New("no such job")
60 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
59 ) 61 )
60 62
61 // Engine manages all scheduler jobs: keeps track of their state, runs state 63 // Engine manages all scheduler jobs: keeps track of their state, runs state
62 // machine transactions, starts new invocations, etc. A method returns 64 // machine transactions, starts new invocations, etc. A method returns
63 // errors.Transient if the error is non-fatal and the call should be retried 65 // errors.Transient if the error is non-fatal and the call should be retried
64 // later. Any other error means that retry won't help. 66 // later. Any other error means that retry won't help.
67 // ACLs are enforced unlike EngineInternal with the following implications:
68 // * if caller lacks have READER access to Jobs, methods behave as if Jobs
Vadim Sh. 2017/08/04 23:07:26 "lacks have" ?
tandrii(chromium) 2017/08/07 12:48:17 Done.
69 // do not exist.
70 // * if caller lacks have OWNER access, calling mutating methods will result in
Vadim Sh. 2017/08/04 23:07:27 same here
tandrii(chromium) 2017/08/07 12:48:17 Done.
71 // ErrNoOwnerPermission (assuming caller has READER access, else see above).
65 type Engine interface { 72 type Engine interface {
73 // GetVisibleProjects returns a list of all projects that have at least one
74 // enabled scheduler job to which caller has READER access.
75 // GetVisibleProjects(c context.Context) ([]string, error)
Vadim Sh. 2017/08/04 23:07:27 remove I assume it's not needed now?
tandrii(chromium) 2017/08/07 12:48:16 Yep, Done.
76
77 // GetVisibleJobs returns a list of all enabled scheduler jobs in no
78 // particular order.
79 GetVisibleJobs(c context.Context) ([]*Job, error)
80
81 // GetVisibleProjectJobs returns a list of enabled scheduler jobs of som e
82 // project in no particular order.
83 GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, erro r)
84
85 // GetVisibleJob returns single scheduler job given its full ID or nil i f no such
86 // job or if not visible.
87 GetVisibleJob(c context.Context, jobID string) (*Job, error)
88
89 // ListVisibleInvocations returns invocations of a visible job, most rec ent first.
90 // Returns fetched invocations and cursor string if there's more.
91 ListVisibleInvocations(c context.Context, jobID string, pageSize int, cu rsor string) ([]*Invocation, string, error)
92
93 // GetVisibleInvocation returns single invocation of some job given its ID.
94 GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Inv ocation, error)
95
96 // GetVisibleInvocationsByNonce returns a list of Invocations with given nonce.
97 //
98 // Invocation nonce is a random number that identifies an intent to star t
99 // an invocation. Normally one nonce corresponds to one Invocation entit y,
100 // but there can be more if job fails to start with a transient error.
101 GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invo cation, error)
102
103 // PauseJob replaces job's schedule with "triggered", effectively preven ting
104 // it from running automatically (until it is resumed). Manual invocatio ns are
105 // still allowed. Does nothing if job is already paused. Any pending or
106 // running invocations are still executed.
107 PauseJob(c context.Context, jobID string) error
108
109 // ResumeJob resumes paused job. Does nothing if the job is not paused.
110 ResumeJob(c context.Context, jobID string) error
111
112 // AbortJob resets the job to scheduled state, aborting a currently pend ing or
113 // running invocation (if any).
114 //
115 // Returns nil if the job is not currently running.
116 AbortJob(c context.Context, jobID string) error
117
118 // AbortInvocation forcefully moves the invocation to failed state.
119 //
120 // It opportunistically tries to send "abort" signal to a job runner if it
121 // supports cancellation, but it doesn't wait for reply. It proceeds to
122 // modifying local state in the scheduler service datastore immediately.
123 //
124 // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
125 // to missing PubSub notifications or other kinds of unexpected conditio ns.
126 //
127 // Does nothing if invocation is already in some final state.
128 AbortInvocation(c context.Context, jobID string, invID int64) error
129
130 // TriggerInvocation launches job invocation right now if job isn't runn ing
131 // now. Used by "Run now" UI button.
132 //
133 // Returns new invocation nonce (a random number that identifies an inte nt to
134 // start an invocation). Normally one nonce corresponds to one Invocatio n
135 // entity, but there can be more if job fails to start with a transient error.
136 TriggerInvocation(c context.Context, jobID string) (int64, error)
137 }
138
139 // EngineInternal is to be used by frontend initialization code only.
140 type EngineInternal interface {
66 // GetAllProjects returns a list of all projects that have at least one 141 // GetAllProjects returns a list of all projects that have at least one
67 // enabled scheduler job. 142 // enabled scheduler job.
68 GetAllProjects(c context.Context) ([]string, error) 143 GetAllProjects(c context.Context) ([]string, error)
69 144
70 // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar
71 // order.
72 GetAllJobs(c context.Context) ([]*Job, error)
73
74 // GetProjectJobs returns a list of enabled scheduler jobs of some proje ct
75 // in no particular order.
76 GetProjectJobs(c context.Context, projectID string) ([]*Job, error)
77
78 // GetJob returns single scheduler job given its full ID or nil if no su ch
79 // job.
80 GetJob(c context.Context, jobID string) (*Job, error)
81
82 // ListInvocations returns invocations of a job, most recent first.
83 // Returns fetched invocations and cursor string if there's more.
84 ListInvocations(c context.Context, jobID string, pageSize int, cursor st ring) ([]*Invocation, string, error)
85
86 // GetInvocation returns single invocation of some job given its ID.
87 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation , error)
88
89 // GetInvocationsByNonce returns a list of Invocations with given nonce.
90 //
91 // Invocation nonce is a random number that identifies an intent to star t
92 // an invocation. Normally one nonce corresponds to one Invocation entit y,
93 // but there can be more if job fails to start with a transient error.
94 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
95
96 // UpdateProjectJobs adds new, removes old and updates existing jobs. 145 // UpdateProjectJobs adds new, removes old and updates existing jobs.
97 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error 146 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error
98 147
99 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. 148 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs.
100 // Supposed to be used only on devserver, where task queue stub state is not 149 // Supposed to be used only on devserver, where task queue stub state is not
101 // preserved between appserver restarts and it messes everything. 150 // preserved between appserver restarts and it messes everything.
102 ResetAllJobsOnDevServer(c context.Context) error 151 ResetAllJobsOnDevServer(c context.Context) error
103 152
104 // ExecuteSerializedAction is called via a task queue to execute an acti on 153 // ExecuteSerializedAction is called via a task queue to execute an acti on
105 // produced by job state machine transition. These actions are POSTed 154 // produced by job state machine transition. These actions are POSTed
106 // to TimersQueue and InvocationsQueue defined in Config by Engine. 155 // to TimersQueue and InvocationsQueue defined in Config by Engine.
107 // 'retryCount' is 0 on first attempt, 1 if task queue service retries 156 // 'retryCount' is 0 on first attempt, 1 if task queue service retries
108 // request once, 2 - if twice, and so on. Returning transient errors her e 157 // request once, 2 - if twice, and so on. Returning transient errors her e
109 // causes the task queue to retry the task. 158 // causes the task queue to retry the task.
110 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error 159 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error
111 160
112 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved. 161 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved.
113 ProcessPubSubPush(c context.Context, body []byte) error 162 ProcessPubSubPush(c context.Context, body []byte) error
114 163
115 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub 164 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub
116 // subscription associated with given publisher. 165 // subscription associated with given publisher.
117 // 166 //
118 // It is needed to be able to manually tests PubSub related workflows on dev 167 // It is needed to be able to manually tests PubSub related workflows on dev
119 // server, since dev server can't accept PubSub push messages. 168 // server, since dev server can't accept PubSub push messages.
120 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error 169 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error
121 170
122 » // TriggerInvocation launches job invocation right now if job isn't runn ing 171 » PublicAPI() Engine
Vadim Sh. 2017/08/04 23:07:27 document
tandrii(chromium) 2017/08/07 12:48:17 Done.
123 » // now. Used by "Run now" UI button.
124 » //
125 » // Returns new invocation nonce (a random number that identifies an inte nt to
126 » // start an invocation). Normally one nonce corresponds to one Invocatio n
127 » // entity, but there can be more if job fails to start with a transient error.
128 » TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error)
129
130 » // PauseJob replaces job's schedule with "triggered", effectively preven ting
131 » // it from running automatically (until it is resumed). Manual invocatio ns are
132 » // still allowed. Does nothing if job is already paused. Any pending or
133 » // running invocations are still executed.
134 » PauseJob(c context.Context, jobID string, who identity.Identity) error
135
136 » // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d.
137 » ResumeJob(c context.Context, jobID string, who identity.Identity) error
138
139 » // AbortInvocation forcefully moves the invocation to failed state.
140 » //
141 » // It opportunistically tries to send "abort" signal to a job runner if it
142 » // supports cancellation, but it doesn't wait for reply. It proceeds to
143 » // modifying local state in the scheduler service datastore immediately.
144 » //
145 » // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
146 » // to missing PubSub notifications or other kinds of unexpected conditio ns.
147 » //
148 » // Does nothing if invocation is already in some final state.
149 » AbortInvocation(c context.Context, jobID string, invID int64, who identi ty.Identity) error
150
151 » // AbortJob resets the job to scheduled state, aborting a currently pend ing or
152 » // running invocation (if any).
153 » //
154 » // Returns nil if the job is not currently running.
155 » AbortJob(c context.Context, jobID string, who identity.Identity) error
156 } 172 }
157 173
158 // Config contains parameters for the engine. 174 // Config contains parameters for the engine.
159 type Config struct { 175 type Config struct {
160 Catalog catalog.Catalog // provides task.Manager's to run t asks 176 Catalog catalog.Catalog // provides task.Manager's to run t asks
161 TimersQueuePath string // URL of a task queue handler for timer ticks 177 TimersQueuePath string // URL of a task queue handler for timer ticks
162 TimersQueueName string // queue name for timer ticks 178 TimersQueueName string // queue name for timer ticks
163 InvocationsQueuePath string // URL of a task queue handler that starts jobs 179 InvocationsQueuePath string // URL of a task queue handler that starts jobs
164 InvocationsQueueName string // queue name for job starts 180 InvocationsQueueName string // queue name for job starts
165 PubSubPushPath string // URL to use in PubSub push config 181 PubSubPushPath string // URL to use in PubSub push config
166 } 182 }
167 183
168 // NewEngine returns default implementation of Engine. 184 // NewEngine returns default implementation of EngineInternal.
169 func NewEngine(conf Config) Engine { 185 func NewEngine(conf Config) EngineInternal {
170 return &engineImpl{ 186 return &engineImpl{
171 Config: conf, 187 Config: conf,
172 doneFlags: make(map[string]bool), 188 doneFlags: make(map[string]bool),
173 } 189 }
174 } 190 }
175 191
176 //// Implementation. 192 //// Implementation.
177 193
178 const ( 194 const (
179 // invocationRetryLimit is how many times to retry an invocation before giving 195 // invocationRetryLimit is how many times to retry an invocation before giving
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 // an appropriate revision. 278 // an appropriate revision.
263 RevisionURL string `gae:",noindex"` 279 RevisionURL string `gae:",noindex"`
264 280
265 // Schedule is the job's schedule in regular cron expression format. 281 // Schedule is the job's schedule in regular cron expression format.
266 Schedule string `gae:",noindex"` 282 Schedule string `gae:",noindex"`
267 283
268 // Task is the job's payload in serialized form. Opaque from the point o f view 284 // Task is the job's payload in serialized form. Opaque from the point o f view
269 // of the engine. See Catalog.UnmarshalTask(). 285 // of the engine. See Catalog.UnmarshalTask().
270 Task []byte `gae:",noindex"` 286 Task []byte `gae:",noindex"`
271 287
288 // ACLs are the latest ACLs applied to Job and all its invocations.
289 Acls acl.GrantsByRole `gae:",noindex"`
290
272 // State is the job's state machine state, see StateMachine. 291 // State is the job's state machine state, see StateMachine.
273 State JobState 292 State JobState
274 } 293 }
275 294
276 // GetJobName returns name of this Job as defined its project's config. 295 // GetJobName returns name of this Job as defined its project's config.
277 func (e *Job) GetJobName() string { 296 func (e *Job) GetJobName() string {
278 // JobID has form <project>/<id>. Split it into components. 297 // JobID has form <project>/<id>. Split it into components.
279 chunks := strings.Split(e.JobID, "/") 298 chunks := strings.Split(e.JobID, "/")
280 return chunks[1] 299 return chunks[1]
281 } 300 }
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
321 e.State == other.State) 340 e.State == other.State)
322 } 341 }
323 342
324 // matches returns true if job definition in the entity matches the one 343 // matches returns true if job definition in the entity matches the one
325 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for 344 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for
326 // such jobs (assuming they are up-to-date). 345 // such jobs (assuming they are up-to-date).
327 func (e *Job) matches(def catalog.Definition) bool { 346 func (e *Job) matches(def catalog.Definition) bool {
328 return e.JobID == def.JobID && 347 return e.JobID == def.JobID &&
329 e.Flavor == def.Flavor && 348 e.Flavor == def.Flavor &&
330 e.Schedule == def.Schedule && 349 e.Schedule == def.Schedule &&
350 e.Acls.Equal(&def.Acls) &&
331 bytes.Equal(e.Task, def.Task) 351 bytes.Equal(e.Task, def.Task)
332 } 352 }
333 353
334 // Invocation entity stores single attempt to run a job. Its parent entity 354 // Invocation entity stores single attempt to run a job. Its parent entity
335 // is corresponding Job, its ID is generated based on time. 355 // is corresponding Job, its ID is generated based on time.
336 type Invocation struct { 356 type Invocation struct {
337 _kind string `gae:"$kind,Invocation"` 357 _kind string `gae:"$kind,Invocation"`
338 _extra ds.PropertyMap `gae:"-,extra"` 358 _extra ds.PropertyMap `gae:"-,extra"`
339 359
340 // ID is identifier of this particular attempt to run a job. Multiple at tempts 360 // ID is identifier of this particular attempt to run a job. Multiple at tempts
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
531 551
532 return 0, errors.New("could not find available invocationID after 10 att empts") 552 return 0, errors.New("could not find available invocationID after 10 att empts")
533 } 553 }
534 554
535 // debugLog mutates a string by appending a line to it. 555 // debugLog mutates a string by appending a line to it.
536 func debugLog(c context.Context, str *string, format string, args ...interface{} ) { 556 func debugLog(c context.Context, str *string, format string, args ...interface{} ) {
537 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") 557 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ")
538 *str += prefix + fmt.Sprintf(format+"\n", args...) 558 *str += prefix + fmt.Sprintf(format+"\n", args...)
539 } 559 }
540 560
541 //// 561 ////////////////////////////////////////////////////////////////////////////////
562 // engineImpl.
542 563
543 type engineImpl struct { 564 type engineImpl struct {
544 Config 565 Config
545 566
546 lock sync.Mutex 567 lock sync.Mutex
547 doneFlags map[string]bool // see doIfNotDone 568 doneFlags map[string]bool // see doIfNotDone
548 569
549 // configureTopic is used by prepareTopic, mocked in tests. 570 // configureTopic is used by prepareTopic, mocked in tests.
550 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error 571 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error
551 } 572 }
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
612 // Filter out duplicates, sort. 633 // Filter out duplicates, sort.
613 projects := stringset.New(len(entities)) 634 projects := stringset.New(len(entities))
614 for _, ent := range entities { 635 for _, ent := range entities {
615 projects.Add(ent.ProjectID) 636 projects.Add(ent.ProjectID)
616 } 637 }
617 out := projects.ToSlice() 638 out := projects.ToSlice()
618 sort.Strings(out) 639 sort.Strings(out)
619 return out, nil 640 return out, nil
620 } 641 }
621 642
622 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { 643 func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) {
623 q := ds.NewQuery("Job").Eq("Enabled", true) 644 q := ds.NewQuery("Job").Eq("Enabled", true)
624 » return e.queryEnabledJobs(c, q) 645 » return e.queryEnabledVisibleJobs(c, q)
625 } 646 }
626 647
627 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) { 648 func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) {
628 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) 649 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID)
629 » return e.queryEnabledJobs(c, q) 650 » return e.queryEnabledVisibleJobs(c, q)
630 } 651 }
631 652
632 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) { 653 func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([] *Job, error) {
633 entities := []*Job{} 654 entities := []*Job{}
634 if err := ds.GetAll(c, q, &entities); err != nil { 655 if err := ds.GetAll(c, q, &entities); err != nil {
635 return nil, transient.Tag.Apply(err) 656 return nil, transient.Tag.Apply(err)
636 } 657 }
637 // Non-ancestor query used, need to recheck filters. 658 // Non-ancestor query used, need to recheck filters.
638 filtered := make([]*Job, 0, len(entities)) 659 filtered := make([]*Job, 0, len(entities))
639 for _, job := range entities { 660 for _, job := range entities {
640 » » if job.Enabled { 661 » » if !job.Enabled {
662 » » » continue
663 » » }
664 » » // TODO(tandrii): improve batch ACLs check here to take advantag e of likely
665 » » // shared ACLs between most jobs of the same project.
666 » » if ok, err := job.Acls.IsReader(c); err != nil {
667 » » » return nil, transient.Tag.Apply(err)
668 » » } else if ok {
641 filtered = append(filtered, job) 669 filtered = append(filtered, job)
642 } 670 }
643 } 671 }
644 return filtered, nil 672 return filtered, nil
645 } 673 }
646 674
647 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { 675 func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error ) {
676 » job, err := e.getJob(c, jobID)
677 » if err != nil {
678 » » return nil, err
679 » } else if job == nil {
680 » » return nil, ErrNoSuchJob
681 » }
682 » if ok, err := job.Acls.IsReader(c); err != nil {
683 » » return nil, err
684 » } else if !ok {
685 » » return nil, ErrNoSuchJob
686 » }
687 » return job, err
Vadim Sh. 2017/08/04 23:07:26 nit: return job, nil
tandrii(chromium) 2017/08/07 12:48:17 Done.
688 }
689
690 func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) {
691 » job, err := e.getJob(c, jobID)
692 » if err != nil {
693 » » return nil, err
694 » } else if job == nil {
695 » » return nil, ErrNoSuchJob
696 » }
697
698 » switch owner, err := job.Acls.IsOwner(c); {
699 » case err != nil:
700 » » return nil, err
701 » case owner:
702 » » return job, nil
703 » }
704
705 » // Not owner, but maybe reader? Give nicer error in such case.
706 » switch reader, err := job.Acls.IsReader(c); {
707 » case err != nil:
708 » » return nil, err
709 » case reader:
710 » » return nil, ErrNoOwnerPermission
711 » default:
712 » » return nil, ErrNoSuchJob
713 » }
714 }
715
716 func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) {
648 job := &Job{JobID: jobID} 717 job := &Job{JobID: jobID}
649 switch err := ds.Get(c, job); { 718 switch err := ds.Get(c, job); {
650 case err == nil: 719 case err == nil:
651 return job, nil 720 return job, nil
652 case err == ds.ErrNoSuchEntity: 721 case err == ds.ErrNoSuchEntity:
653 return nil, nil 722 return nil, nil
654 default: 723 default:
655 return nil, transient.Tag.Apply(err) 724 return nil, transient.Tag.Apply(err)
656 } 725 }
657 } 726 }
658 727
659 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { 728 func (e *engineImpl) PublicAPI() Engine {
729 » return e
730 }
731
732 func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pag eSize int, cursor string) ([]*Invocation, string, error) {
733 » if job, err := e.GetVisibleJob(c, jobID); err != nil {
Vadim Sh. 2017/08/04 23:07:27 strictly speaking, it would be better if we put AC
tandrii(chromium) 2017/08/07 12:48:17 First, since API and UI QPS is indeed negligible,
Vadim Sh. 2017/08/07 22:53:21 Sure, it was just a general thought. I don't think
734 » » return nil, "", err
735 » } else if job == nil {
736 » » // Either no Job or no access, both imply returning no invocatio ns.
737 » » // return []*Invocation{}, "", nil
738 » » return nil, "", ErrNoSuchInvocation
739 » }
740
660 if pageSize == 0 || pageSize > 500 { 741 if pageSize == 0 || pageSize > 500 {
661 pageSize = 500 742 pageSize = 500
662 } 743 }
663 744
664 // Deserialize the cursor. 745 // Deserialize the cursor.
665 var cursorObj ds.Cursor 746 var cursorObj ds.Cursor
666 if cursor != "" { 747 if cursor != "" {
667 var err error 748 var err error
668 cursorObj, err = ds.DecodeCursor(c, cursor) 749 cursorObj, err = ds.DecodeCursor(c, cursor)
669 if err != nil { 750 if err != nil {
(...skipping 11 matching lines...) Expand all
681 } 762 }
682 763
683 // Fetch pageSize worth of invocations, then grab the cursor. 764 // Fetch pageSize worth of invocations, then grab the cursor.
684 out := make([]*Invocation, 0, pageSize) 765 out := make([]*Invocation, 0, pageSize)
685 var newCursor string 766 var newCursor string
686 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { 767 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error {
687 out = append(out, obj) 768 out = append(out, obj)
688 if len(out) < pageSize { 769 if len(out) < pageSize {
689 return nil 770 return nil
690 } 771 }
691 » » c, err := getCursor() 772 » » cur, err := getCursor()
692 if err != nil { 773 if err != nil {
693 return err 774 return err
694 } 775 }
695 » » newCursor = c.String() 776 » » newCursor = cur.String()
696 return ds.Stop 777 return ds.Stop
697 }) 778 })
698 if err != nil { 779 if err != nil {
699 return nil, "", transient.Tag.Apply(err) 780 return nil, "", transient.Tag.Apply(err)
700 } 781 }
701 return out, newCursor, nil 782 return out, newCursor, nil
702 } 783 }
703 784
704 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { 785 func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
786 » switch _, err := e.GetVisibleJob(c, jobID); {
787 » case err == ErrNoSuchJob:
788 » » return nil, ErrNoSuchInvocation
789 » case err != nil:
790 » » return nil, err
791 » default:
792 » » return e.getInvocation(c, jobID, invID)
793 » }
794 }
795
796 func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
705 inv := &Invocation{ 797 inv := &Invocation{
706 ID: invID, 798 ID: invID,
707 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), 799 JobKey: ds.NewKey(c, "Job", jobID, 0, nil),
708 } 800 }
709 switch err := ds.Get(c, inv); { 801 switch err := ds.Get(c, inv); {
710 case err == nil: 802 case err == nil:
711 return inv, nil 803 return inv, nil
712 case err == ds.ErrNoSuchEntity: 804 case err == ds.ErrNoSuchEntity:
713 return nil, nil 805 return nil, nil
714 default: 806 default:
715 return nil, transient.Tag.Apply(err) 807 return nil, transient.Tag.Apply(err)
716 } 808 }
717 } 809 }
718 810
719 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ ]*Invocation, error) { 811 func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce in t64) ([]*Invocation, error) {
720 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) 812 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
721 entities := []*Invocation{} 813 entities := []*Invocation{}
722 if err := ds.GetAll(c, q, &entities); err != nil { 814 if err := ds.GetAll(c, q, &entities); err != nil {
723 return nil, transient.Tag.Apply(err) 815 return nil, transient.Tag.Apply(err)
724 } 816 }
817 if len(entities) > 0 {
818 // All Invocations with the same nonce must belong to the same J ob.
819 switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID( )); {
820 case err == ErrNoSuchJob:
821 return []*Invocation{}, nil
822 case err != nil:
823 return nil, err
824 }
825 }
725 return entities, nil 826 return entities, nil
726 } 827 }
727 828
728 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error { 829 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error {
729 // JobID -> *Job map. 830 // JobID -> *Job map.
730 existing, err := e.getProjectJobs(c, projectID) 831 existing, err := e.getProjectJobs(c, projectID)
731 if err != nil { 832 if err != nil {
732 return err 833 return err
733 } 834 }
734 // JobID -> new definition revision map. 835 // JobID -> new definition revision map.
(...skipping 309 matching lines...) Expand 10 before | Expand all | Expand 10 after
1044 1145
1045 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error { 1146 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error {
1046 switch { 1147 switch {
1047 case payload.InvTimer != nil: 1148 case payload.InvTimer != nil:
1048 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer) 1149 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer)
1049 default: 1150 default:
1050 return fmt.Errorf("unexpected invocation action kind %q", payloa d) 1151 return fmt.Errorf("unexpected invocation action kind %q", payloa d)
1051 } 1152 }
1052 } 1153 }
1053 1154
1054 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere dBy identity.Identity) (int64, error) { 1155 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) {
1156 » // First, we check ACLs.
1157 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1158 » » return 0, err
1159 » }
1160
1055 var err error 1161 var err error
1056 var invNonce int64 1162 var invNonce int64
1057 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror { 1163 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror {
1058 if isNew { 1164 if isNew {
1059 err = ErrNoSuchJob 1165 err = ErrNoSuchJob
1060 return errSkipPut 1166 return errSkipPut
1061 } 1167 }
1062 if !job.Enabled { 1168 if !job.Enabled {
1063 err = errors.New("the job is disabled") 1169 err = errors.New("the job is disabled")
1064 return errSkipPut 1170 return errSkipPut
1065 } 1171 }
1066 invNonce = 0 1172 invNonce = 0
1067 return e.rollSM(c, job, func(sm *StateMachine) error { 1173 return e.rollSM(c, job, func(sm *StateMachine) error {
1068 » » » if err := sm.OnManualInvocation(triggeredBy); err != nil { 1174 » » » if err := sm.OnManualInvocation(auth.CurrentIdentity(c)) ; err != nil {
1069 return err 1175 return err
1070 } 1176 }
1071 invNonce = sm.State.InvocationNonce 1177 invNonce = sm.State.InvocationNonce
1072 return nil 1178 return nil
1073 }) 1179 })
1074 }) 1180 })
1075 if err == nil { 1181 if err == nil {
1076 err = err2 1182 err = err2
1077 } 1183 }
1078 return invNonce, err 1184 return invNonce, err
1079 } 1185 }
1080 1186
1081 func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Iden tity) error { 1187 func (e *engineImpl) PauseJob(c context.Context, jobID string) error {
1082 » return e.setPausedFlag(c, jobID, true, who) 1188 » // First, we check ACLs.
1189 » if _, err := e.getOwnedJob(c, jobID); err != nil {
Vadim Sh. 2017/08/04 23:07:26 why not move it into 'setPausedFlag'?
tandrii(chromium) 2017/08/07 12:48:17 Good point. Done.
1190 » » return err
1191 » }
1192 » return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c))
1083 } 1193 }
1084 1194
1085 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide ntity) error { 1195 func (e *engineImpl) ResumeJob(c context.Context, jobID string) error {
1086 » return e.setPausedFlag(c, jobID, false, who) 1196 » // First, we check ACLs.
1197 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1198 » » return err
1199 » }
1200 » return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c))
1087 } 1201 }
1088 1202
1089 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { 1203 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error {
1090 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { 1204 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or {
1091 if isNew || !job.Enabled { 1205 if isNew || !job.Enabled {
1092 return ErrNoSuchJob 1206 return ErrNoSuchJob
1093 } 1207 }
1094 if job.Paused == paused { 1208 if job.Paused == paused {
1095 return errSkipPut 1209 return errSkipPut
1096 } 1210 }
1097 if paused { 1211 if paused {
1098 logging.Warningf(c, "Job is paused by %s", who) 1212 logging.Warningf(c, "Job is paused by %s", who)
1099 } else { 1213 } else {
1100 logging.Warningf(c, "Job is resumed by %s", who) 1214 logging.Warningf(c, "Job is resumed by %s", who)
1101 } 1215 }
1102 job.Paused = paused 1216 job.Paused = paused
1103 return e.rollSM(c, job, func(sm *StateMachine) error { 1217 return e.rollSM(c, job, func(sm *StateMachine) error {
1104 sm.OnScheduleChange() 1218 sm.OnScheduleChange()
1105 return nil 1219 return nil
1106 }) 1220 })
1107 }) 1221 })
1108 } 1222 }
1109 1223
1110 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4, who identity.Identity) error { 1224 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4) error {
1225 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1226 » » return err
1227 » }
1228 » return e.abortInvocation(c, jobID, invID)
1229 }
1230
1231 func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int6 4) error {
1111 c = logging.SetField(c, "JobID", jobID) 1232 c = logging.SetField(c, "JobID", jobID)
1112 c = logging.SetField(c, "InvID", invID) 1233 c = logging.SetField(c, "InvID", invID)
1113 1234
1114 var inv *Invocation 1235 var inv *Invocation
1115 var err error 1236 var err error
1116 » switch inv, err = e.GetInvocation(c, jobID, invID); { 1237 » switch inv, err = e.getInvocation(c, jobID, invID); {
1117 case err != nil: 1238 case err != nil:
1118 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1239 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1119 return err 1240 return err
1120 case inv == nil: 1241 case inv == nil:
1121 logging.Errorf(c, "The invocation doesn't exist") 1242 logging.Errorf(c, "The invocation doesn't exist")
1122 return ErrNoSuchInvocation 1243 return ErrNoSuchInvocation
1123 case inv.Status.Final(): 1244 case inv.Status.Final():
1124 return nil 1245 return nil
1125 } 1246 }
1126 1247
1127 ctl, err := e.controllerForInvocation(c, inv) 1248 ctl, err := e.controllerForInvocation(c, inv)
1128 if err != nil { 1249 if err != nil {
1129 logging.Errorf(c, "Cannot get controller - %s", err) 1250 logging.Errorf(c, "Cannot get controller - %s", err)
1130 return err 1251 return err
1131 } 1252 }
1132 1253
1133 » ctl.DebugLog("Invocation is manually aborted by %q", who) 1254 » ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c) )
1134 if err = ctl.manager.AbortTask(c, ctl); err != nil { 1255 if err = ctl.manager.AbortTask(c, ctl); err != nil {
1135 logging.Errorf(c, "Failed to abort the task - %s", err) 1256 logging.Errorf(c, "Failed to abort the task - %s", err)
1136 return err 1257 return err
1137 } 1258 }
1138 1259
1139 ctl.State().Status = task.StatusAborted 1260 ctl.State().Status = task.StatusAborted
1140 if err = ctl.Save(c); err != nil { 1261 if err = ctl.Save(c); err != nil {
1141 logging.Errorf(c, "Failed to save the invocation - %s", err) 1262 logging.Errorf(c, "Failed to save the invocation - %s", err)
1142 return err 1263 return err
1143 } 1264 }
1144 return nil 1265 return nil
1145 } 1266 }
1146 1267
1147 // AbortJob resets the job to scheduled state, aborting a currently pending or 1268 // AbortJob resets the job to scheduled state, aborting a currently pending or
1148 // running invocation (if any). 1269 // running invocation (if any).
1149 // 1270 //
1150 // Returns nil if the job is not currently running. 1271 // Returns nil if the job is not currently running.
1151 func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden tity) error { 1272 func (e *engineImpl) AbortJob(c context.Context, jobID string) error {
1152 » // First we switch the job to the default state and disassociate the run ning 1273 » // First, we check ACLs.
1274 » if _, err := e.getOwnedJob(c, jobID); err != nil {
1275 » » return err
1276 » }
1277 » // Second, we switch the job to the default state and disassociate the r unning
1153 // invocation (if any) from the job entity. 1278 // invocation (if any) from the job entity.
1154 var invID int64 1279 var invID int64
1155 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { 1280 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or {
1156 if isNew { 1281 if isNew {
1157 return errSkipPut // the job was removed, nothing to abo rt 1282 return errSkipPut // the job was removed, nothing to abo rt
1158 } 1283 }
1159 invID = job.State.InvocationID 1284 invID = job.State.InvocationID
1160 return e.rollSM(c, job, func(sm *StateMachine) error { 1285 return e.rollSM(c, job, func(sm *StateMachine) error {
1161 sm.OnManualAbort() 1286 sm.OnManualAbort()
1162 return nil 1287 return nil
1163 }) 1288 })
1164 }) 1289 })
1165 if err != nil { 1290 if err != nil {
1166 return err 1291 return err
1167 } 1292 }
1168 1293
1169 // Now we kill the invocation. We do it separately because it may involv e 1294 // Now we kill the invocation. We do it separately because it may involv e
1170 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom 1295 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom
1171 // the transaction. 1296 // the transaction.
1172 if invID != 0 { 1297 if invID != 0 {
1173 » » return e.AbortInvocation(c, jobID, invID, who) 1298 » » return e.abortInvocation(c, jobID, invID)
1174 } 1299 }
1175 return nil 1300 return nil
1176 } 1301 }
1177 1302
1178 // updateJob updates an existing job if its definition has changed, adds 1303 // updateJob updates an existing job if its definition has changed, adds
1179 // a completely new job or enables a previously disabled job. 1304 // a completely new job or enables a previously disabled job.
1180 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error { 1305 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error {
1181 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error { 1306 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error {
1182 if !isNew && job.Enabled && job.matches(def) { 1307 if !isNew && job.Enabled && job.matches(def) {
1183 return errSkipPut 1308 return errSkipPut
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
1310 } 1435 }
1311 1436
1312 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. 1437 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks.
1313 // 1438 //
1314 // See also handlePubSubMessage, it is quite similar. 1439 // See also handlePubSubMessage, it is quite similar.
1315 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error { 1440 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error {
1316 c = logging.SetField(c, "JobID", jobID) 1441 c = logging.SetField(c, "JobID", jobID)
1317 c = logging.SetField(c, "InvID", invID) 1442 c = logging.SetField(c, "InvID", invID)
1318 1443
1319 logging.Infof(c, "Handling invocation timer %q", timer.Name) 1444 logging.Infof(c, "Handling invocation timer %q", timer.Name)
1320 » inv, err := e.GetInvocation(c, jobID, invID) 1445 » inv, err := e.getInvocation(c, jobID, invID)
1321 if err != nil { 1446 if err != nil {
1322 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1447 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1323 return err 1448 return err
1324 } 1449 }
1325 if inv == nil { 1450 if inv == nil {
1326 return ErrNoSuchInvocation 1451 return ErrNoSuchInvocation
1327 } 1452 }
1328 1453
1329 // Finished invocations are immutable, skip the message. 1454 // Finished invocations are immutable, skip the message.
1330 if inv.Status.Final() { 1455 if inv.Status.Final() {
(...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after
1695 return err 1820 return err
1696 } 1821 }
1697 jobID = data["job"] 1822 jobID = data["job"]
1698 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { 1823 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil {
1699 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err) 1824 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err)
1700 return err 1825 return err
1701 } 1826 }
1702 1827
1703 c = logging.SetField(c, "JobID", jobID) 1828 c = logging.SetField(c, "JobID", jobID)
1704 c = logging.SetField(c, "InvID", invID) 1829 c = logging.SetField(c, "InvID", invID)
1705 » inv, err := e.GetInvocation(c, jobID, invID) 1830 » inv, err := e.getInvocation(c, jobID, invID)
1706 if err != nil { 1831 if err != nil {
1707 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1832 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1708 return err 1833 return err
1709 } 1834 }
1710 if inv == nil { 1835 if inv == nil {
1711 return ErrNoSuchInvocation 1836 return ErrNoSuchInvocation
1712 } 1837 }
1713 1838
1714 // Finished invocations are immutable, skip the message. 1839 // Finished invocations are immutable, skip the message.
1715 if inv.Status.Final() { 1840 if inv.Status.Final() {
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after
1952 } 2077 }
1953 if hasFinished { 2078 if hasFinished {
1954 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { 2079 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or {
1955 sm.OnInvocationDone(saving.ID) 2080 sm.OnInvocationDone(saving.ID)
1956 return nil 2081 return nil
1957 }) 2082 })
1958 } 2083 }
1959 return nil 2084 return nil
1960 }) 2085 })
1961 } 2086 }
OLDNEW
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698