Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1003)

Unified Diff: scheduler/appengine/engine/engine.go

Issue 2986033003: [scheduler]: ACLs phase 1 - per Job ACL specification and enforcement. (Closed)
Patch Set: Review. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/engine.go
diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go
index 57799b38eda49460691045f44521bc2fdc68d4d2..075410e638a78129e843e52b3e247c0cb0015514 100644
--- a/scheduler/appengine/engine/engine.go
+++ b/scheduler/appengine/engine/engine.go
@@ -48,50 +48,95 @@ import (
"github.com/luci/luci-go/server/auth/signing"
"github.com/luci/luci-go/server/tokens"
+ "github.com/luci/luci-go/scheduler/appengine/acl"
"github.com/luci/luci-go/scheduler/appengine/catalog"
"github.com/luci/luci-go/scheduler/appengine/schedule"
"github.com/luci/luci-go/scheduler/appengine/task"
)
var (
- ErrNoSuchJob = errors.New("no such job")
- ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
+ ErrNoOwnerPermission = errors.New("no OWNER permission on a job")
+ ErrNoSuchJob = errors.New("no such job")
+ ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
)
// Engine manages all scheduler jobs: keeps track of their state, runs state
// machine transactions, starts new invocations, etc. A method returns
// errors.Transient if the error is non-fatal and the call should be retried
// later. Any other error means that retry won't help.
+// ACLs are enforced unlike EngineInternal with the following implications:
+// * if caller lacks READER access to Jobs, methods behave as if Jobs do not
+// exist.
+// * if caller lacks OWNER access, calling mutating methods will result in
+// ErrNoOwnerPermission (assuming caller has READER access, else see above).
type Engine interface {
- // GetAllProjects returns a list of all projects that have at least one
- // enabled scheduler job.
- GetAllProjects(c context.Context) ([]string, error)
-
- // GetAllJobs returns a list of all enabled scheduler jobs in no particular
- // order.
- GetAllJobs(c context.Context) ([]*Job, error)
+ // GetVisibleJobs returns a list of all enabled scheduler jobs in no
+ // particular order.
+ GetVisibleJobs(c context.Context) ([]*Job, error)
- // GetProjectJobs returns a list of enabled scheduler jobs of some project
- // in no particular order.
- GetProjectJobs(c context.Context, projectID string) ([]*Job, error)
+ // GetVisibleProjectJobs returns a list of enabled scheduler jobs of some
+ // project in no particular order.
+ GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error)
- // GetJob returns single scheduler job given its full ID or nil if no such
- // job.
- GetJob(c context.Context, jobID string) (*Job, error)
+ // GetVisibleJob returns single scheduler job given its full ID or nil if no such
+ // job or if not visible.
+ GetVisibleJob(c context.Context, jobID string) (*Job, error)
- // ListInvocations returns invocations of a job, most recent first.
+ // ListVisibleInvocations returns invocations of a visible job, most recent first.
// Returns fetched invocations and cursor string if there's more.
- ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error)
+ ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error)
- // GetInvocation returns single invocation of some job given its ID.
- GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error)
+ // GetVisibleInvocation returns single invocation of some job given its ID.
+ GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error)
- // GetInvocationsByNonce returns a list of Invocations with given nonce.
+ // GetVisibleInvocationsByNonce returns a list of Invocations with given nonce.
//
// Invocation nonce is a random number that identifies an intent to start
// an invocation. Normally one nonce corresponds to one Invocation entity,
// but there can be more if job fails to start with a transient error.
- GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
+ GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)
+
+ // PauseJob replaces job's schedule with "triggered", effectively preventing
+ // it from running automatically (until it is resumed). Manual invocations are
+ // still allowed. Does nothing if job is already paused. Any pending or
+ // running invocations are still executed.
+ PauseJob(c context.Context, jobID string) error
+
+ // ResumeJob resumes paused job. Does nothing if the job is not paused.
+ ResumeJob(c context.Context, jobID string) error
+
+ // AbortJob resets the job to scheduled state, aborting a currently pending or
+ // running invocation (if any).
+ //
+ // Returns nil if the job is not currently running.
+ AbortJob(c context.Context, jobID string) error
+
+ // AbortInvocation forcefully moves the invocation to failed state.
+ //
+ // It opportunistically tries to send "abort" signal to a job runner if it
+ // supports cancellation, but it doesn't wait for reply. It proceeds to
+ // modifying local state in the scheduler service datastore immediately.
+ //
+ // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
+ // to missing PubSub notifications or other kinds of unexpected conditions.
+ //
+ // Does nothing if invocation is already in some final state.
+ AbortInvocation(c context.Context, jobID string, invID int64) error
+
+ // TriggerInvocation launches job invocation right now if job isn't running
+ // now. Used by "Run now" UI button.
+ //
+ // Returns new invocation nonce (a random number that identifies an intent to
+ // start an invocation). Normally one nonce corresponds to one Invocation
+ // entity, but there can be more if job fails to start with a transient error.
+ TriggerInvocation(c context.Context, jobID string) (int64, error)
+}
+
+// EngineInternal is to be used by frontend initialization code only.
+type EngineInternal interface {
+ // GetAllProjects returns a list of all projects that have at least one
+ // enabled scheduler job.
+ GetAllProjects(c context.Context) ([]string, error)
// UpdateProjectJobs adds new, removes old and updates existing jobs.
UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error
@@ -119,40 +164,8 @@ type Engine interface {
// server, since dev server can't accept PubSub push messages.
PullPubSubOnDevServer(c context.Context, taskManagerName, publisher string) error
- // TriggerInvocation launches job invocation right now if job isn't running
- // now. Used by "Run now" UI button.
- //
- // Returns new invocation nonce (a random number that identifies an intent to
- // start an invocation). Normally one nonce corresponds to one Invocation
- // entity, but there can be more if job fails to start with a transient error.
- TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error)
-
- // PauseJob replaces job's schedule with "triggered", effectively preventing
- // it from running automatically (until it is resumed). Manual invocations are
- // still allowed. Does nothing if job is already paused. Any pending or
- // running invocations are still executed.
- PauseJob(c context.Context, jobID string, who identity.Identity) error
-
- // ResumeJob resumes paused job. Doesn't nothing if the job is not paused.
- ResumeJob(c context.Context, jobID string, who identity.Identity) error
-
- // AbortInvocation forcefully moves the invocation to failed state.
- //
- // It opportunistically tries to send "abort" signal to a job runner if it
- // supports cancellation, but it doesn't wait for reply. It proceeds to
- // modifying local state in the scheduler service datastore immediately.
- //
- // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
- // to missing PubSub notifications or other kinds of unexpected conditions.
- //
- // Does nothing if invocation is already in some final state.
- AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error
-
- // AbortJob resets the job to scheduled state, aborting a currently pending or
- // running invocation (if any).
- //
- // Returns nil if the job is not currently running.
- AbortJob(c context.Context, jobID string, who identity.Identity) error
+ // PublicAPI returns ACL-enforced API.
+ PublicAPI() Engine
}
// Config contains parameters for the engine.
@@ -165,8 +178,8 @@ type Config struct {
PubSubPushPath string // URL to use in PubSub push config
}
-// NewEngine returns default implementation of Engine.
-func NewEngine(conf Config) Engine {
+// NewEngine returns default implementation of EngineInternal.
+func NewEngine(conf Config) EngineInternal {
return &engineImpl{
Config: conf,
doneFlags: make(map[string]bool),
@@ -269,6 +282,9 @@ type Job struct {
// of the engine. See Catalog.UnmarshalTask().
Task []byte `gae:",noindex"`
+ // ACLs are the latest ACLs applied to Job and all its invocations.
+ Acls acl.GrantsByRole `gae:",noindex"`
+
// State is the job's state machine state, see StateMachine.
State JobState
}
@@ -328,6 +344,7 @@ func (e *Job) matches(def catalog.Definition) bool {
return e.JobID == def.JobID &&
e.Flavor == def.Flavor &&
e.Schedule == def.Schedule &&
+ e.Acls.Equal(&def.Acls) &&
bytes.Equal(e.Task, def.Task)
}
@@ -538,7 +555,8 @@ func debugLog(c context.Context, str *string, format string, args ...interface{}
*str += prefix + fmt.Sprintf(format+"\n", args...)
}
-////
+////////////////////////////////////////////////////////////////////////////////
+// engineImpl.
type engineImpl struct {
Config
@@ -619,17 +637,17 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) {
return out, nil
}
-func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) {
+func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) {
q := ds.NewQuery("Job").Eq("Enabled", true)
- return e.queryEnabledJobs(c, q)
+ return e.queryEnabledVisibleJobs(c, q)
}
-func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job, error) {
+func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) {
q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID)
- return e.queryEnabledJobs(c, q)
+ return e.queryEnabledVisibleJobs(c, q)
}
-func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) {
+func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([]*Job, error) {
entities := []*Job{}
if err := ds.GetAll(c, q, &entities); err != nil {
return nil, transient.Tag.Apply(err)
@@ -637,14 +655,62 @@ func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
// Non-ancestor query used, need to recheck filters.
filtered := make([]*Job, 0, len(entities))
for _, job := range entities {
- if job.Enabled {
+ if !job.Enabled {
+ continue
+ }
+ // TODO(tandrii): improve batch ACLs check here to take advantage of likely
+ // shared ACLs between most jobs of the same project.
+ if ok, err := job.Acls.IsReader(c); err != nil {
+ return nil, transient.Tag.Apply(err)
+ } else if ok {
filtered = append(filtered, job)
}
}
return filtered, nil
}
-func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
+func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error) {
+ job, err := e.getJob(c, jobID)
+ if err != nil {
+ return nil, err
+ } else if job == nil {
+ return nil, ErrNoSuchJob
+ }
+ if ok, err := job.Acls.IsReader(c); err != nil {
+ return nil, err
+ } else if !ok {
+ return nil, ErrNoSuchJob
+ }
+ return job, nil
+}
+
+func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) {
+ job, err := e.getJob(c, jobID)
+ if err != nil {
+ return nil, err
+ } else if job == nil {
+ return nil, ErrNoSuchJob
+ }
+
+ switch owner, err := job.Acls.IsOwner(c); {
+ case err != nil:
+ return nil, err
+ case owner:
+ return job, nil
+ }
+
+ // Not owner, but maybe reader? Give nicer error in such case.
+ switch reader, err := job.Acls.IsReader(c); {
+ case err != nil:
+ return nil, err
+ case reader:
+ return nil, ErrNoOwnerPermission
+ default:
+ return nil, ErrNoSuchJob
+ }
+}
+
+func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) {
job := &Job{JobID: jobID}
switch err := ds.Get(c, job); {
case err == nil:
@@ -656,7 +722,18 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
}
}
-func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) {
+func (e *engineImpl) PublicAPI() Engine {
+ return e
+}
+
+func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error) {
+ if job, err := e.GetVisibleJob(c, jobID); err != nil {
+ return nil, "", err
+ } else if job == nil {
+ // Either no Job or no access, both imply returning no invocations.
+ return nil, "", ErrNoSuchInvocation
+ }
+
if pageSize == 0 || pageSize > 500 {
pageSize = 500
}
@@ -688,11 +765,11 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
if len(out) < pageSize {
return nil
}
- c, err := getCursor()
+ cur, err := getCursor()
if err != nil {
return err
}
- newCursor = c.String()
+ newCursor = cur.String()
return ds.Stop
})
if err != nil {
@@ -701,7 +778,18 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
return out, newCursor, nil
}
-func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
+func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
+ switch _, err := e.GetVisibleJob(c, jobID); {
+ case err == ErrNoSuchJob:
+ return nil, ErrNoSuchInvocation
+ case err != nil:
+ return nil, err
+ default:
+ return e.getInvocation(c, jobID, invID)
+ }
+}
+
+func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
inv := &Invocation{
ID: invID,
JobKey: ds.NewKey(c, "Job", jobID, 0, nil),
@@ -716,12 +804,21 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
}
}
-func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) {
+func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) {
q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
entities := []*Invocation{}
if err := ds.GetAll(c, q, &entities); err != nil {
return nil, transient.Tag.Apply(err)
}
+ if len(entities) > 0 {
+ // All Invocations with the same nonce must belong to the same Job.
+ switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID()); {
+ case err == ErrNoSuchJob:
+ return []*Invocation{}, nil
+ case err != nil:
+ return nil, err
+ }
+ }
return entities, nil
}
@@ -1051,7 +1148,12 @@ func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl
}
}
-func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggeredBy identity.Identity) (int64, error) {
+func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) {
+ // First, we check ACLs.
+ if _, err := e.getOwnedJob(c, jobID); err != nil {
+ return 0, err
+ }
+
var err error
var invNonce int64
err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
@@ -1065,7 +1167,7 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
}
invNonce = 0
return e.rollSM(c, job, func(sm *StateMachine) error {
- if err := sm.OnManualInvocation(triggeredBy); err != nil {
+ if err := sm.OnManualInvocation(auth.CurrentIdentity(c)); err != nil {
return err
}
invNonce = sm.State.InvocationNonce
@@ -1078,15 +1180,20 @@ func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
return invNonce, err
}
-func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Identity) error {
- return e.setPausedFlag(c, jobID, true, who)
+func (e *engineImpl) PauseJob(c context.Context, jobID string) error {
+ return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c))
}
-func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Identity) error {
- return e.setPausedFlag(c, jobID, false, who)
+func (e *engineImpl) ResumeJob(c context.Context, jobID string) error {
+ return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c))
}
func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error {
+ // First, we check ACLs outside of transaction. Yes, this allows for races
+ // between (un)pausing and ACLs changes but these races have no impact.
+ if _, err := e.getOwnedJob(c, jobID); err != nil {
+ return err
+ }
return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
if isNew || !job.Enabled {
return ErrNoSuchJob
@@ -1107,13 +1214,20 @@ func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
})
}
-func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64, who identity.Identity) error {
+func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int64) error {
+ if _, err := e.getOwnedJob(c, jobID); err != nil {
+ return err
+ }
+ return e.abortInvocation(c, jobID, invID)
+}
+
+func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int64) error {
c = logging.SetField(c, "JobID", jobID)
c = logging.SetField(c, "InvID", invID)
var inv *Invocation
var err error
- switch inv, err = e.GetInvocation(c, jobID, invID); {
+ switch inv, err = e.getInvocation(c, jobID, invID); {
case err != nil:
logging.Errorf(c, "Failed to fetch the invocation - %s", err)
return err
@@ -1130,7 +1244,7 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
return err
}
- ctl.DebugLog("Invocation is manually aborted by %q", who)
+ ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c))
if err = ctl.manager.AbortTask(c, ctl); err != nil {
logging.Errorf(c, "Failed to abort the task - %s", err)
return err
@@ -1148,8 +1262,12 @@ func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
// running invocation (if any).
//
// Returns nil if the job is not currently running.
-func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Identity) error {
- // First we switch the job to the default state and disassociate the running
+func (e *engineImpl) AbortJob(c context.Context, jobID string) error {
+ // First, we check ACLs.
+ if _, err := e.getOwnedJob(c, jobID); err != nil {
+ return err
+ }
+ // Second, we switch the job to the default state and disassociate the running
// invocation (if any) from the job entity.
var invID int64
err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) error {
@@ -1170,7 +1288,7 @@ func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden
// an RPC to remote service (e.g. to cancel a task) that can't be done from
// the transaction.
if invID != 0 {
- return e.AbortInvocation(c, jobID, invID, who)
+ return e.abortInvocation(c, jobID, invID)
}
return nil
}
@@ -1317,7 +1435,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
c = logging.SetField(c, "InvID", invID)
logging.Infof(c, "Handling invocation timer %q", timer.Name)
- inv, err := e.GetInvocation(c, jobID, invID)
+ inv, err := e.getInvocation(c, jobID, invID)
if err != nil {
logging.Errorf(c, "Failed to fetch the invocation - %s", err)
return err
@@ -1702,7 +1820,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
c = logging.SetField(c, "JobID", jobID)
c = logging.SetField(c, "InvID", invID)
- inv, err := e.GetInvocation(c, jobID, invID)
+ inv, err := e.getInvocation(c, jobID, invID)
if err != nil {
logging.Errorf(c, "Failed to fetch the invocation - %s", err)
return err
« no previous file with comments | « scheduler/appengine/catalog/catalog_test.go ('k') | scheduler/appengine/engine/engine_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698