| Index: scheduler/appengine/engine/engine.go
|
| diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go
|
| index 57799b38eda49460691045f44521bc2fdc68d4d2..075410e638a78129e843e52b3e247c0cb0015514 100644
|
| --- a/scheduler/appengine/engine/engine.go
|
| +++ b/scheduler/appengine/engine/engine.go
|
| @@ -48,50 +48,95 @@ import (
|
| "github.com/luci/luci-go/server/auth/signing"
|
| "github.com/luci/luci-go/server/tokens"
|
|
|
| + "github.com/luci/luci-go/scheduler/appengine/acl"
|
| "github.com/luci/luci-go/scheduler/appengine/catalog"
|
| "github.com/luci/luci-go/scheduler/appengine/schedule"
|
| "github.com/luci/luci-go/scheduler/appengine/task"
|
| )
|
|
|
| var (
|
| - ErrNoSuchJob = errors.New("no such job")
|
| - ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
|
| + ErrNoOwnerPermission = errors.New("no OWNER permission on a job")
|
| + ErrNoSuchJob = errors.New("no such job")
|
| + ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
|
| )
|
|
|
| // Engine manages all scheduler jobs: keeps track of their state, runs state
|
| // machine transactions, starts new invocations, etc. A method returns
|
| // errors.Transient if the error is non-fatal and the call should be retried
|
| // later. Any other error means that retry won't help.
|
| +// Unlike EngineInternal, Engine enforces ACLs, with these implications:
|
| +// * if caller lacks READER access to Jobs, methods behave as if Jobs do not
|
| +// exist.
|
| +// * if caller lacks OWNER access, calling mutating methods will result in
|
| +// ErrNoOwnerPermission (assuming caller has READER access, else see above).
|
| type Engine interface {
|
| - // GetAllProjects returns a list of all projects that have at least one
|
| - // enabled scheduler job.
|
| - GetAllProjects(c context.Context) ([]string, error)
|
| -
|
| - // GetAllJobs returns a list of all enabled scheduler jobs in no particular
|
| - // order.
|
| - GetAllJobs(c context.Context) ([]*Job, error)
|
| + // GetVisibleJobs returns a list of all enabled scheduler jobs in no
|
| + // particular order.
|
| + GetVisibleJobs(c context.Context) ([]*Job, error)
|
|
|
| - // GetProjectJobs returns a list of enabled scheduler jobs of some project
|
| - // in no particular order.
|
| - GetProjectJobs(c context.Context, projectID string) ([]*Job, error)
|
| + // GetVisibleProjectJobs returns a list of enabled scheduler jobs of some
|
| + // project in no particular order.
|
| + GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error)
|
|
|
| - // GetJob returns single scheduler job given its full ID or nil if no such
|
| - // job.
|
| - GetJob(c context.Context, jobID string) (*Job, error)
|
| + // GetVisibleJob returns a single scheduler job given its full ID, or
|
| + // ErrNoSuchJob if there is no such job or it is not visible to the caller.
|
| + GetVisibleJob(c context.Context, jobID string) (*Job, error)
|
|
|
| - // ListInvocations returns invocations of a job, most recent first.
|
| + // ListVisibleInvocations returns invocations of a visible job, most recent first.
|
| // Returns fetched invocations and cursor string if there's more.
|
| - ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error)
|
| + ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error)
|
|
|
| - // GetInvocation returns single invocation of some job given its ID.
|
| - GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error)
|
| + // GetVisibleInvocation returns a single invocation of a visible job given its ID.
|
| + GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error)
|
|
|
| - // GetInvocationsByNonce returns a list of Invocations with given nonce.
|
| + // GetVisibleInvocationsByNonce returns a list of visible Invocations with the given nonce.
|
| //
|
| // Invocation nonce is a random number that identifies an intent to start
|
| // an invocation. Normally one nonce corresponds to one Invocation entity,
|
| // but there can be more if job fails to start with a transient error.
|
| - GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
|
| + GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
|
| +
|
| + // PauseJob replaces job's schedule with "triggered", effectively preventing
|
| + // it from running automatically (until it is resumed). Manual invocations are
|
| + // still allowed. Does nothing if job is already paused. Any pending or
|
| + // running invocations are still executed.
|
| + PauseJob(c context.Context, jobID string) error
|
| +
|
| + // ResumeJob resumes paused job. Does nothing if the job is not paused.
|
| + ResumeJob(c context.Context, jobID string) error
|
| +
|
| + // AbortJob resets the job to scheduled state, aborting a currently pending or
|
| + // running invocation (if any).
|
| + //
|
| + // Returns nil if the job is not currently running.
|
| + AbortJob(c context.Context, jobID string) error
|
| +
|
| + // AbortInvocation forcefully moves the invocation to failed state.
|
| + //
|
| + // It opportunistically tries to send "abort" signal to a job runner if it
|
| + // supports cancellation, but it doesn't wait for reply. It proceeds to
|
| + // modifying local state in the scheduler service datastore immediately.
|
| + //
|
| + // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
|
| + // to missing PubSub notifications or other kinds of unexpected conditions.
|
| + //
|
| + // Does nothing if invocation is already in some final state.
|
| + AbortInvocation(c context.Context, jobID string, invID int64) error
|
| +
|
| + // TriggerInvocation launches job invocation right now if job isn't running
|
| + // now. Used by "Run now" UI button.
|
| + //
|
| + // Returns new invocation nonce (a random number that identifies an intent to
|
| + // start an invocation). Normally one nonce corresponds to one Invocation
|
| + // entity, but there can be more if job fails to start with a transient error.
|
| + TriggerInvocation(c context.Context, jobID string) (int64, error)
|
| +}
|
| +
|
| +// EngineInternal is to be used by frontend initialization code only.
|
| +type EngineInternal interface {
|
| + // GetAllProjects returns a list of all projects that have at least one
|
| + // enabled scheduler job.
|
| + GetAllProjects(c context.Context) ([]string, error)
|
|
|
| // UpdateProjectJobs adds new, removes old and updates existing jobs.
|
| UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error
|
| @@ -119,40 +164,8 @@ type Engine interface {
|
| // server, since dev server can't accept PubSub push messages.
|
| PullPubSubOnDevServer(c context.Context, taskManagerName, publisher string) error
|
|
|
| - // TriggerInvocation launches job invocation right now if job isn't running
|
| - // now. Used by "Run now" UI button.
|
| - //
|
| - // Returns new invocation nonce (a random number that identifies an intent to
|
| - // start an invocation). Normally one nonce corresponds to one Invocation
|
| - // entity, but there can be more if job fails to start with a transient error.
|
| - TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error)
|
| -
|
| - // PauseJob replaces job's schedule with "triggered", effectively preventing
|
| - // it from running automatically (until it is resumed). Manual invocations are
|
| - // still allowed. Does nothing if job is already paused. Any pending or
|
| - // running invocations are still executed.
|
| - PauseJob(c context.Context, jobID string, who identity.Identity) error
|
| -
|
| - // ResumeJob resumes paused job. Doesn't nothing if the job is not paused.
|
| - ResumeJob(c context.Context, jobID string, who identity.Identity) error
|
| -
|
| - // AbortInvocation forcefully moves the invocation to failed state.
|
| - //
|
| - // It opportunistically tries to send "abort" signal to a job runner if it
|
| - // supports cancellation, but it doesn't wait for reply. It proceeds to
|
| - // modifying local state in the scheduler service datastore immediately.
|
| - //
|
| - // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
|
| - // to missing PubSub notifications or other kinds of unexpected conditions.
|
| - //
|
| - // Does nothing if invocation is already in some final state.
|
| - AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error
|
| -
|
| - // AbortJob resets the job to scheduled state, aborting a currently pending or
|
| - // running invocation (if any).
|
| - //
|
| - // Returns nil if the job is not currently running.
|
| - AbortJob(c context.Context, jobID string, who identity.Identity) error
|
| + // PublicAPI returns ACL-enforced API.
|
| + PublicAPI() Engine
|
| }
|
|
|
| // Config contains parameters for the engine.
|
| @@ -165,8 +178,8 @@ type Config struct {
|
| PubSubPushPath string // URL to use in PubSub push config
|
| }
|
|
|
| -// NewEngine returns default implementation of Engine.
|
| -func NewEngine(conf Config) Engine {
|
| +// NewEngine returns default implementation of EngineInternal.
|
| +func NewEngine(conf Config) EngineInternal {
|
| return &engineImpl{
|
| Config: conf,
|
| doneFlags: make(map[string]bool),
|
| @@ -269,6 +282,9 @@ type Job struct {
|
| // of the engine. See Catalog.UnmarshalTask().
|
| Task []byte `gae:",noindex"`
|
|
|
| + // Acls are the latest ACLs applied to the Job and all its invocations.
|
| + Acls acl.GrantsByRole `gae:",noindex"`
|
| +
|
| // State is the job's state machine state, see StateMachine.
|
| State JobState
|
| }
|
| @@ -328,6 +344,7 @@ func (e *Job) matches(def catalog.Definition) bool {
|
| return e.JobID == def.JobID &&
|
| e.Flavor == def.Flavor &&
|
| e.Schedule == def.Schedule &&
|
| + e.Acls.Equal(&def.Acls) &&
|
| bytes.Equal(e.Task, def.Task)
|
| }
|
|
|
| @@ -538,7 +555,8 @@ func debugLog(c context.Context, str *string, format string, args ...interface{}
|
| *str += prefix + fmt.Sprintf(format+"\n", args...)
|
| }
|
|
|
| -////
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +// engineImpl.
|
|
|
| type engineImpl struct {
|
| Config
|
| @@ -619,17 +637,17 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) {
|
| return out, nil
|
| }
|
|
|
| -func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) {
|
| +func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) {
|
| q := ds.NewQuery("Job").Eq("Enabled", true)
|
| - return e.queryEnabledJobs(c, q)
|
| + return e.queryEnabledVisibleJobs(c, q)
|
| }
|
|
|
| -func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job, error) {
|
| +func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) {
|
| q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID)
|
| - return e.queryEnabledJobs(c, q)
|
| + return e.queryEnabledVisibleJobs(c, q)
|
| }
|
|
|
| -func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) {
|
| +func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([]*Job, error) {
|
| entities := []*Job{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| return nil, transient.Tag.Apply(err)
|
| @@ -637,14 +655,62 @@ func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
|
| // Non-ancestor query used, need to recheck filters.
|
| filtered := make([]*Job, 0, len(entities))
|
| for _, job := range entities {
|
| - if job.Enabled {
|
| + if !job.Enabled {
|
| + continue
|
| + }
|
| + // TODO(tandrii): improve batch ACLs check here to take advantage of likely
|
| + // shared ACLs between most jobs of the same project.
|
| + if ok, err := job.Acls.IsReader(c); err != nil {
|
| + return nil, transient.Tag.Apply(err)
|
| + } else if ok {
|
| filtered = append(filtered, job)
|
| }
|
| }
|
| return filtered, nil
|
| }
|
|
|
| -func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
|
| +func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error) {
|
| + job, err := e.getJob(c, jobID)
|
| + if err != nil {
|
| + return nil, err
|
| + } else if job == nil {
|
| + return nil, ErrNoSuchJob
|
| + }
|
| + if ok, err := job.Acls.IsReader(c); err != nil {
|
| + return nil, err
|
| + } else if !ok {
|
| + return nil, ErrNoSuchJob
|
| + }
|
| + return job, nil
|
| +}
|
| +
|
| +func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) {
|
| + job, err := e.getJob(c, jobID)
|
| + if err != nil {
|
| + return nil, err
|
| + } else if job == nil {
|
| + return nil, ErrNoSuchJob
|
| + }
|
| +
|
| + switch owner, err := job.Acls.IsOwner(c); {
|
| + case err != nil:
|
| + return nil, err
|
| + case owner:
|
| + return job, nil
|
| + }
|
| +
|
| + // Not owner, but maybe reader? Give nicer error in such case.
|
| + switch reader, err := job.Acls.IsReader(c); {
|
| + case err != nil:
|
| + return nil, err
|
| + case reader:
|
| + return nil, ErrNoOwnerPermission
|
| + default:
|
| + return nil, ErrNoSuchJob
|
| + }
|
| +}
|
| +
|
| +func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) {
|
| job := &Job{JobID: jobID}
|
| switch err := ds.Get(c, job); {
|
| case err == nil:
|
| @@ -656,7 +722,18 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
|
| }
|
| }
|
|
|
| -func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) {
|
| +func (e *engineImpl) PublicAPI() Engine {
|
| + return e
|
| +}
|
| +
|
| +func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) {
|
| + if job, err := e.GetVisibleJob(c, jobID); err != nil {
|
| + return nil, "", err
|
| + } else if job == nil {
|
| + // Either no Job or no access, both imply returning no invocations.
|
| + return nil, "", ErrNoSuchInvocation
|
| + }
|
| +
|
| if pageSize == 0 || pageSize > 500 {
|
| pageSize = 500
|
| }
|
| @@ -688,11 +765,11 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
|
| if len(out) < pageSize {
|
| return nil
|
| }
|
| - c, err := getCursor()
|
| + cur, err := getCursor()
|
| if err != nil {
|
| return err
|
| }
|
| - newCursor = c.String()
|
| + newCursor = cur.String()
|
| return ds.Stop
|
| })
|
| if err != nil {
|
| @@ -701,7 +778,18 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
|
| return out, newCursor, nil
|
| }
|
|
|
| -func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
|
| +func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
|
| + switch _, err := e.GetVisibleJob(c, jobID); {
|
| + case err == ErrNoSuchJob:
|
| + return nil, ErrNoSuchInvocation
|
| + case err != nil:
|
| + return nil, err
|
| + default:
|
| + return e.getInvocation(c, jobID, invID)
|
| + }
|
| +}
|
| +
|
| +func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
|
| inv := &Invocation{
|
| ID: invID,
|
| JobKey: ds.NewKey(c, "Job", jobID, 0, nil),
|
| @@ -716,12 +804,21 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
|
| }
|
| }
|
|
|
| -func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) {
|
| +func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) {
|
| q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
|
| entities := []*Invocation{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| return nil, transient.Tag.Apply(err)
|
| }
|
| + if len(entities) > 0 {
|
| + // All Invocations with the same nonce must belong to the same Job.
|
| + switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID()); {
|
| + case err == ErrNoSuchJob:
|
| + return []*Invocation{}, nil
|
| + case err != nil:
|
| + return nil, err
|
| + }
|
| + }
|
| return entities, nil
|
| }
|
|
|
| @@ -1051,7 +1148,12 @@ func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl
|
| }
|
| }
|
|
|
| -func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error) {
|
| +func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) {
|
| + // First, we check ACLs.
|
| + if _, err := e.getOwnedJob(c, jobID); err != nil {
|
| + return 0, err
|
| + }
|
| +
|
| var err error
|
| var invNonce int64
|
| err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
|
| @@ -1065,7 +1167,7 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
|
| }
|
| invNonce = 0
|
| return e.rollSM(c, job, func(sm *StateMachine) error {
|
| - if err := sm.OnManualInvocation(triggeredBy); err != nil {
|
| + if err := sm.OnManualInvocation(auth.CurrentIdentity(c)); err != nil {
|
| return err
|
| }
|
| invNonce = sm.State.InvocationNonce
|
| @@ -1078,15 +1180,20 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
|
| return invNonce, err
|
| }
|
|
|
| -func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Identity) error {
|
| - return e.setPausedFlag(c, jobID, true, who)
|
| +func (e *engineImpl) PauseJob(c context.Context, jobID string) error {
|
| + return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c))
|
| }
|
|
|
| -func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Identity) error {
|
| - return e.setPausedFlag(c, jobID, false, who)
|
| +func (e *engineImpl) ResumeJob(c context.Context, jobID string) error {
|
| + return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c))
|
| }
|
|
|
| func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error {
|
| + // First, we check ACLs outside of transaction. Yes, this allows for races
|
| + // between (un)pausing and ACLs changes but these races have no impact.
|
| + if _, err := e.getOwnedJob(c, jobID); err != nil {
|
| + return err
|
| + }
|
| return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
|
| if isNew || !job.Enabled {
|
| return ErrNoSuchJob
|
| @@ -1107,13 +1214,20 @@ func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
|
| })
|
| }
|
|
|
| -func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error {
|
| +func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64) error {
|
| + if _, err := e.getOwnedJob(c, jobID); err != nil {
|
| + return err
|
| + }
|
| + return e.abortInvocation(c, jobID, invID)
|
| +}
|
| +
|
| +func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int64) error {
|
| c = logging.SetField(c, "JobID", jobID)
|
| c = logging.SetField(c, "InvID", invID)
|
|
|
| var inv *Invocation
|
| var err error
|
| - switch inv, err = e.GetInvocation(c, jobID, invID); {
|
| + switch inv, err = e.getInvocation(c, jobID, invID); {
|
| case err != nil:
|
| logging.Errorf(c, "Failed to fetch the invocation - %s", err)
|
| return err
|
| @@ -1130,7 +1244,7 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
|
| return err
|
| }
|
|
|
| - ctl.DebugLog("Invocation is manually aborted by %q", who)
|
| + ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c))
|
| if err = ctl.manager.AbortTask(c, ctl); err != nil {
|
| logging.Errorf(c, "Failed to abort the task - %s", err)
|
| return err
|
| @@ -1148,8 +1262,12 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
|
| // running invocation (if any).
|
| //
|
| // Returns nil if the job is not currently running.
|
| -func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Identity) error {
|
| - // First we switch the job to the default state and disassociate the running
|
| +func (e *engineImpl) AbortJob(c context.Context, jobID string) error {
|
| + // First, we check ACLs.
|
| + if _, err := e.getOwnedJob(c, jobID); err != nil {
|
| + return err
|
| + }
|
| + // Second, we switch the job to the default state and disassociate the running
|
| // invocation (if any) from the job entity.
|
| var invID int64
|
| err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
|
| @@ -1170,7 +1288,7 @@ func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden
|
| // an RPC to remote service (e.g. to cancel a task) that can't be done from
|
| // the transaction.
|
| if invID != 0 {
|
| - return e.AbortInvocation(c, jobID, invID, who)
|
| + return e.abortInvocation(c, jobID, invID)
|
| }
|
| return nil
|
| }
|
| @@ -1317,7 +1435,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
|
| c = logging.SetField(c, "InvID", invID)
|
|
|
| logging.Infof(c, "Handling invocation timer %q", timer.Name)
|
| - inv, err := e.GetInvocation(c, jobID, invID)
|
| + inv, err := e.getInvocation(c, jobID, invID)
|
| if err != nil {
|
| logging.Errorf(c, "Failed to fetch the invocation - %s", err)
|
| return err
|
| @@ -1702,7 +1820,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
|
|
|
| c = logging.SetField(c, "JobID", jobID)
|
| c = logging.SetField(c, "InvID", invID)
|
| - inv, err := e.GetInvocation(c, jobID, invID)
|
| + inv, err := e.getInvocation(c, jobID, invID)
|
| if err != nil {
|
| logging.Errorf(c, "Failed to fetch the invocation - %s", err)
|
| return err
|
|
|