Chromium Code Reviews| Index: scheduler/appengine/engine/engine.go |
| diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go |
| index 57799b38eda49460691045f44521bc2fdc68d4d2..e27aea3c0d02efa1f422bf57a0f70ceb8eacba30 100644 |
| --- a/scheduler/appengine/engine/engine.go |
| +++ b/scheduler/appengine/engine/engine.go |
| @@ -48,50 +48,99 @@ import ( |
| "github.com/luci/luci-go/server/auth/signing" |
| "github.com/luci/luci-go/server/tokens" |
| + "github.com/luci/luci-go/scheduler/appengine/acl" |
| "github.com/luci/luci-go/scheduler/appengine/catalog" |
| "github.com/luci/luci-go/scheduler/appengine/schedule" |
| "github.com/luci/luci-go/scheduler/appengine/task" |
| ) |
| var ( |
| - ErrNoSuchJob = errors.New("no such job") |
| - ErrNoSuchInvocation = errors.New("the invocation doesn't exist") |
| + ErrNoOwnerPermission = errors.New("no OWNER permission on a job") |
| + ErrNoSuchJob = errors.New("no such job") |
| + ErrNoSuchInvocation = errors.New("the invocation doesn't exist") |
| ) |
| // Engine manages all scheduler jobs: keeps track of their state, runs state |
| // machine transactions, starts new invocations, etc. A method returns |
| // errors.Transient if the error is non-fatal and the call should be retried |
| // later. Any other error means that retry won't help. |
| +// Unlike EngineInternal, Engine enforces ACLs, with the following implications: |
| +// * if caller lacks READER access to Jobs, methods behave as if Jobs |
|
Vadim Sh.
2017/08/04 23:07:26
"lacks have" ?
tandrii(chromium)
2017/08/07 12:48:17
Done.
|
| +// do not exist. |
| +// * if caller lacks OWNER access, calling mutating methods will result in |
|
Vadim Sh.
2017/08/04 23:07:27
same here
tandrii(chromium)
2017/08/07 12:48:17
Done.
|
| +// ErrNoOwnerPermission (assuming caller has READER access, else see above). |
| type Engine interface { |
| - // GetAllProjects returns a list of all projects that have at least one |
| - // enabled scheduler job. |
| - GetAllProjects(c context.Context) ([]string, error) |
| + // GetVisibleProjects returns a list of all projects that have at least one |
| + // enabled scheduler job to which caller has READER access. |
| + // GetVisibleProjects(c context.Context) ([]string, error) |
|
Vadim Sh.
2017/08/04 23:07:27
remove
I assume it's not needed now?
tandrii(chromium)
2017/08/07 12:48:16
Yep, Done.
|
| - // GetAllJobs returns a list of all enabled scheduler jobs in no particular |
| - // order. |
| - GetAllJobs(c context.Context) ([]*Job, error) |
| + // GetVisibleJobs returns a list of all enabled scheduler jobs in no |
| + // particular order. |
| + GetVisibleJobs(c context.Context) ([]*Job, error) |
| - // GetProjectJobs returns a list of enabled scheduler jobs of some project |
| - // in no particular order. |
| - GetProjectJobs(c context.Context, projectID string) ([]*Job, error) |
| + // GetVisibleProjectJobs returns a list of enabled scheduler jobs of some |
| + // project in no particular order. |
| + GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) |
| - // GetJob returns single scheduler job given its full ID or nil if no such |
| - // job. |
| - GetJob(c context.Context, jobID string) (*Job, error) |
| + // GetVisibleJob returns a single scheduler job given its full ID, or |
| + // ErrNoSuchJob if there is no such job or it is not visible to the caller. |
| + GetVisibleJob(c context.Context, jobID string) (*Job, error) |
| - // ListInvocations returns invocations of a job, most recent first. |
| + // ListVisibleInvocations returns invocations of a visible job, most recent first. |
| // Returns fetched invocations and cursor string if there's more. |
| - ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) |
| + ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) |
| - // GetInvocation returns single invocation of some job given its ID. |
| - GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) |
| + // GetVisibleInvocation returns single invocation of some job given its ID. |
| + GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) |
| - // GetInvocationsByNonce returns a list of Invocations with given nonce. |
| + // GetVisibleInvocationsByNonce returns a list of Invocations with given nonce. |
| // |
| // Invocation nonce is a random number that identifies an intent to start |
| // an invocation. Normally one nonce corresponds to one Invocation entity, |
| // but there can be more if job fails to start with a transient error. |
| - GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) |
| + GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) |
| + |
| + // PauseJob replaces job's schedule with "triggered", effectively preventing |
| + // it from running automatically (until it is resumed). Manual invocations are |
| + // still allowed. Does nothing if job is already paused. Any pending or |
| + // running invocations are still executed. |
| + PauseJob(c context.Context, jobID string) error |
| + |
| + // ResumeJob resumes paused job. Does nothing if the job is not paused. |
| + ResumeJob(c context.Context, jobID string) error |
| + |
| + // AbortJob resets the job to scheduled state, aborting a currently pending or |
| + // running invocation (if any). |
| + // |
| + // Returns nil if the job is not currently running. |
| + AbortJob(c context.Context, jobID string) error |
| + |
| + // AbortInvocation forcefully moves the invocation to failed state. |
| + // |
| + // It opportunistically tries to send "abort" signal to a job runner if it |
| + // supports cancellation, but it doesn't wait for reply. It proceeds to |
| + // modifying local state in the scheduler service datastore immediately. |
| + // |
| + // AbortInvocation can be used to manually "unstuck" jobs that got stuck due |
| + // to missing PubSub notifications or other kinds of unexpected conditions. |
| + // |
| + // Does nothing if invocation is already in some final state. |
| + AbortInvocation(c context.Context, jobID string, invID int64) error |
| + |
| + // TriggerInvocation launches job invocation right now if job isn't running |
| + // now. Used by "Run now" UI button. |
| + // |
| + // Returns new invocation nonce (a random number that identifies an intent to |
| + // start an invocation). Normally one nonce corresponds to one Invocation |
| + // entity, but there can be more if job fails to start with a transient error. |
| + TriggerInvocation(c context.Context, jobID string) (int64, error) |
| +} |
| + |
| +// EngineInternal is to be used by frontend initialization code only. |
| +type EngineInternal interface { |
| + // GetAllProjects returns a list of all projects that have at least one |
| + // enabled scheduler job. |
| + GetAllProjects(c context.Context) ([]string, error) |
| // UpdateProjectJobs adds new, removes old and updates existing jobs. |
| UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error |
| @@ -119,40 +168,7 @@ type Engine interface { |
| // server, since dev server can't accept PubSub push messages. |
| PullPubSubOnDevServer(c context.Context, taskManagerName, publisher string) error |
| - // TriggerInvocation launches job invocation right now if job isn't running |
| - // now. Used by "Run now" UI button. |
| - // |
| - // Returns new invocation nonce (a random number that identifies an intent to |
| - // start an invocation). Normally one nonce corresponds to one Invocation |
| - // entity, but there can be more if job fails to start with a transient error. |
| - TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error) |
| - |
| - // PauseJob replaces job's schedule with "triggered", effectively preventing |
| - // it from running automatically (until it is resumed). Manual invocations are |
| - // still allowed. Does nothing if job is already paused. Any pending or |
| - // running invocations are still executed. |
| - PauseJob(c context.Context, jobID string, who identity.Identity) error |
| - |
| - // ResumeJob resumes paused job. Doesn't nothing if the job is not paused. |
| - ResumeJob(c context.Context, jobID string, who identity.Identity) error |
| - |
| - // AbortInvocation forcefully moves the invocation to failed state. |
| - // |
| - // It opportunistically tries to send "abort" signal to a job runner if it |
| - // supports cancellation, but it doesn't wait for reply. It proceeds to |
| - // modifying local state in the scheduler service datastore immediately. |
| - // |
| - // AbortInvocation can be used to manually "unstuck" jobs that got stuck due |
| - // to missing PubSub notifications or other kinds of unexpected conditions. |
| - // |
| - // Does nothing if invocation is already in some final state. |
| - AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error |
| - |
| - // AbortJob resets the job to scheduled state, aborting a currently pending or |
| - // running invocation (if any). |
| - // |
| - // Returns nil if the job is not currently running. |
| - AbortJob(c context.Context, jobID string, who identity.Identity) error |
| + PublicAPI() Engine |
|
Vadim Sh.
2017/08/04 23:07:27
document
tandrii(chromium)
2017/08/07 12:48:17
Done.
|
| } |
| // Config contains parameters for the engine. |
| @@ -165,8 +181,8 @@ type Config struct { |
| PubSubPushPath string // URL to use in PubSub push config |
| } |
| -// NewEngine returns default implementation of Engine. |
| -func NewEngine(conf Config) Engine { |
| +// NewEngine returns default implementation of EngineInternal. |
| +func NewEngine(conf Config) EngineInternal { |
| return &engineImpl{ |
| Config: conf, |
| doneFlags: make(map[string]bool), |
| @@ -269,6 +285,9 @@ type Job struct { |
| // of the engine. See Catalog.UnmarshalTask(). |
| Task []byte `gae:",noindex"` |
| + // ACLs are the latest ACLs applied to Job and all its invocations. |
| + Acls acl.GrantsByRole `gae:",noindex"` |
| + |
| // State is the job's state machine state, see StateMachine. |
| State JobState |
| } |
| @@ -328,6 +347,7 @@ func (e *Job) matches(def catalog.Definition) bool { |
| return e.JobID == def.JobID && |
| e.Flavor == def.Flavor && |
| e.Schedule == def.Schedule && |
| + e.Acls.Equal(&def.Acls) && |
| bytes.Equal(e.Task, def.Task) |
| } |
| @@ -538,7 +558,8 @@ func debugLog(c context.Context, str *string, format string, args ...interface{} |
| *str += prefix + fmt.Sprintf(format+"\n", args...) |
| } |
| -//// |
| +//////////////////////////////////////////////////////////////////////////////// |
| +// engineImpl. |
| type engineImpl struct { |
| Config |
| @@ -619,17 +640,17 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { |
| return out, nil |
| } |
| -func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { |
| +func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) { |
| q := ds.NewQuery("Job").Eq("Enabled", true) |
| - return e.queryEnabledJobs(c, q) |
| + return e.queryEnabledVisibleJobs(c, q) |
| } |
| -func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job, error) { |
| +func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) { |
| q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
| - return e.queryEnabledJobs(c, q) |
| + return e.queryEnabledVisibleJobs(c, q) |
| } |
| -func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) { |
| +func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([]*Job, error) { |
| entities := []*Job{} |
| if err := ds.GetAll(c, q, &entities); err != nil { |
| return nil, transient.Tag.Apply(err) |
| @@ -637,14 +658,62 @@ func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e |
| // Non-ancestor query used, need to recheck filters. |
| filtered := make([]*Job, 0, len(entities)) |
| for _, job := range entities { |
| - if job.Enabled { |
| + if !job.Enabled { |
| + continue |
| + } |
| + // TODO(tandrii): improve batch ACLs check here to take advantage of likely |
| + // shared ACLs between most jobs of the same project. |
| + if ok, err := job.Acls.IsReader(c); err != nil { |
| + return nil, transient.Tag.Apply(err) |
| + } else if ok { |
| filtered = append(filtered, job) |
| } |
| } |
| return filtered, nil |
| } |
| -func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { |
| +func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error) { |
| + job, err := e.getJob(c, jobID) |
| + if err != nil { |
| + return nil, err |
| + } else if job == nil { |
| + return nil, ErrNoSuchJob |
| + } |
| + if ok, err := job.Acls.IsReader(c); err != nil { |
| + return nil, err |
| + } else if !ok { |
| + return nil, ErrNoSuchJob |
| + } |
| + return job, err |
|
Vadim Sh.
2017/08/04 23:07:26
nit: return job, nil
tandrii(chromium)
2017/08/07 12:48:17
Done.
|
| +} |
| + |
| +func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) { |
| + job, err := e.getJob(c, jobID) |
| + if err != nil { |
| + return nil, err |
| + } else if job == nil { |
| + return nil, ErrNoSuchJob |
| + } |
| + |
| + switch owner, err := job.Acls.IsOwner(c); { |
| + case err != nil: |
| + return nil, err |
| + case owner: |
| + return job, nil |
| + } |
| + |
| + // Not owner, but maybe reader? Give nicer error in such case. |
| + switch reader, err := job.Acls.IsReader(c); { |
| + case err != nil: |
| + return nil, err |
| + case reader: |
| + return nil, ErrNoOwnerPermission |
| + default: |
| + return nil, ErrNoSuchJob |
| + } |
| +} |
| + |
| +func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) { |
| job := &Job{JobID: jobID} |
| switch err := ds.Get(c, job); { |
| case err == nil: |
| @@ -656,7 +725,19 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { |
| } |
| } |
| -func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) { |
| +func (e *engineImpl) PublicAPI() Engine { |
| + return e |
| +} |
| + |
| +func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) { |
| + if job, err := e.GetVisibleJob(c, jobID); err != nil { |
|
Vadim Sh.
2017/08/04 23:07:27
strictly speaking, it would be better if we put AC
tandrii(chromium)
2017/08/07 12:48:17
First, since API and UI QPS is indeed negligible,
Vadim Sh.
2017/08/07 22:53:21
Sure, it was just a general thought. I don't think
|
| + return nil, "", err |
| + } else if job == nil { |
| + // Either no Job or no access, both imply returning no invocations. |
| + // return []*Invocation{}, "", nil |
| + return nil, "", ErrNoSuchInvocation |
| + } |
| + |
| if pageSize == 0 || pageSize > 500 { |
| pageSize = 500 |
| } |
| @@ -688,11 +769,11 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i |
| if len(out) < pageSize { |
| return nil |
| } |
| - c, err := getCursor() |
| + cur, err := getCursor() |
| if err != nil { |
| return err |
| } |
| - newCursor = c.String() |
| + newCursor = cur.String() |
| return ds.Stop |
| }) |
| if err != nil { |
| @@ -701,7 +782,18 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i |
| return out, newCursor, nil |
| } |
| -func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { |
| +func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { |
| + switch _, err := e.GetVisibleJob(c, jobID); { |
| + case err == ErrNoSuchJob: |
| + return nil, ErrNoSuchInvocation |
| + case err != nil: |
| + return nil, err |
| + default: |
| + return e.getInvocation(c, jobID, invID) |
| + } |
| +} |
| + |
| +func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { |
| inv := &Invocation{ |
| ID: invID, |
| JobKey: ds.NewKey(c, "Job", jobID, 0, nil), |
| @@ -716,12 +808,21 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) |
| } |
| } |
| -func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) { |
| +func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) { |
| q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
| entities := []*Invocation{} |
| if err := ds.GetAll(c, q, &entities); err != nil { |
| return nil, transient.Tag.Apply(err) |
| } |
| + if len(entities) > 0 { |
| + // All Invocations with the same nonce must belong to the same Job. |
| + switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID()); { |
| + case err == ErrNoSuchJob: |
| + return []*Invocation{}, nil |
| + case err != nil: |
| + return nil, err |
| + } |
| + } |
| return entities, nil |
| } |
| @@ -1051,7 +1152,12 @@ func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl |
| } |
| } |
| -func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error) { |
| +func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) { |
| + // First, we check ACLs. |
| + if _, err := e.getOwnedJob(c, jobID); err != nil { |
| + return 0, err |
| + } |
| + |
| var err error |
| var invNonce int64 |
| err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error { |
| @@ -1065,7 +1171,7 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere |
| } |
| invNonce = 0 |
| return e.rollSM(c, job, func(sm *StateMachine) error { |
| - if err := sm.OnManualInvocation(triggeredBy); err != nil { |
| + if err := sm.OnManualInvocation(auth.CurrentIdentity(c)); err != nil { |
| return err |
| } |
| invNonce = sm.State.InvocationNonce |
| @@ -1078,12 +1184,20 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere |
| return invNonce, err |
| } |
| -func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Identity) error { |
| - return e.setPausedFlag(c, jobID, true, who) |
| +func (e *engineImpl) PauseJob(c context.Context, jobID string) error { |
| + // First, we check ACLs. |
| + if _, err := e.getOwnedJob(c, jobID); err != nil { |
|
Vadim Sh.
2017/08/04 23:07:26
why not move it into 'setPausedFlag'?
tandrii(chromium)
2017/08/07 12:48:17
Good point. Done.
|
| + return err |
| + } |
| + return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c)) |
| } |
| -func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Identity) error { |
| - return e.setPausedFlag(c, jobID, false, who) |
| +func (e *engineImpl) ResumeJob(c context.Context, jobID string) error { |
| + // First, we check ACLs. |
| + if _, err := e.getOwnedJob(c, jobID); err != nil { |
| + return err |
| + } |
| + return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c)) |
| } |
| func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { |
| @@ -1107,13 +1221,20 @@ func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, |
| }) |
| } |
| -func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error { |
| +func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64) error { |
| + if _, err := e.getOwnedJob(c, jobID); err != nil { |
| + return err |
| + } |
| + return e.abortInvocation(c, jobID, invID) |
| +} |
| + |
| +func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int64) error { |
| c = logging.SetField(c, "JobID", jobID) |
| c = logging.SetField(c, "InvID", invID) |
| var inv *Invocation |
| var err error |
| - switch inv, err = e.GetInvocation(c, jobID, invID); { |
| + switch inv, err = e.getInvocation(c, jobID, invID); { |
| case err != nil: |
| logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| return err |
| @@ -1130,7 +1251,7 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 |
| return err |
| } |
| - ctl.DebugLog("Invocation is manually aborted by %q", who) |
| + ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c)) |
| if err = ctl.manager.AbortTask(c, ctl); err != nil { |
| logging.Errorf(c, "Failed to abort the task - %s", err) |
| return err |
| @@ -1148,8 +1269,12 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 |
| // running invocation (if any). |
| // |
| // Returns nil if the job is not currently running. |
| -func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Identity) error { |
| - // First we switch the job to the default state and disassociate the running |
| +func (e *engineImpl) AbortJob(c context.Context, jobID string) error { |
| + // First, we check ACLs. |
| + if _, err := e.getOwnedJob(c, jobID); err != nil { |
| + return err |
| + } |
| + // Second, we switch the job to the default state and disassociate the running |
| // invocation (if any) from the job entity. |
| var invID int64 |
| err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error { |
| @@ -1170,7 +1295,7 @@ func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden |
| // an RPC to remote service (e.g. to cancel a task) that can't be done from |
| // the transaction. |
| if invID != 0 { |
| - return e.AbortInvocation(c, jobID, invID, who) |
| + return e.abortInvocation(c, jobID, invID) |
| } |
| return nil |
| } |
| @@ -1317,7 +1442,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID |
| c = logging.SetField(c, "InvID", invID) |
| logging.Infof(c, "Handling invocation timer %q", timer.Name) |
| - inv, err := e.GetInvocation(c, jobID, invID) |
| + inv, err := e.getInvocation(c, jobID, invID) |
| if err != nil { |
| logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| return err |
| @@ -1702,7 +1827,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe |
| c = logging.SetField(c, "JobID", jobID) |
| c = logging.SetField(c, "InvID", invID) |
| - inv, err := e.GetInvocation(c, jobID, invID) |
| + inv, err := e.getInvocation(c, jobID, invID) |
| if err != nil { |
| logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| return err |