| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 30 matching lines...) Expand all Loading... |
| 41 "github.com/luci/luci-go/common/data/rand/mathrand" | 41 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 42 "github.com/luci/luci-go/common/data/stringset" | 42 "github.com/luci/luci-go/common/data/stringset" |
| 43 "github.com/luci/luci-go/common/errors" | 43 "github.com/luci/luci-go/common/errors" |
| 44 "github.com/luci/luci-go/common/logging" | 44 "github.com/luci/luci-go/common/logging" |
| 45 "github.com/luci/luci-go/common/retry/transient" | 45 "github.com/luci/luci-go/common/retry/transient" |
| 46 "github.com/luci/luci-go/server/auth" | 46 "github.com/luci/luci-go/server/auth" |
| 47 "github.com/luci/luci-go/server/auth/identity" | 47 "github.com/luci/luci-go/server/auth/identity" |
| 48 "github.com/luci/luci-go/server/auth/signing" | 48 "github.com/luci/luci-go/server/auth/signing" |
| 49 "github.com/luci/luci-go/server/tokens" | 49 "github.com/luci/luci-go/server/tokens" |
| 50 | 50 |
| 51 "github.com/luci/luci-go/scheduler/appengine/acl" |
| 51 "github.com/luci/luci-go/scheduler/appengine/catalog" | 52 "github.com/luci/luci-go/scheduler/appengine/catalog" |
| 52 "github.com/luci/luci-go/scheduler/appengine/schedule" | 53 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 53 "github.com/luci/luci-go/scheduler/appengine/task" | 54 "github.com/luci/luci-go/scheduler/appengine/task" |
| 54 ) | 55 ) |
| 55 | 56 |
| 56 var ( | 57 var ( |
| 57 » ErrNoSuchJob = errors.New("no such job") | 58 » ErrNoOwnerPermission = errors.New("no OWNER permission on a job") |
| 58 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") | 59 » ErrNoSuchJob = errors.New("no such job") |
| 60 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") |
| 59 ) | 61 ) |
| 60 | 62 |
| 61 // Engine manages all scheduler jobs: keeps track of their state, runs state | 63 // Engine manages all scheduler jobs: keeps track of their state, runs state |
| 62 // machine transactions, starts new invocations, etc. A method returns | 64 // machine transactions, starts new invocations, etc. A method returns |
| 63 // errors.Transient if the error is non-fatal and the call should be retried | 65 // errors.Transient if the error is non-fatal and the call should be retried |
| 64 // later. Any other error means that retry won't help. | 66 // later. Any other error means that retry won't help. |
| 67 // ACLs are enforced unlike EngineInternal with the following implications: |
| 68 // * if caller lacks READER access to Jobs, methods behave as if Jobs do not |
| 69 // exist. |
| 70 // * if caller lacks OWNER access, calling mutating methods will result in |
| 71 // ErrNoOwnerPermission (assuming caller has READER access, else see above). |
| 65 type Engine interface { | 72 type Engine interface { |
| 73 // GetVisibleJobs returns a list of all enabled scheduler jobs in no |
| 74 // particular order. |
| 75 GetVisibleJobs(c context.Context) ([]*Job, error) |
| 76 |
| 77 // GetVisibleProjectJobs returns a list of enabled scheduler jobs of som
e |
| 78 // project in no particular order. |
| 79 GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, erro
r) |
| 80 |
| 81 // GetVisibleJob returns single scheduler job given its full ID or nil i
f no such |
| 82 // job or if not visible. |
| 83 GetVisibleJob(c context.Context, jobID string) (*Job, error) |
| 84 |
| 85 // ListVisibleInvocations returns invocations of a visible job, most rec
ent first. |
| 86 // Returns fetched invocations and cursor string if there's more. |
| 87 ListVisibleInvocations(c context.Context, jobID string, pageSize int, cu
rsor string) ([]*Invocation, string, error) |
| 88 |
| 89 // GetVisibleInvocation returns single invocation of some job given its
ID. |
| 90 GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Inv
ocation, error) |
| 91 |
| 92 // GetVisibleInvocationsByNonce returns a list of Invocations with given
nonce. |
| 93 // |
| 94 // Invocation nonce is a random number that identifies an intent to star
t |
| 95 // an invocation. Normally one nonce corresponds to one Invocation entit
y, |
| 96 // but there can be more if job fails to start with a transient error. |
| 97 GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invo
cation, error) |
| 98 |
| 99 // PauseJob replaces job's schedule with "triggered", effectively preven
ting |
| 100 // it from running automatically (until it is resumed). Manual invocatio
ns are |
| 101 // still allowed. Does nothing if job is already paused. Any pending or |
| 102 // running invocations are still executed. |
| 103 PauseJob(c context.Context, jobID string) error |
| 104 |
| 105 // ResumeJob resumes paused job. Does nothing if the job is not paused. |
| 106 ResumeJob(c context.Context, jobID string) error |
| 107 |
| 108 // AbortJob resets the job to scheduled state, aborting a currently pend
ing or |
| 109 // running invocation (if any). |
| 110 // |
| 111 // Returns nil if the job is not currently running. |
| 112 AbortJob(c context.Context, jobID string) error |
| 113 |
| 114 // AbortInvocation forcefully moves the invocation to failed state. |
| 115 // |
| 116 // It opportunistically tries to send "abort" signal to a job runner if
it |
| 117 // supports cancellation, but it doesn't wait for reply. It proceeds to |
| 118 // modifying local state in the scheduler service datastore immediately. |
| 119 // |
| 120 // AbortInvocation can be used to manually "unstuck" jobs that got stuck
due |
| 121 // to missing PubSub notifications or other kinds of unexpected conditio
ns. |
| 122 // |
| 123 // Does nothing if invocation is already in some final state. |
| 124 AbortInvocation(c context.Context, jobID string, invID int64) error |
| 125 |
| 126 // TriggerInvocation launches job invocation right now if job isn't runn
ing |
| 127 // now. Used by "Run now" UI button. |
| 128 // |
| 129 // Returns new invocation nonce (a random number that identifies an inte
nt to |
| 130 // start an invocation). Normally one nonce corresponds to one Invocatio
n |
| 131 // entity, but there can be more if job fails to start with a transient
error. |
| 132 TriggerInvocation(c context.Context, jobID string) (int64, error) |
| 133 } |
| 134 |
| 135 // EngineInternal is to be used by frontend initialization code only. |
| 136 type EngineInternal interface { |
| 66 // GetAllProjects returns a list of all projects that have at least one | 137 // GetAllProjects returns a list of all projects that have at least one |
| 67 // enabled scheduler job. | 138 // enabled scheduler job. |
| 68 GetAllProjects(c context.Context) ([]string, error) | 139 GetAllProjects(c context.Context) ([]string, error) |
| 69 | 140 |
| 70 // GetAllJobs returns a list of all enabled scheduler jobs in no particu
lar | |
| 71 // order. | |
| 72 GetAllJobs(c context.Context) ([]*Job, error) | |
| 73 | |
| 74 // GetProjectJobs returns a list of enabled scheduler jobs of some proje
ct | |
| 75 // in no particular order. | |
| 76 GetProjectJobs(c context.Context, projectID string) ([]*Job, error) | |
| 77 | |
| 78 // GetJob returns single scheduler job given its full ID or nil if no su
ch | |
| 79 // job. | |
| 80 GetJob(c context.Context, jobID string) (*Job, error) | |
| 81 | |
| 82 // ListInvocations returns invocations of a job, most recent first. | |
| 83 // Returns fetched invocations and cursor string if there's more. | |
| 84 ListInvocations(c context.Context, jobID string, pageSize int, cursor st
ring) ([]*Invocation, string, error) | |
| 85 | |
| 86 // GetInvocation returns single invocation of some job given its ID. | |
| 87 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation
, error) | |
| 88 | |
| 89 // GetInvocationsByNonce returns a list of Invocations with given nonce. | |
| 90 // | |
| 91 // Invocation nonce is a random number that identifies an intent to star
t | |
| 92 // an invocation. Normally one nonce corresponds to one Invocation entit
y, | |
| 93 // but there can be more if job fails to start with a transient error. | |
| 94 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation,
error) | |
| 95 | |
| 96 // UpdateProjectJobs adds new, removes old and updates existing jobs. | 141 // UpdateProjectJobs adds new, removes old and updates existing jobs. |
| 97 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De
finition) error | 142 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De
finition) error |
| 98 | 143 |
| 99 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. | 144 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. |
| 100 // Supposed to be used only on devserver, where task queue stub state is
not | 145 // Supposed to be used only on devserver, where task queue stub state is
not |
| 101 // preserved between appserver restarts and it messes everything. | 146 // preserved between appserver restarts and it messes everything. |
| 102 ResetAllJobsOnDevServer(c context.Context) error | 147 ResetAllJobsOnDevServer(c context.Context) error |
| 103 | 148 |
| 104 // ExecuteSerializedAction is called via a task queue to execute an acti
on | 149 // ExecuteSerializedAction is called via a task queue to execute an acti
on |
| 105 // produced by job state machine transition. These actions are POSTed | 150 // produced by job state machine transition. These actions are POSTed |
| 106 // to TimersQueue and InvocationsQueue defined in Config by Engine. | 151 // to TimersQueue and InvocationsQueue defined in Config by Engine. |
| 107 // 'retryCount' is 0 on first attempt, 1 if task queue service retries | 152 // 'retryCount' is 0 on first attempt, 1 if task queue service retries |
| 108 // request once, 2 - if twice, and so on. Returning transient errors her
e | 153 // request once, 2 - if twice, and so on. Returning transient errors her
e |
| 109 // causes the task queue to retry the task. | 154 // causes the task queue to retry the task. |
| 110 ExecuteSerializedAction(c context.Context, body []byte, retryCount int)
error | 155 ExecuteSerializedAction(c context.Context, body []byte, retryCount int)
error |
| 111 | 156 |
| 112 // ProcessPubSubPush is called whenever incoming PubSub message is recei
ved. | 157 // ProcessPubSubPush is called whenever incoming PubSub message is recei
ved. |
| 113 ProcessPubSubPush(c context.Context, body []byte) error | 158 ProcessPubSubPush(c context.Context, body []byte) error |
| 114 | 159 |
| 115 // PullPubSubOnDevServer is called on dev server to pull messages from P
ubSub | 160 // PullPubSubOnDevServer is called on dev server to pull messages from P
ubSub |
| 116 // subscription associated with given publisher. | 161 // subscription associated with given publisher. |
| 117 // | 162 // |
| 118 // It is needed to be able to manually tests PubSub related workflows on
dev | 163 // It is needed to be able to manually tests PubSub related workflows on
dev |
| 119 // server, since dev server can't accept PubSub push messages. | 164 // server, since dev server can't accept PubSub push messages. |
| 120 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri
ng) error | 165 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri
ng) error |
| 121 | 166 |
| 122 » // TriggerInvocation launches job invocation right now if job isn't runn
ing | 167 » // PublicAPI returns ACL-enforced API. |
| 123 » // now. Used by "Run now" UI button. | 168 » PublicAPI() Engine |
| 124 » // | |
| 125 » // Returns new invocation nonce (a random number that identifies an inte
nt to | |
| 126 » // start an invocation). Normally one nonce corresponds to one Invocatio
n | |
| 127 » // entity, but there can be more if job fails to start with a transient
error. | |
| 128 » TriggerInvocation(c context.Context, jobID string, triggeredBy identity.
Identity) (int64, error) | |
| 129 | |
| 130 » // PauseJob replaces job's schedule with "triggered", effectively preven
ting | |
| 131 » // it from running automatically (until it is resumed). Manual invocatio
ns are | |
| 132 » // still allowed. Does nothing if job is already paused. Any pending or | |
| 133 » // running invocations are still executed. | |
| 134 » PauseJob(c context.Context, jobID string, who identity.Identity) error | |
| 135 | |
| 136 » // ResumeJob resumes paused job. Doesn't nothing if the job is not pause
d. | |
| 137 » ResumeJob(c context.Context, jobID string, who identity.Identity) error | |
| 138 | |
| 139 » // AbortInvocation forcefully moves the invocation to failed state. | |
| 140 » // | |
| 141 » // It opportunistically tries to send "abort" signal to a job runner if
it | |
| 142 » // supports cancellation, but it doesn't wait for reply. It proceeds to | |
| 143 » // modifying local state in the scheduler service datastore immediately. | |
| 144 » // | |
| 145 » // AbortInvocation can be used to manually "unstuck" jobs that got stuck
due | |
| 146 » // to missing PubSub notifications or other kinds of unexpected conditio
ns. | |
| 147 » // | |
| 148 » // Does nothing if invocation is already in some final state. | |
| 149 » AbortInvocation(c context.Context, jobID string, invID int64, who identi
ty.Identity) error | |
| 150 | |
| 151 » // AbortJob resets the job to scheduled state, aborting a currently pend
ing or | |
| 152 » // running invocation (if any). | |
| 153 » // | |
| 154 » // Returns nil if the job is not currently running. | |
| 155 » AbortJob(c context.Context, jobID string, who identity.Identity) error | |
| 156 } | 169 } |
| 157 | 170 |
| 158 // Config contains parameters for the engine. | 171 // Config contains parameters for the engine. |
| 159 type Config struct { | 172 type Config struct { |
| 160 Catalog catalog.Catalog // provides task.Manager's to run t
asks | 173 Catalog catalog.Catalog // provides task.Manager's to run t
asks |
| 161 TimersQueuePath string // URL of a task queue handler for
timer ticks | 174 TimersQueuePath string // URL of a task queue handler for
timer ticks |
| 162 TimersQueueName string // queue name for timer ticks | 175 TimersQueueName string // queue name for timer ticks |
| 163 InvocationsQueuePath string // URL of a task queue handler that
starts jobs | 176 InvocationsQueuePath string // URL of a task queue handler that
starts jobs |
| 164 InvocationsQueueName string // queue name for job starts | 177 InvocationsQueueName string // queue name for job starts |
| 165 PubSubPushPath string // URL to use in PubSub push config | 178 PubSubPushPath string // URL to use in PubSub push config |
| 166 } | 179 } |
| 167 | 180 |
| 168 // NewEngine returns default implementation of Engine. | 181 // NewEngine returns default implementation of EngineInternal. |
| 169 func NewEngine(conf Config) Engine { | 182 func NewEngine(conf Config) EngineInternal { |
| 170 return &engineImpl{ | 183 return &engineImpl{ |
| 171 Config: conf, | 184 Config: conf, |
| 172 doneFlags: make(map[string]bool), | 185 doneFlags: make(map[string]bool), |
| 173 } | 186 } |
| 174 } | 187 } |
| 175 | 188 |
| 176 //// Implementation. | 189 //// Implementation. |
| 177 | 190 |
| 178 const ( | 191 const ( |
| 179 // invocationRetryLimit is how many times to retry an invocation before
giving | 192 // invocationRetryLimit is how many times to retry an invocation before
giving |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 262 // an appropriate revision. | 275 // an appropriate revision. |
| 263 RevisionURL string `gae:",noindex"` | 276 RevisionURL string `gae:",noindex"` |
| 264 | 277 |
| 265 // Schedule is the job's schedule in regular cron expression format. | 278 // Schedule is the job's schedule in regular cron expression format. |
| 266 Schedule string `gae:",noindex"` | 279 Schedule string `gae:",noindex"` |
| 267 | 280 |
| 268 // Task is the job's payload in serialized form. Opaque from the point o
f view | 281 // Task is the job's payload in serialized form. Opaque from the point o
f view |
| 269 // of the engine. See Catalog.UnmarshalTask(). | 282 // of the engine. See Catalog.UnmarshalTask(). |
| 270 Task []byte `gae:",noindex"` | 283 Task []byte `gae:",noindex"` |
| 271 | 284 |
| 285 // ACLs are the latest ACLs applied to Job and all its invocations. |
| 286 Acls acl.GrantsByRole `gae:",noindex"` |
| 287 |
| 272 // State is the job's state machine state, see StateMachine. | 288 // State is the job's state machine state, see StateMachine. |
| 273 State JobState | 289 State JobState |
| 274 } | 290 } |
| 275 | 291 |
| 276 // GetJobName returns name of this Job as defined its project's config. | 292 // GetJobName returns name of this Job as defined its project's config. |
| 277 func (e *Job) GetJobName() string { | 293 func (e *Job) GetJobName() string { |
| 278 // JobID has form <project>/<id>. Split it into components. | 294 // JobID has form <project>/<id>. Split it into components. |
| 279 chunks := strings.Split(e.JobID, "/") | 295 chunks := strings.Split(e.JobID, "/") |
| 280 return chunks[1] | 296 return chunks[1] |
| 281 } | 297 } |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 321 e.State == other.State) | 337 e.State == other.State) |
| 322 } | 338 } |
| 323 | 339 |
| 324 // matches returns true if job definition in the entity matches the one | 340 // matches returns true if job definition in the entity matches the one |
| 325 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for | 341 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for |
| 326 // such jobs (assuming they are up-to-date). | 342 // such jobs (assuming they are up-to-date). |
| 327 func (e *Job) matches(def catalog.Definition) bool { | 343 func (e *Job) matches(def catalog.Definition) bool { |
| 328 return e.JobID == def.JobID && | 344 return e.JobID == def.JobID && |
| 329 e.Flavor == def.Flavor && | 345 e.Flavor == def.Flavor && |
| 330 e.Schedule == def.Schedule && | 346 e.Schedule == def.Schedule && |
| 347 e.Acls.Equal(&def.Acls) && |
| 331 bytes.Equal(e.Task, def.Task) | 348 bytes.Equal(e.Task, def.Task) |
| 332 } | 349 } |
| 333 | 350 |
| 334 // Invocation entity stores single attempt to run a job. Its parent entity | 351 // Invocation entity stores single attempt to run a job. Its parent entity |
| 335 // is corresponding Job, its ID is generated based on time. | 352 // is corresponding Job, its ID is generated based on time. |
| 336 type Invocation struct { | 353 type Invocation struct { |
| 337 _kind string `gae:"$kind,Invocation"` | 354 _kind string `gae:"$kind,Invocation"` |
| 338 _extra ds.PropertyMap `gae:"-,extra"` | 355 _extra ds.PropertyMap `gae:"-,extra"` |
| 339 | 356 |
| 340 // ID is identifier of this particular attempt to run a job. Multiple at
tempts | 357 // ID is identifier of this particular attempt to run a job. Multiple at
tempts |
| (...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 531 | 548 |
| 532 return 0, errors.New("could not find available invocationID after 10 att
empts") | 549 return 0, errors.New("could not find available invocationID after 10 att
empts") |
| 533 } | 550 } |
| 534 | 551 |
| 535 // debugLog mutates a string by appending a line to it. | 552 // debugLog mutates a string by appending a line to it. |
| 536 func debugLog(c context.Context, str *string, format string, args ...interface{}
) { | 553 func debugLog(c context.Context, str *string, format string, args ...interface{}
) { |
| 537 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") | 554 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") |
| 538 *str += prefix + fmt.Sprintf(format+"\n", args...) | 555 *str += prefix + fmt.Sprintf(format+"\n", args...) |
| 539 } | 556 } |
| 540 | 557 |
| 541 //// | 558 //////////////////////////////////////////////////////////////////////////////// |
| 559 // engineImpl. |
| 542 | 560 |
| 543 type engineImpl struct { | 561 type engineImpl struct { |
| 544 Config | 562 Config |
| 545 | 563 |
| 546 lock sync.Mutex | 564 lock sync.Mutex |
| 547 doneFlags map[string]bool // see doIfNotDone | 565 doneFlags map[string]bool // see doIfNotDone |
| 548 | 566 |
| 549 // configureTopic is used by prepareTopic, mocked in tests. | 567 // configureTopic is used by prepareTopic, mocked in tests. |
| 550 configureTopic func(c context.Context, topic, sub, pushURL, publisher st
ring) error | 568 configureTopic func(c context.Context, topic, sub, pushURL, publisher st
ring) error |
| 551 } | 569 } |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 612 // Filter out duplicates, sort. | 630 // Filter out duplicates, sort. |
| 613 projects := stringset.New(len(entities)) | 631 projects := stringset.New(len(entities)) |
| 614 for _, ent := range entities { | 632 for _, ent := range entities { |
| 615 projects.Add(ent.ProjectID) | 633 projects.Add(ent.ProjectID) |
| 616 } | 634 } |
| 617 out := projects.ToSlice() | 635 out := projects.ToSlice() |
| 618 sort.Strings(out) | 636 sort.Strings(out) |
| 619 return out, nil | 637 return out, nil |
| 620 } | 638 } |
| 621 | 639 |
| 622 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { | 640 func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) { |
| 623 q := ds.NewQuery("Job").Eq("Enabled", true) | 641 q := ds.NewQuery("Job").Eq("Enabled", true) |
| 624 » return e.queryEnabledJobs(c, q) | 642 » return e.queryEnabledVisibleJobs(c, q) |
| 625 } | 643 } |
| 626 | 644 |
| 627 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
, error) { | 645 func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string)
([]*Job, error) { |
| 628 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) | 646 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
| 629 » return e.queryEnabledJobs(c, q) | 647 » return e.queryEnabledVisibleJobs(c, q) |
| 630 } | 648 } |
| 631 | 649 |
| 632 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
rror) { | 650 func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([]
*Job, error) { |
| 633 entities := []*Job{} | 651 entities := []*Job{} |
| 634 if err := ds.GetAll(c, q, &entities); err != nil { | 652 if err := ds.GetAll(c, q, &entities); err != nil { |
| 635 return nil, transient.Tag.Apply(err) | 653 return nil, transient.Tag.Apply(err) |
| 636 } | 654 } |
| 637 // Non-ancestor query used, need to recheck filters. | 655 // Non-ancestor query used, need to recheck filters. |
| 638 filtered := make([]*Job, 0, len(entities)) | 656 filtered := make([]*Job, 0, len(entities)) |
| 639 for _, job := range entities { | 657 for _, job := range entities { |
| 640 » » if job.Enabled { | 658 » » if !job.Enabled { |
| 659 » » » continue |
| 660 » » } |
| 661 » » // TODO(tandrii): improve batch ACLs check here to take advantag
e of likely |
| 662 » » // shared ACLs between most jobs of the same project. |
| 663 » » if ok, err := job.Acls.IsReader(c); err != nil { |
| 664 » » » return nil, transient.Tag.Apply(err) |
| 665 » » } else if ok { |
| 641 filtered = append(filtered, job) | 666 filtered = append(filtered, job) |
| 642 } | 667 } |
| 643 } | 668 } |
| 644 return filtered, nil | 669 return filtered, nil |
| 645 } | 670 } |
| 646 | 671 |
| 647 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { | 672 func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error
) { |
| 673 » job, err := e.getJob(c, jobID) |
| 674 » if err != nil { |
| 675 » » return nil, err |
| 676 » } else if job == nil { |
| 677 » » return nil, ErrNoSuchJob |
| 678 » } |
| 679 » if ok, err := job.Acls.IsReader(c); err != nil { |
| 680 » » return nil, err |
| 681 » } else if !ok { |
| 682 » » return nil, ErrNoSuchJob |
| 683 » } |
| 684 » return job, nil |
| 685 } |
| 686 |
| 687 func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error)
{ |
| 688 » job, err := e.getJob(c, jobID) |
| 689 » if err != nil { |
| 690 » » return nil, err |
| 691 » } else if job == nil { |
| 692 » » return nil, ErrNoSuchJob |
| 693 » } |
| 694 |
| 695 » switch owner, err := job.Acls.IsOwner(c); { |
| 696 » case err != nil: |
| 697 » » return nil, err |
| 698 » case owner: |
| 699 » » return job, nil |
| 700 » } |
| 701 |
| 702 » // Not owner, but maybe reader? Give nicer error in such case. |
| 703 » switch reader, err := job.Acls.IsReader(c); { |
| 704 » case err != nil: |
| 705 » » return nil, err |
| 706 » case reader: |
| 707 » » return nil, ErrNoOwnerPermission |
| 708 » default: |
| 709 » » return nil, ErrNoSuchJob |
| 710 » } |
| 711 } |
| 712 |
| 713 func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) { |
| 648 job := &Job{JobID: jobID} | 714 job := &Job{JobID: jobID} |
| 649 switch err := ds.Get(c, job); { | 715 switch err := ds.Get(c, job); { |
| 650 case err == nil: | 716 case err == nil: |
| 651 return job, nil | 717 return job, nil |
| 652 case err == ds.ErrNoSuchEntity: | 718 case err == ds.ErrNoSuchEntity: |
| 653 return nil, nil | 719 return nil, nil |
| 654 default: | 720 default: |
| 655 return nil, transient.Tag.Apply(err) | 721 return nil, transient.Tag.Apply(err) |
| 656 } | 722 } |
| 657 } | 723 } |
| 658 | 724 |
| 659 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
nt, cursor string) ([]*Invocation, string, error) { | 725 func (e *engineImpl) PublicAPI() Engine { |
| 726 » return e |
| 727 } |
| 728 |
| 729 func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pag
eSize int, cursor string) ([]*Invocation, string, error) { |
| 730 » if job, err := e.GetVisibleJob(c, jobID); err != nil { |
| 731 » » return nil, "", err |
| 732 » } else if job == nil { |
| 733 » » // Either no Job or no access, both imply returning no invocatio
ns. |
| 734 » » return nil, "", ErrNoSuchInvocation |
| 735 » } |
| 736 |
| 660 if pageSize == 0 || pageSize > 500 { | 737 if pageSize == 0 || pageSize > 500 { |
| 661 pageSize = 500 | 738 pageSize = 500 |
| 662 } | 739 } |
| 663 | 740 |
| 664 // Deserialize the cursor. | 741 // Deserialize the cursor. |
| 665 var cursorObj ds.Cursor | 742 var cursorObj ds.Cursor |
| 666 if cursor != "" { | 743 if cursor != "" { |
| 667 var err error | 744 var err error |
| 668 cursorObj, err = ds.DecodeCursor(c, cursor) | 745 cursorObj, err = ds.DecodeCursor(c, cursor) |
| 669 if err != nil { | 746 if err != nil { |
| (...skipping 11 matching lines...) Expand all Loading... |
| 681 } | 758 } |
| 682 | 759 |
| 683 // Fetch pageSize worth of invocations, then grab the cursor. | 760 // Fetch pageSize worth of invocations, then grab the cursor. |
| 684 out := make([]*Invocation, 0, pageSize) | 761 out := make([]*Invocation, 0, pageSize) |
| 685 var newCursor string | 762 var newCursor string |
| 686 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { | 763 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { |
| 687 out = append(out, obj) | 764 out = append(out, obj) |
| 688 if len(out) < pageSize { | 765 if len(out) < pageSize { |
| 689 return nil | 766 return nil |
| 690 } | 767 } |
| 691 » » c, err := getCursor() | 768 » » cur, err := getCursor() |
| 692 if err != nil { | 769 if err != nil { |
| 693 return err | 770 return err |
| 694 } | 771 } |
| 695 » » newCursor = c.String() | 772 » » newCursor = cur.String() |
| 696 return ds.Stop | 773 return ds.Stop |
| 697 }) | 774 }) |
| 698 if err != nil { | 775 if err != nil { |
| 699 return nil, "", transient.Tag.Apply(err) | 776 return nil, "", transient.Tag.Apply(err) |
| 700 } | 777 } |
| 701 return out, newCursor, nil | 778 return out, newCursor, nil |
| 702 } | 779 } |
| 703 | 780 |
| 704 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { | 781 func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID
int64) (*Invocation, error) { |
| 782 » switch _, err := e.GetVisibleJob(c, jobID); { |
| 783 » case err == ErrNoSuchJob: |
| 784 » » return nil, ErrNoSuchInvocation |
| 785 » case err != nil: |
| 786 » » return nil, err |
| 787 » default: |
| 788 » » return e.getInvocation(c, jobID, invID) |
| 789 » } |
| 790 } |
| 791 |
| 792 func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { |
| 705 inv := &Invocation{ | 793 inv := &Invocation{ |
| 706 ID: invID, | 794 ID: invID, |
| 707 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), | 795 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), |
| 708 } | 796 } |
| 709 switch err := ds.Get(c, inv); { | 797 switch err := ds.Get(c, inv); { |
| 710 case err == nil: | 798 case err == nil: |
| 711 return inv, nil | 799 return inv, nil |
| 712 case err == ds.ErrNoSuchEntity: | 800 case err == ds.ErrNoSuchEntity: |
| 713 return nil, nil | 801 return nil, nil |
| 714 default: | 802 default: |
| 715 return nil, transient.Tag.Apply(err) | 803 return nil, transient.Tag.Apply(err) |
| 716 } | 804 } |
| 717 } | 805 } |
| 718 | 806 |
| 719 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
]*Invocation, error) { | 807 func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce in
t64) ([]*Invocation, error) { |
| 720 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) | 808 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
| 721 entities := []*Invocation{} | 809 entities := []*Invocation{} |
| 722 if err := ds.GetAll(c, q, &entities); err != nil { | 810 if err := ds.GetAll(c, q, &entities); err != nil { |
| 723 return nil, transient.Tag.Apply(err) | 811 return nil, transient.Tag.Apply(err) |
| 724 } | 812 } |
| 813 if len(entities) > 0 { |
| 814 // All Invocations with the same nonce must belong to the same J
ob. |
| 815 switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID(
)); { |
| 816 case err == ErrNoSuchJob: |
| 817 return []*Invocation{}, nil |
| 818 case err != nil: |
| 819 return nil, err |
| 820 } |
| 821 } |
| 725 return entities, nil | 822 return entities, nil |
| 726 } | 823 } |
| 727 | 824 |
| 728 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { | 825 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { |
| 729 // JobID -> *Job map. | 826 // JobID -> *Job map. |
| 730 existing, err := e.getProjectJobs(c, projectID) | 827 existing, err := e.getProjectJobs(c, projectID) |
| 731 if err != nil { | 828 if err != nil { |
| 732 return err | 829 return err |
| 733 } | 830 } |
| 734 // JobID -> new definition revision map. | 831 // JobID -> new definition revision map. |
| (...skipping 309 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1044 | 1141 |
| 1045 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl
oad, retryCount int) error { | 1142 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl
oad, retryCount int) error { |
| 1046 switch { | 1143 switch { |
| 1047 case payload.InvTimer != nil: | 1144 case payload.InvTimer != nil: |
| 1048 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa
yload.InvTimer) | 1145 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa
yload.InvTimer) |
| 1049 default: | 1146 default: |
| 1050 return fmt.Errorf("unexpected invocation action kind %q", payloa
d) | 1147 return fmt.Errorf("unexpected invocation action kind %q", payloa
d) |
| 1051 } | 1148 } |
| 1052 } | 1149 } |
| 1053 | 1150 |
| 1054 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
dBy identity.Identity) (int64, error) { | 1151 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64,
error) { |
| 1152 » // First, we check ACLs. |
| 1153 » if _, err := e.getOwnedJob(c, jobID); err != nil { |
| 1154 » » return 0, err |
| 1155 » } |
| 1156 |
| 1055 var err error | 1157 var err error |
| 1056 var invNonce int64 | 1158 var invNonce int64 |
| 1057 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er
ror { | 1159 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er
ror { |
| 1058 if isNew { | 1160 if isNew { |
| 1059 err = ErrNoSuchJob | 1161 err = ErrNoSuchJob |
| 1060 return errSkipPut | 1162 return errSkipPut |
| 1061 } | 1163 } |
| 1062 if !job.Enabled { | 1164 if !job.Enabled { |
| 1063 err = errors.New("the job is disabled") | 1165 err = errors.New("the job is disabled") |
| 1064 return errSkipPut | 1166 return errSkipPut |
| 1065 } | 1167 } |
| 1066 invNonce = 0 | 1168 invNonce = 0 |
| 1067 return e.rollSM(c, job, func(sm *StateMachine) error { | 1169 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1068 » » » if err := sm.OnManualInvocation(triggeredBy); err != nil
{ | 1170 » » » if err := sm.OnManualInvocation(auth.CurrentIdentity(c))
; err != nil { |
| 1069 return err | 1171 return err |
| 1070 } | 1172 } |
| 1071 invNonce = sm.State.InvocationNonce | 1173 invNonce = sm.State.InvocationNonce |
| 1072 return nil | 1174 return nil |
| 1073 }) | 1175 }) |
| 1074 }) | 1176 }) |
| 1075 if err == nil { | 1177 if err == nil { |
| 1076 err = err2 | 1178 err = err2 |
| 1077 } | 1179 } |
| 1078 return invNonce, err | 1180 return invNonce, err |
| 1079 } | 1181 } |
| 1080 | 1182 |
| 1081 func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Iden
tity) error { | 1183 func (e *engineImpl) PauseJob(c context.Context, jobID string) error { |
| 1082 » return e.setPausedFlag(c, jobID, true, who) | 1184 » return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c)) |
| 1083 } | 1185 } |
| 1084 | 1186 |
| 1085 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide
ntity) error { | 1187 func (e *engineImpl) ResumeJob(c context.Context, jobID string) error { |
| 1086 » return e.setPausedFlag(c, jobID, false, who) | 1188 » return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c)) |
| 1087 } | 1189 } |
| 1088 | 1190 |
| 1089 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
who identity.Identity) error { | 1191 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
who identity.Identity) error { |
| 1192 // First, we check ACLs outside of transaction. Yes, this allows for rac
es |
| 1193 // between (un)pausing and ACLs changes but these races have no impact. |
| 1194 if _, err := e.getOwnedJob(c, jobID); err != nil { |
| 1195 return err |
| 1196 } |
| 1090 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { | 1197 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { |
| 1091 if isNew || !job.Enabled { | 1198 if isNew || !job.Enabled { |
| 1092 return ErrNoSuchJob | 1199 return ErrNoSuchJob |
| 1093 } | 1200 } |
| 1094 if job.Paused == paused { | 1201 if job.Paused == paused { |
| 1095 return errSkipPut | 1202 return errSkipPut |
| 1096 } | 1203 } |
| 1097 if paused { | 1204 if paused { |
| 1098 logging.Warningf(c, "Job is paused by %s", who) | 1205 logging.Warningf(c, "Job is paused by %s", who) |
| 1099 } else { | 1206 } else { |
| 1100 logging.Warningf(c, "Job is resumed by %s", who) | 1207 logging.Warningf(c, "Job is resumed by %s", who) |
| 1101 } | 1208 } |
| 1102 job.Paused = paused | 1209 job.Paused = paused |
| 1103 return e.rollSM(c, job, func(sm *StateMachine) error { | 1210 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1104 sm.OnScheduleChange() | 1211 sm.OnScheduleChange() |
| 1105 return nil | 1212 return nil |
| 1106 }) | 1213 }) |
| 1107 }) | 1214 }) |
| 1108 } | 1215 } |
| 1109 | 1216 |
| 1110 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
4, who identity.Identity) error { | 1217 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
4) error { |
| 1218 » if _, err := e.getOwnedJob(c, jobID); err != nil { |
| 1219 » » return err |
| 1220 » } |
| 1221 » return e.abortInvocation(c, jobID, invID) |
| 1222 } |
| 1223 |
| 1224 func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int6
4) error { |
| 1111 c = logging.SetField(c, "JobID", jobID) | 1225 c = logging.SetField(c, "JobID", jobID) |
| 1112 c = logging.SetField(c, "InvID", invID) | 1226 c = logging.SetField(c, "InvID", invID) |
| 1113 | 1227 |
| 1114 var inv *Invocation | 1228 var inv *Invocation |
| 1115 var err error | 1229 var err error |
| 1116 » switch inv, err = e.GetInvocation(c, jobID, invID); { | 1230 » switch inv, err = e.getInvocation(c, jobID, invID); { |
| 1117 case err != nil: | 1231 case err != nil: |
| 1118 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1232 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1119 return err | 1233 return err |
| 1120 case inv == nil: | 1234 case inv == nil: |
| 1121 logging.Errorf(c, "The invocation doesn't exist") | 1235 logging.Errorf(c, "The invocation doesn't exist") |
| 1122 return ErrNoSuchInvocation | 1236 return ErrNoSuchInvocation |
| 1123 case inv.Status.Final(): | 1237 case inv.Status.Final(): |
| 1124 return nil | 1238 return nil |
| 1125 } | 1239 } |
| 1126 | 1240 |
| 1127 ctl, err := e.controllerForInvocation(c, inv) | 1241 ctl, err := e.controllerForInvocation(c, inv) |
| 1128 if err != nil { | 1242 if err != nil { |
| 1129 logging.Errorf(c, "Cannot get controller - %s", err) | 1243 logging.Errorf(c, "Cannot get controller - %s", err) |
| 1130 return err | 1244 return err |
| 1131 } | 1245 } |
| 1132 | 1246 |
| 1133 » ctl.DebugLog("Invocation is manually aborted by %q", who) | 1247 » ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c)
) |
| 1134 if err = ctl.manager.AbortTask(c, ctl); err != nil { | 1248 if err = ctl.manager.AbortTask(c, ctl); err != nil { |
| 1135 logging.Errorf(c, "Failed to abort the task - %s", err) | 1249 logging.Errorf(c, "Failed to abort the task - %s", err) |
| 1136 return err | 1250 return err |
| 1137 } | 1251 } |
| 1138 | 1252 |
| 1139 ctl.State().Status = task.StatusAborted | 1253 ctl.State().Status = task.StatusAborted |
| 1140 if err = ctl.Save(c); err != nil { | 1254 if err = ctl.Save(c); err != nil { |
| 1141 logging.Errorf(c, "Failed to save the invocation - %s", err) | 1255 logging.Errorf(c, "Failed to save the invocation - %s", err) |
| 1142 return err | 1256 return err |
| 1143 } | 1257 } |
| 1144 return nil | 1258 return nil |
| 1145 } | 1259 } |
| 1146 | 1260 |
| 1147 // AbortJob resets the job to scheduled state, aborting a currently pending or | 1261 // AbortJob resets the job to scheduled state, aborting a currently pending or |
| 1148 // running invocation (if any). | 1262 // running invocation (if any). |
| 1149 // | 1263 // |
| 1150 // Returns nil if the job is not currently running. | 1264 // Returns nil if the job is not currently running. |
| 1151 func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden
tity) error { | 1265 func (e *engineImpl) AbortJob(c context.Context, jobID string) error { |
| 1152 » // First we switch the job to the default state and disassociate the run
ning | 1266 » // First, we check ACLs. |
| 1267 » if _, err := e.getOwnedJob(c, jobID); err != nil { |
| 1268 » » return err |
| 1269 » } |
| 1270 » // Second, we switch the job to the default state and disassociate the r
unning |
| 1153 // invocation (if any) from the job entity. | 1271 // invocation (if any) from the job entity. |
| 1154 var invID int64 | 1272 var invID int64 |
| 1155 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { | 1273 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { |
| 1156 if isNew { | 1274 if isNew { |
| 1157 return errSkipPut // the job was removed, nothing to abo
rt | 1275 return errSkipPut // the job was removed, nothing to abo
rt |
| 1158 } | 1276 } |
| 1159 invID = job.State.InvocationID | 1277 invID = job.State.InvocationID |
| 1160 return e.rollSM(c, job, func(sm *StateMachine) error { | 1278 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1161 sm.OnManualAbort() | 1279 sm.OnManualAbort() |
| 1162 return nil | 1280 return nil |
| 1163 }) | 1281 }) |
| 1164 }) | 1282 }) |
| 1165 if err != nil { | 1283 if err != nil { |
| 1166 return err | 1284 return err |
| 1167 } | 1285 } |
| 1168 | 1286 |
| 1169 // Now we kill the invocation. We do it separately because it may involv
e | 1287 // Now we kill the invocation. We do it separately because it may involv
e |
| 1170 // an RPC to remote service (e.g. to cancel a task) that can't be done f
rom | 1288 // an RPC to remote service (e.g. to cancel a task) that can't be done f
rom |
| 1171 // the transaction. | 1289 // the transaction. |
| 1172 if invID != 0 { | 1290 if invID != 0 { |
| 1173 » » return e.AbortInvocation(c, jobID, invID, who) | 1291 » » return e.abortInvocation(c, jobID, invID) |
| 1174 } | 1292 } |
| 1175 return nil | 1293 return nil |
| 1176 } | 1294 } |
| 1177 | 1295 |
| 1178 // updateJob updates an existing job if its definition has changed, adds | 1296 // updateJob updates an existing job if its definition has changed, adds |
| 1179 // a completely new job or enables a previously disabled job. | 1297 // a completely new job or enables a previously disabled job. |
| 1180 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error
{ | 1298 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error
{ |
| 1181 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool)
error { | 1299 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool)
error { |
| 1182 if !isNew && job.Enabled && job.matches(def) { | 1300 if !isNew && job.Enabled && job.matches(def) { |
| 1183 return errSkipPut | 1301 return errSkipPut |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1310 } | 1428 } |
| 1311 | 1429 |
| 1312 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. | 1430 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. |
| 1313 // | 1431 // |
| 1314 // See also handlePubSubMessage, it is quite similar. | 1432 // See also handlePubSubMessage, it is quite similar. |
| 1315 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { | 1433 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { |
| 1316 c = logging.SetField(c, "JobID", jobID) | 1434 c = logging.SetField(c, "JobID", jobID) |
| 1317 c = logging.SetField(c, "InvID", invID) | 1435 c = logging.SetField(c, "InvID", invID) |
| 1318 | 1436 |
| 1319 logging.Infof(c, "Handling invocation timer %q", timer.Name) | 1437 logging.Infof(c, "Handling invocation timer %q", timer.Name) |
| 1320 » inv, err := e.GetInvocation(c, jobID, invID) | 1438 » inv, err := e.getInvocation(c, jobID, invID) |
| 1321 if err != nil { | 1439 if err != nil { |
| 1322 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1440 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1323 return err | 1441 return err |
| 1324 } | 1442 } |
| 1325 if inv == nil { | 1443 if inv == nil { |
| 1326 return ErrNoSuchInvocation | 1444 return ErrNoSuchInvocation |
| 1327 } | 1445 } |
| 1328 | 1446 |
| 1329 // Finished invocations are immutable, skip the message. | 1447 // Finished invocations are immutable, skip the message. |
| 1330 if inv.Status.Final() { | 1448 if inv.Status.Final() { |
| (...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1695 return err | 1813 return err |
| 1696 } | 1814 } |
| 1697 jobID = data["job"] | 1815 jobID = data["job"] |
| 1698 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { | 1816 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { |
| 1699 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"],
err) | 1817 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"],
err) |
| 1700 return err | 1818 return err |
| 1701 } | 1819 } |
| 1702 | 1820 |
| 1703 c = logging.SetField(c, "JobID", jobID) | 1821 c = logging.SetField(c, "JobID", jobID) |
| 1704 c = logging.SetField(c, "InvID", invID) | 1822 c = logging.SetField(c, "InvID", invID) |
| 1705 » inv, err := e.GetInvocation(c, jobID, invID) | 1823 » inv, err := e.getInvocation(c, jobID, invID) |
| 1706 if err != nil { | 1824 if err != nil { |
| 1707 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1825 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1708 return err | 1826 return err |
| 1709 } | 1827 } |
| 1710 if inv == nil { | 1828 if inv == nil { |
| 1711 return ErrNoSuchInvocation | 1829 return ErrNoSuchInvocation |
| 1712 } | 1830 } |
| 1713 | 1831 |
| 1714 // Finished invocations are immutable, skip the message. | 1832 // Finished invocations are immutable, skip the message. |
| 1715 if inv.Status.Final() { | 1833 if inv.Status.Final() { |
| (...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1952 } | 2070 } |
| 1953 if hasFinished { | 2071 if hasFinished { |
| 1954 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { | 2072 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { |
| 1955 sm.OnInvocationDone(saving.ID) | 2073 sm.OnInvocationDone(saving.ID) |
| 1956 return nil | 2074 return nil |
| 1957 }) | 2075 }) |
| 1958 } | 2076 } |
| 1959 return nil | 2077 return nil |
| 1960 }) | 2078 }) |
| 1961 } | 2079 } |
| OLD | NEW |