Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 41 "github.com/luci/luci-go/common/data/rand/mathrand" | 41 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 42 "github.com/luci/luci-go/common/data/stringset" | 42 "github.com/luci/luci-go/common/data/stringset" |
| 43 "github.com/luci/luci-go/common/errors" | 43 "github.com/luci/luci-go/common/errors" |
| 44 "github.com/luci/luci-go/common/logging" | 44 "github.com/luci/luci-go/common/logging" |
| 45 "github.com/luci/luci-go/common/retry/transient" | 45 "github.com/luci/luci-go/common/retry/transient" |
| 46 "github.com/luci/luci-go/server/auth" | 46 "github.com/luci/luci-go/server/auth" |
| 47 "github.com/luci/luci-go/server/auth/identity" | 47 "github.com/luci/luci-go/server/auth/identity" |
| 48 "github.com/luci/luci-go/server/auth/signing" | 48 "github.com/luci/luci-go/server/auth/signing" |
| 49 "github.com/luci/luci-go/server/tokens" | 49 "github.com/luci/luci-go/server/tokens" |
| 50 | 50 |
| 51 "github.com/luci/luci-go/scheduler/appengine/acl" | |
| 51 "github.com/luci/luci-go/scheduler/appengine/catalog" | 52 "github.com/luci/luci-go/scheduler/appengine/catalog" |
| 52 "github.com/luci/luci-go/scheduler/appengine/schedule" | 53 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 53 "github.com/luci/luci-go/scheduler/appengine/task" | 54 "github.com/luci/luci-go/scheduler/appengine/task" |
| 54 ) | 55 ) |
| 55 | 56 |
| 56 var ( | 57 var ( |
| 57 ErrNoSuchJob = errors.New("no such job") | 58 ErrNoSuchJob = errors.New("no such job") |
| 58 ErrNoSuchInvocation = errors.New("the invocation doesn't exist") | 59 ErrNoSuchInvocation = errors.New("the invocation doesn't exist") |
| 59 ) | 60 ) |
| 60 | 61 |
| 61 // Engine manages all scheduler jobs: keeps track of their state, runs state | 62 // Engine manages all scheduler jobs: keeps track of their state, runs state |
| 62 // machine transactions, starts new invocations, etc. A method returns | 63 // machine transactions, starts new invocations, etc. A method returns |
| 63 // errors.Transient if the error is non-fatal and the call should be retried | 64 // errors.Transient if the error is non-fatal and the call should be retried |
| 64 // later. Any other error means that retry won't help. | 65 // later. Any other error means that retry won't help. |
| 65 type Engine interface { | 66 type Engine interface { |
| 66 // GetAllProjects returns a list of all projects that have at least one | 67 // GetAllProjects returns a list of all projects that have at least one |
| 67 // enabled scheduler job. | 68 // enabled scheduler job. |
| 69 // ACLs are NOT enforced; internal scheduler use only. | |
| 68 GetAllProjects(c context.Context) ([]string, error) | 70 GetAllProjects(c context.Context) ([]string, error) |
| 69 | 71 |
| 70 » // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar | 72 » // GetAllJobsRA returns a list of all enabled scheduler jobs to which |
| 71 » // order. | 73 » // current identity has READER access in no particular order. |
| 72 » GetAllJobs(c context.Context) ([]*Job, error) | 74 » GetAllJobsRA(c context.Context) ([]*Job, error) |
|
tandrii(chromium)
2017/08/02 20:23:46
RA here and below stands for READER Access. I wish
| |
| 73 | 75 |
| 74 » // GetProjectJobs returns a list of enabled scheduler jobs of some proje ct | 76 » // GetProjectJobsRA returns a list of enabled scheduler jobs to which cu rreent |
| 75 » // in no particular order. | 77 » // identity has READER access of some project in no particular order. |
| 76 » GetProjectJobs(c context.Context, projectID string) ([]*Job, error) | 78 » GetProjectJobsRA(c context.Context, projectID string) ([]*Job, error) |
| 77 | 79 |
| 78 » // GetJob returns single scheduler job given its full ID or nil if no su ch | 80 » // GetJobRA returns single scheduler job given its full ID if and only i f |
| 79 » // job. | 81 » // current identity has READER access to it. Returns nil otherwise. |
| 80 » GetJob(c context.Context, jobID string) (*Job, error) | 82 » GetJobRA(c context.Context, jobID string) (*Job, error) |
| 81 | 83 |
| 82 // ListInvocations returns invocations of a job, most recent first. | 84 // ListInvocations returns invocations of a job, most recent first. |
| 83 // Returns fetched invocations and cursor string if there's more. | 85 // Returns fetched invocations and cursor string if there's more. |
| 86 // TODO(tandrii): enforce ACLs. | |
| 84 ListInvocations(c context.Context, jobID string, pageSize int, cursor st ring) ([]*Invocation, string, error) | 87 ListInvocations(c context.Context, jobID string, pageSize int, cursor st ring) ([]*Invocation, string, error) |
|
tandrii(chromium)
2017/08/02 20:23:46
this and two methods below will fetch JobId concur
Vadim Sh.
2017/08/02 21:14:13
Yes.
| |
| 85 | 88 |
| 86 // GetInvocation returns single invocation of some job given its ID. | 89 // GetInvocation returns single invocation of some job given its ID. |
| 90 // TODO(tandrii): enforce ACLs. | |
| 87 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation , error) | 91 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation , error) |
| 88 | 92 |
| 89 // GetInvocationsByNonce returns a list of Invocations with given nonce. | 93 // GetInvocationsByNonce returns a list of Invocations with given nonce. |
| 90 // | 94 // |
| 91 // Invocation nonce is a random number that identifies an intent to star t | 95 // Invocation nonce is a random number that identifies an intent to star t |
| 92 // an invocation. Normally one nonce corresponds to one Invocation entit y, | 96 // an invocation. Normally one nonce corresponds to one Invocation entit y, |
| 93 // but there can be more if job fails to start with a transient error. | 97 // but there can be more if job fails to start with a transient error. |
| 98 // TODO(tandrii): enforce ACLs. | |
| 94 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) | 99 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) |
| 95 | 100 |
| 96 // UpdateProjectJobs adds new, removes old and updates existing jobs. | 101 // UpdateProjectJobs adds new, removes old and updates existing jobs. |
| 97 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error | 102 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error |
| 98 | 103 |
| 99 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. | 104 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. |
| 100 // Supposed to be used only on devserver, where task queue stub state is not | 105 // Supposed to be used only on devserver, where task queue stub state is not |
| 101 // preserved between appserver restarts and it messes everything. | 106 // preserved between appserver restarts and it messes everything. |
| 102 ResetAllJobsOnDevServer(c context.Context) error | 107 ResetAllJobsOnDevServer(c context.Context) error |
| 103 | 108 |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 118 // It is needed to be able to manually tests PubSub related workflows on dev | 123 // It is needed to be able to manually tests PubSub related workflows on dev |
| 119 // server, since dev server can't accept PubSub push messages. | 124 // server, since dev server can't accept PubSub push messages. |
| 120 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error | 125 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error |
| 121 | 126 |
| 122 // TriggerInvocation launches job invocation right now if job isn't runn ing | 127 // TriggerInvocation launches job invocation right now if job isn't runn ing |
| 123 // now. Used by "Run now" UI button. | 128 // now. Used by "Run now" UI button. |
| 124 // | 129 // |
| 125 // Returns new invocation nonce (a random number that identifies an inte nt to | 130 // Returns new invocation nonce (a random number that identifies an inte nt to |
| 126 // start an invocation). Normally one nonce corresponds to one Invocatio n | 131 // start an invocation). Normally one nonce corresponds to one Invocatio n |
| 127 // entity, but there can be more if job fails to start with a transient error. | 132 // entity, but there can be more if job fails to start with a transient error. |
| 128 TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error) | 133 TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error) |
|
tandrii(chromium)
2017/08/02 20:23:46
this and methods below: i want to remove triggered
Vadim Sh.
2017/08/02 21:14:13
sgtm
| |
| 129 | 134 |
| 130 // PauseJob replaces job's schedule with "triggered", effectively preven ting | 135 // PauseJob replaces job's schedule with "triggered", effectively preven ting |
| 131 // it from running automatically (until it is resumed). Manual invocatio ns are | 136 // it from running automatically (until it is resumed). Manual invocatio ns are |
| 132 // still allowed. Does nothing if job is already paused. Any pending or | 137 // still allowed. Does nothing if job is already paused. Any pending or |
| 133 // running invocations are still executed. | 138 // running invocations are still executed. |
| 134 PauseJob(c context.Context, jobID string, who identity.Identity) error | 139 PauseJob(c context.Context, jobID string, who identity.Identity) error |
| 135 | 140 |
| 136 // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d. | 141 // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d. |
| 137 ResumeJob(c context.Context, jobID string, who identity.Identity) error | 142 ResumeJob(c context.Context, jobID string, who identity.Identity) error |
| 138 | 143 |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 262 // an appropriate revision. | 267 // an appropriate revision. |
| 263 RevisionURL string `gae:",noindex"` | 268 RevisionURL string `gae:",noindex"` |
| 264 | 269 |
| 265 // Schedule is the job's schedule in regular cron expression format. | 270 // Schedule is the job's schedule in regular cron expression format. |
| 266 Schedule string `gae:",noindex"` | 271 Schedule string `gae:",noindex"` |
| 267 | 272 |
| 268 // Task is the job's payload in serialized form. Opaque from the point o f view | 273 // Task is the job's payload in serialized form. Opaque from the point o f view |
| 269 // of the engine. See Catalog.UnmarshalTask(). | 274 // of the engine. See Catalog.UnmarshalTask(). |
| 270 Task []byte `gae:",noindex"` | 275 Task []byte `gae:",noindex"` |
| 271 | 276 |
| 277 // ACLs are the latest ACLs applied to Job and all its invocations. | |
| 278 Acls acl.GrantsByRole `gae:",noindex"` | |
| 279 | |
| 272 // State is the job's state machine state, see StateMachine. | 280 // State is the job's state machine state, see StateMachine. |
| 273 State JobState | 281 State JobState |
| 274 } | 282 } |
| 275 | 283 |
| 276 // GetJobName returns name of this Job as defined its project's config. | 284 // GetJobName returns name of this Job as defined its project's config. |
| 277 func (e *Job) GetJobName() string { | 285 func (e *Job) GetJobName() string { |
| 278 // JobID has form <project>/<id>. Split it into components. | 286 // JobID has form <project>/<id>. Split it into components. |
| 279 chunks := strings.Split(e.JobID, "/") | 287 chunks := strings.Split(e.JobID, "/") |
| 280 return chunks[1] | 288 return chunks[1] |
| 281 } | 289 } |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 321 e.State == other.State) | 329 e.State == other.State) |
| 322 } | 330 } |
| 323 | 331 |
| 324 // matches returns true if job definition in the entity matches the one | 332 // matches returns true if job definition in the entity matches the one |
| 325 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for | 333 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for |
| 326 // such jobs (assuming they are up-to-date). | 334 // such jobs (assuming they are up-to-date). |
| 327 func (e *Job) matches(def catalog.Definition) bool { | 335 func (e *Job) matches(def catalog.Definition) bool { |
| 328 return e.JobID == def.JobID && | 336 return e.JobID == def.JobID && |
| 329 e.Flavor == def.Flavor && | 337 e.Flavor == def.Flavor && |
| 330 e.Schedule == def.Schedule && | 338 e.Schedule == def.Schedule && |
| 339 e.Acls.Equal(&def.Acls) && | |
| 331 bytes.Equal(e.Task, def.Task) | 340 bytes.Equal(e.Task, def.Task) |
| 332 } | 341 } |
| 333 | 342 |
| 334 // Invocation entity stores single attempt to run a job. Its parent entity | 343 // Invocation entity stores single attempt to run a job. Its parent entity |
| 335 // is corresponding Job, its ID is generated based on time. | 344 // is corresponding Job, its ID is generated based on time. |
| 336 type Invocation struct { | 345 type Invocation struct { |
| 337 _kind string `gae:"$kind,Invocation"` | 346 _kind string `gae:"$kind,Invocation"` |
| 338 _extra ds.PropertyMap `gae:"-,extra"` | 347 _extra ds.PropertyMap `gae:"-,extra"` |
| 339 | 348 |
| 340 // ID is identifier of this particular attempt to run a job. Multiple at tempts | 349 // ID is identifier of this particular attempt to run a job. Multiple at tempts |
| (...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 612 // Filter out duplicates, sort. | 621 // Filter out duplicates, sort. |
| 613 projects := stringset.New(len(entities)) | 622 projects := stringset.New(len(entities)) |
| 614 for _, ent := range entities { | 623 for _, ent := range entities { |
| 615 projects.Add(ent.ProjectID) | 624 projects.Add(ent.ProjectID) |
| 616 } | 625 } |
| 617 out := projects.ToSlice() | 626 out := projects.ToSlice() |
| 618 sort.Strings(out) | 627 sort.Strings(out) |
| 619 return out, nil | 628 return out, nil |
| 620 } | 629 } |
| 621 | 630 |
| 622 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { | 631 func (e *engineImpl) GetAllJobsRA(c context.Context) ([]*Job, error) { |
| 623 q := ds.NewQuery("Job").Eq("Enabled", true) | 632 q := ds.NewQuery("Job").Eq("Enabled", true) |
| 624 » return e.queryEnabledJobs(c, q) | 633 » return e.queryEnabledJobsRA(c, q) |
| 625 } | 634 } |
| 626 | 635 |
| 627 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) { | 636 func (e *engineImpl) GetProjectJobsRA(c context.Context, projectID string) ([]*J ob, error) { |
| 628 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) | 637 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
| 629 » return e.queryEnabledJobs(c, q) | 638 » return e.queryEnabledJobsRA(c, q) |
| 630 } | 639 } |
| 631 | 640 |
| 632 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) { | 641 func (e *engineImpl) queryEnabledJobsRA(c context.Context, q *ds.Query) ([]*Job, error) { |
| 633 entities := []*Job{} | 642 entities := []*Job{} |
| 634 if err := ds.GetAll(c, q, &entities); err != nil { | 643 if err := ds.GetAll(c, q, &entities); err != nil { |
| 635 return nil, transient.Tag.Apply(err) | 644 return nil, transient.Tag.Apply(err) |
| 636 } | 645 } |
| 637 // Non-ancestor query used, need to recheck filters. | 646 // Non-ancestor query used, need to recheck filters. |
| 638 filtered := make([]*Job, 0, len(entities)) | 647 filtered := make([]*Job, 0, len(entities)) |
| 639 for _, job := range entities { | 648 for _, job := range entities { |
| 640 » » if job.Enabled { | 649 » » if !job.Enabled { |
| 650 » » » continue | |
| 651 » » } | |
| 652 » » // TODO(tandrii): improve batch ACLs check here to take advantag e of likely | |
| 653 » » // shared ACLs between most jobs of the same project. | |
| 654 » » if ok, err := job.Acls.IsReader(c); err != nil { | |
| 655 » » » return nil, transient.Tag.Apply(err) | |
| 656 » » } else if ok { | |
| 641 filtered = append(filtered, job) | 657 filtered = append(filtered, job) |
| 642 } | 658 } |
| 643 } | 659 } |
| 644 return filtered, nil | 660 return filtered, nil |
| 645 } | 661 } |
| 646 | 662 |
| 647 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { | 663 func (e *engineImpl) GetJobRA(c context.Context, jobID string) (*Job, error) { |
| 648 job := &Job{JobID: jobID} | 664 job := &Job{JobID: jobID} |
| 649 switch err := ds.Get(c, job); { | 665 switch err := ds.Get(c, job); { |
| 650 case err == nil: | 666 case err == nil: |
| 667 if hasAccess, err := job.Acls.IsReader(c); err != nil { | |
| 668 return nil, transient.Tag.Apply(err) | |
| 669 } else if !hasAccess { | |
| 670 return nil, nil | |
| 671 } | |
| 651 return job, nil | 672 return job, nil |
| 652 case err == ds.ErrNoSuchEntity: | 673 case err == ds.ErrNoSuchEntity: |
| 653 return nil, nil | 674 return nil, nil |
| 654 default: | 675 default: |
| 655 return nil, transient.Tag.Apply(err) | 676 return nil, transient.Tag.Apply(err) |
| 656 } | 677 } |
| 657 } | 678 } |
| 658 | 679 |
| 659 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { | 680 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { |
| 660 if pageSize == 0 || pageSize > 500 { | 681 if pageSize == 0 || pageSize > 500 { |
| (...skipping 1291 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1952 } | 1973 } |
| 1953 if hasFinished { | 1974 if hasFinished { |
| 1954 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { | 1975 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { |
| 1955 sm.OnInvocationDone(saving.ID) | 1976 sm.OnInvocationDone(saving.ID) |
| 1956 return nil | 1977 return nil |
| 1957 }) | 1978 }) |
| 1958 } | 1979 } |
| 1959 return nil | 1980 return nil |
| 1960 }) | 1981 }) |
| 1961 } | 1982 } |
| OLD | NEW |