Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 41 "github.com/luci/luci-go/common/data/rand/mathrand" | 41 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 42 "github.com/luci/luci-go/common/data/stringset" | 42 "github.com/luci/luci-go/common/data/stringset" |
| 43 "github.com/luci/luci-go/common/errors" | 43 "github.com/luci/luci-go/common/errors" |
| 44 "github.com/luci/luci-go/common/logging" | 44 "github.com/luci/luci-go/common/logging" |
| 45 "github.com/luci/luci-go/common/retry/transient" | 45 "github.com/luci/luci-go/common/retry/transient" |
| 46 "github.com/luci/luci-go/server/auth" | 46 "github.com/luci/luci-go/server/auth" |
| 47 "github.com/luci/luci-go/server/auth/identity" | 47 "github.com/luci/luci-go/server/auth/identity" |
| 48 "github.com/luci/luci-go/server/auth/signing" | 48 "github.com/luci/luci-go/server/auth/signing" |
| 49 "github.com/luci/luci-go/server/tokens" | 49 "github.com/luci/luci-go/server/tokens" |
| 50 | 50 |
| 51 "github.com/luci/luci-go/scheduler/appengine/acl" | |
| 51 "github.com/luci/luci-go/scheduler/appengine/catalog" | 52 "github.com/luci/luci-go/scheduler/appengine/catalog" |
| 52 "github.com/luci/luci-go/scheduler/appengine/schedule" | 53 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 53 "github.com/luci/luci-go/scheduler/appengine/task" | 54 "github.com/luci/luci-go/scheduler/appengine/task" |
| 54 ) | 55 ) |
| 55 | 56 |
| 56 var ( | 57 var ( |
| 57 » ErrNoSuchJob = errors.New("no such job") | 58 » ErrNoOwnerPermission = errors.New("no OWNER permission on a job") |
| 58 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") | 59 » ErrNoSuchJob = errors.New("no such job") |
| 60 » ErrNoSuchInvocation = errors.New("the invocation doesn't exist") | |
| 59 ) | 61 ) |
| 60 | 62 |
| 61 // Engine manages all scheduler jobs: keeps track of their state, runs state | 63 // Engine manages all scheduler jobs: keeps track of their state, runs state |
| 62 // machine transactions, starts new invocations, etc. A method returns | 64 // machine transactions, starts new invocations, etc. A method returns |
| 63 // errors.Transient if the error is non-fatal and the call should be retried | 65 // errors.Transient if the error is non-fatal and the call should be retried |
| 64 // later. Any other error means that retry won't help. | 66 // later. Any other error means that retry won't help. |
| 67 // ACLs are enforced unlike EngineInternal with the following implications: | |
| 68 // * if caller lacks have READER access to Jobs, methods behave as if Jobs | |
|
Vadim Sh.
2017/08/04 23:07:26
"lacks have" ?
tandrii(chromium)
2017/08/07 12:48:17
Done.
| |
| 69 // do not exist. | |
| 70 // * if caller lacks have OWNER access, calling mutating methods will result in | |
|
Vadim Sh.
2017/08/04 23:07:27
same here
tandrii(chromium)
2017/08/07 12:48:17
Done.
| |
| 71 // ErrNoOwnerPermission (assuming caller has READER access, else see above). | |
| 65 type Engine interface { | 72 type Engine interface { |
| 73 // GetVisibleProjects returns a list of all projects that have at least one | |
| 74 // enabled scheduler job to which caller has READER access. | |
| 75 // GetVisibleProjects(c context.Context) ([]string, error) | |
|
Vadim Sh.
2017/08/04 23:07:27
remove
I assume it's not needed now?
tandrii(chromium)
2017/08/07 12:48:16
Yep, Done.
| |
| 76 | |
| 77 // GetVisibleJobs returns a list of all enabled scheduler jobs in no | |
| 78 // particular order. | |
| 79 GetVisibleJobs(c context.Context) ([]*Job, error) | |
| 80 | |
| 81 // GetVisibleProjectJobs returns a list of enabled scheduler jobs of som e | |
| 82 // project in no particular order. | |
| 83 GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, erro r) | |
| 84 | |
| 85 // GetVisibleJob returns single scheduler job given its full ID or nil i f no such | |
| 86 // job or if not visible. | |
| 87 GetVisibleJob(c context.Context, jobID string) (*Job, error) | |
| 88 | |
| 89 // ListVisibleInvocations returns invocations of a visible job, most rec ent first. | |
| 90 // Returns fetched invocations and cursor string if there's more. | |
| 91 ListVisibleInvocations(c context.Context, jobID string, pageSize int, cu rsor string) ([]*Invocation, string, error) | |
| 92 | |
| 93 // GetVisibleInvocation returns single invocation of some job given its ID. | |
| 94 GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Inv ocation, error) | |
| 95 | |
| 96 // GetVisibleInvocationsByNonce returns a list of Invocations with given nonce. | |
| 97 // | |
| 98 // Invocation nonce is a random number that identifies an intent to star t | |
| 99 // an invocation. Normally one nonce corresponds to one Invocation entit y, | |
| 100 // but there can be more if job fails to start with a transient error. | |
| 101 GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invo cation, error) | |
| 102 | |
| 103 // PauseJob replaces job's schedule with "triggered", effectively preven ting | |
| 104 // it from running automatically (until it is resumed). Manual invocatio ns are | |
| 105 // still allowed. Does nothing if job is already paused. Any pending or | |
| 106 // running invocations are still executed. | |
| 107 PauseJob(c context.Context, jobID string) error | |
| 108 | |
| 109 // ResumeJob resumes paused job. Does nothing if the job is not paused. | |
| 110 ResumeJob(c context.Context, jobID string) error | |
| 111 | |
| 112 // AbortJob resets the job to scheduled state, aborting a currently pend ing or | |
| 113 // running invocation (if any). | |
| 114 // | |
| 115 // Returns nil if the job is not currently running. | |
| 116 AbortJob(c context.Context, jobID string) error | |
| 117 | |
| 118 // AbortInvocation forcefully moves the invocation to failed state. | |
| 119 // | |
| 120 // It opportunistically tries to send "abort" signal to a job runner if it | |
| 121 // supports cancellation, but it doesn't wait for reply. It proceeds to | |
| 122 // modifying local state in the scheduler service datastore immediately. | |
| 123 // | |
| 124 // AbortInvocation can be used to manually "unstuck" jobs that got stuck due | |
| 125 // to missing PubSub notifications or other kinds of unexpected conditio ns. | |
| 126 // | |
| 127 // Does nothing if invocation is already in some final state. | |
| 128 AbortInvocation(c context.Context, jobID string, invID int64) error | |
| 129 | |
| 130 // TriggerInvocation launches job invocation right now if job isn't runn ing | |
| 131 // now. Used by "Run now" UI button. | |
| 132 // | |
| 133 // Returns new invocation nonce (a random number that identifies an inte nt to | |
| 134 // start an invocation). Normally one nonce corresponds to one Invocatio n | |
| 135 // entity, but there can be more if job fails to start with a transient error. | |
| 136 TriggerInvocation(c context.Context, jobID string) (int64, error) | |
| 137 } | |
| 138 | |
| 139 // EngineInternal is to be used by frontend initialization code only. | |
| 140 type EngineInternal interface { | |
| 66 // GetAllProjects returns a list of all projects that have at least one | 141 // GetAllProjects returns a list of all projects that have at least one |
| 67 // enabled scheduler job. | 142 // enabled scheduler job. |
| 68 GetAllProjects(c context.Context) ([]string, error) | 143 GetAllProjects(c context.Context) ([]string, error) |
| 69 | 144 |
| 70 // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar | |
| 71 // order. | |
| 72 GetAllJobs(c context.Context) ([]*Job, error) | |
| 73 | |
| 74 // GetProjectJobs returns a list of enabled scheduler jobs of some proje ct | |
| 75 // in no particular order. | |
| 76 GetProjectJobs(c context.Context, projectID string) ([]*Job, error) | |
| 77 | |
| 78 // GetJob returns single scheduler job given its full ID or nil if no su ch | |
| 79 // job. | |
| 80 GetJob(c context.Context, jobID string) (*Job, error) | |
| 81 | |
| 82 // ListInvocations returns invocations of a job, most recent first. | |
| 83 // Returns fetched invocations and cursor string if there's more. | |
| 84 ListInvocations(c context.Context, jobID string, pageSize int, cursor st ring) ([]*Invocation, string, error) | |
| 85 | |
| 86 // GetInvocation returns single invocation of some job given its ID. | |
| 87 GetInvocation(c context.Context, jobID string, invID int64) (*Invocation , error) | |
| 88 | |
| 89 // GetInvocationsByNonce returns a list of Invocations with given nonce. | |
| 90 // | |
| 91 // Invocation nonce is a random number that identifies an intent to star t | |
| 92 // an invocation. Normally one nonce corresponds to one Invocation entit y, | |
| 93 // but there can be more if job fails to start with a transient error. | |
| 94 GetInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error) | |
| 95 | |
| 96 // UpdateProjectJobs adds new, removes old and updates existing jobs. | 145 // UpdateProjectJobs adds new, removes old and updates existing jobs. |
| 97 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error | 146 UpdateProjectJobs(c context.Context, projectID string, defs []catalog.De finition) error |
| 98 | 147 |
| 99 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. | 148 // ResetAllJobsOnDevServer forcefully resets state of all enabled jobs. |
| 100 // Supposed to be used only on devserver, where task queue stub state is not | 149 // Supposed to be used only on devserver, where task queue stub state is not |
| 101 // preserved between appserver restarts and it messes everything. | 150 // preserved between appserver restarts and it messes everything. |
| 102 ResetAllJobsOnDevServer(c context.Context) error | 151 ResetAllJobsOnDevServer(c context.Context) error |
| 103 | 152 |
| 104 // ExecuteSerializedAction is called via a task queue to execute an acti on | 153 // ExecuteSerializedAction is called via a task queue to execute an acti on |
| 105 // produced by job state machine transition. These actions are POSTed | 154 // produced by job state machine transition. These actions are POSTed |
| 106 // to TimersQueue and InvocationsQueue defined in Config by Engine. | 155 // to TimersQueue and InvocationsQueue defined in Config by Engine. |
| 107 // 'retryCount' is 0 on first attempt, 1 if task queue service retries | 156 // 'retryCount' is 0 on first attempt, 1 if task queue service retries |
| 108 // request once, 2 - if twice, and so on. Returning transient errors her e | 157 // request once, 2 - if twice, and so on. Returning transient errors her e |
| 109 // causes the task queue to retry the task. | 158 // causes the task queue to retry the task. |
| 110 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error | 159 ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error |
| 111 | 160 |
| 112 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved. | 161 // ProcessPubSubPush is called whenever incoming PubSub message is recei ved. |
| 113 ProcessPubSubPush(c context.Context, body []byte) error | 162 ProcessPubSubPush(c context.Context, body []byte) error |
| 114 | 163 |
| 115 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub | 164 // PullPubSubOnDevServer is called on dev server to pull messages from P ubSub |
| 116 // subscription associated with given publisher. | 165 // subscription associated with given publisher. |
| 117 // | 166 // |
| 118 // It is needed to be able to manually tests PubSub related workflows on dev | 167 // It is needed to be able to manually tests PubSub related workflows on dev |
| 119 // server, since dev server can't accept PubSub push messages. | 168 // server, since dev server can't accept PubSub push messages. |
| 120 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error | 169 PullPubSubOnDevServer(c context.Context, taskManagerName, publisher stri ng) error |
| 121 | 170 |
| 122 » // TriggerInvocation launches job invocation right now if job isn't runn ing | 171 » PublicAPI() Engine |
|
Vadim Sh.
2017/08/04 23:07:27
document
tandrii(chromium)
2017/08/07 12:48:17
Done.
| |
| 123 » // now. Used by "Run now" UI button. | |
| 124 » // | |
| 125 » // Returns new invocation nonce (a random number that identifies an inte nt to | |
| 126 » // start an invocation). Normally one nonce corresponds to one Invocatio n | |
| 127 » // entity, but there can be more if job fails to start with a transient error. | |
| 128 » TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error) | |
| 129 | |
| 130 » // PauseJob replaces job's schedule with "triggered", effectively preven ting | |
| 131 » // it from running automatically (until it is resumed). Manual invocatio ns are | |
| 132 » // still allowed. Does nothing if job is already paused. Any pending or | |
| 133 » // running invocations are still executed. | |
| 134 » PauseJob(c context.Context, jobID string, who identity.Identity) error | |
| 135 | |
| 136 » // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d. | |
| 137 » ResumeJob(c context.Context, jobID string, who identity.Identity) error | |
| 138 | |
| 139 » // AbortInvocation forcefully moves the invocation to failed state. | |
| 140 » // | |
| 141 » // It opportunistically tries to send "abort" signal to a job runner if it | |
| 142 » // supports cancellation, but it doesn't wait for reply. It proceeds to | |
| 143 » // modifying local state in the scheduler service datastore immediately. | |
| 144 » // | |
| 145 » // AbortInvocation can be used to manually "unstuck" jobs that got stuck due | |
| 146 » // to missing PubSub notifications or other kinds of unexpected conditio ns. | |
| 147 » // | |
| 148 » // Does nothing if invocation is already in some final state. | |
| 149 » AbortInvocation(c context.Context, jobID string, invID int64, who identi ty.Identity) error | |
| 150 | |
| 151 » // AbortJob resets the job to scheduled state, aborting a currently pend ing or | |
| 152 » // running invocation (if any). | |
| 153 » // | |
| 154 » // Returns nil if the job is not currently running. | |
| 155 » AbortJob(c context.Context, jobID string, who identity.Identity) error | |
| 156 } | 172 } |
| 157 | 173 |
| 158 // Config contains parameters for the engine. | 174 // Config contains parameters for the engine. |
| 159 type Config struct { | 175 type Config struct { |
| 160 Catalog catalog.Catalog // provides task.Manager's to run t asks | 176 Catalog catalog.Catalog // provides task.Manager's to run t asks |
| 161 TimersQueuePath string // URL of a task queue handler for timer ticks | 177 TimersQueuePath string // URL of a task queue handler for timer ticks |
| 162 TimersQueueName string // queue name for timer ticks | 178 TimersQueueName string // queue name for timer ticks |
| 163 InvocationsQueuePath string // URL of a task queue handler that starts jobs | 179 InvocationsQueuePath string // URL of a task queue handler that starts jobs |
| 164 InvocationsQueueName string // queue name for job starts | 180 InvocationsQueueName string // queue name for job starts |
| 165 PubSubPushPath string // URL to use in PubSub push config | 181 PubSubPushPath string // URL to use in PubSub push config |
| 166 } | 182 } |
| 167 | 183 |
| 168 // NewEngine returns default implementation of Engine. | 184 // NewEngine returns default implementation of EngineInternal. |
| 169 func NewEngine(conf Config) Engine { | 185 func NewEngine(conf Config) EngineInternal { |
| 170 return &engineImpl{ | 186 return &engineImpl{ |
| 171 Config: conf, | 187 Config: conf, |
| 172 doneFlags: make(map[string]bool), | 188 doneFlags: make(map[string]bool), |
| 173 } | 189 } |
| 174 } | 190 } |
| 175 | 191 |
| 176 //// Implementation. | 192 //// Implementation. |
| 177 | 193 |
| 178 const ( | 194 const ( |
| 179 // invocationRetryLimit is how many times to retry an invocation before giving | 195 // invocationRetryLimit is how many times to retry an invocation before giving |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 262 // an appropriate revision. | 278 // an appropriate revision. |
| 263 RevisionURL string `gae:",noindex"` | 279 RevisionURL string `gae:",noindex"` |
| 264 | 280 |
| 265 // Schedule is the job's schedule in regular cron expression format. | 281 // Schedule is the job's schedule in regular cron expression format. |
| 266 Schedule string `gae:",noindex"` | 282 Schedule string `gae:",noindex"` |
| 267 | 283 |
| 268 // Task is the job's payload in serialized form. Opaque from the point o f view | 284 // Task is the job's payload in serialized form. Opaque from the point o f view |
| 269 // of the engine. See Catalog.UnmarshalTask(). | 285 // of the engine. See Catalog.UnmarshalTask(). |
| 270 Task []byte `gae:",noindex"` | 286 Task []byte `gae:",noindex"` |
| 271 | 287 |
| 288 // ACLs are the latest ACLs applied to Job and all its invocations. | |
| 289 Acls acl.GrantsByRole `gae:",noindex"` | |
| 290 | |
| 272 // State is the job's state machine state, see StateMachine. | 291 // State is the job's state machine state, see StateMachine. |
| 273 State JobState | 292 State JobState |
| 274 } | 293 } |
| 275 | 294 |
| 276 // GetJobName returns name of this Job as defined its project's config. | 295 // GetJobName returns name of this Job as defined its project's config. |
| 277 func (e *Job) GetJobName() string { | 296 func (e *Job) GetJobName() string { |
| 278 // JobID has form <project>/<id>. Split it into components. | 297 // JobID has form <project>/<id>. Split it into components. |
| 279 chunks := strings.Split(e.JobID, "/") | 298 chunks := strings.Split(e.JobID, "/") |
| 280 return chunks[1] | 299 return chunks[1] |
| 281 } | 300 } |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 321 e.State == other.State) | 340 e.State == other.State) |
| 322 } | 341 } |
| 323 | 342 |
| 324 // matches returns true if job definition in the entity matches the one | 343 // matches returns true if job definition in the entity matches the one |
| 325 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for | 344 // specified by catalog.Definition struct. UpdateProjectJobs skips updates for |
| 326 // such jobs (assuming they are up-to-date). | 345 // such jobs (assuming they are up-to-date). |
| 327 func (e *Job) matches(def catalog.Definition) bool { | 346 func (e *Job) matches(def catalog.Definition) bool { |
| 328 return e.JobID == def.JobID && | 347 return e.JobID == def.JobID && |
| 329 e.Flavor == def.Flavor && | 348 e.Flavor == def.Flavor && |
| 330 e.Schedule == def.Schedule && | 349 e.Schedule == def.Schedule && |
| 350 e.Acls.Equal(&def.Acls) && | |
| 331 bytes.Equal(e.Task, def.Task) | 351 bytes.Equal(e.Task, def.Task) |
| 332 } | 352 } |
| 333 | 353 |
| 334 // Invocation entity stores single attempt to run a job. Its parent entity | 354 // Invocation entity stores single attempt to run a job. Its parent entity |
| 335 // is corresponding Job, its ID is generated based on time. | 355 // is corresponding Job, its ID is generated based on time. |
| 336 type Invocation struct { | 356 type Invocation struct { |
| 337 _kind string `gae:"$kind,Invocation"` | 357 _kind string `gae:"$kind,Invocation"` |
| 338 _extra ds.PropertyMap `gae:"-,extra"` | 358 _extra ds.PropertyMap `gae:"-,extra"` |
| 339 | 359 |
| 340 // ID is identifier of this particular attempt to run a job. Multiple at tempts | 360 // ID is identifier of this particular attempt to run a job. Multiple at tempts |
| (...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 531 | 551 |
| 532 return 0, errors.New("could not find available invocationID after 10 att empts") | 552 return 0, errors.New("could not find available invocationID after 10 att empts") |
| 533 } | 553 } |
| 534 | 554 |
| 535 // debugLog mutates a string by appending a line to it. | 555 // debugLog mutates a string by appending a line to it. |
| 536 func debugLog(c context.Context, str *string, format string, args ...interface{} ) { | 556 func debugLog(c context.Context, str *string, format string, args ...interface{} ) { |
| 537 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") | 557 prefix := clock.Now(c).UTC().Format("[15:04:05.000] ") |
| 538 *str += prefix + fmt.Sprintf(format+"\n", args...) | 558 *str += prefix + fmt.Sprintf(format+"\n", args...) |
| 539 } | 559 } |
| 540 | 560 |
| 541 //// | 561 //////////////////////////////////////////////////////////////////////////////// |
| 562 // engineImpl. | |
| 542 | 563 |
| 543 type engineImpl struct { | 564 type engineImpl struct { |
| 544 Config | 565 Config |
| 545 | 566 |
| 546 lock sync.Mutex | 567 lock sync.Mutex |
| 547 doneFlags map[string]bool // see doIfNotDone | 568 doneFlags map[string]bool // see doIfNotDone |
| 548 | 569 |
| 549 // configureTopic is used by prepareTopic, mocked in tests. | 570 // configureTopic is used by prepareTopic, mocked in tests. |
| 550 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error | 571 configureTopic func(c context.Context, topic, sub, pushURL, publisher st ring) error |
| 551 } | 572 } |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 612 // Filter out duplicates, sort. | 633 // Filter out duplicates, sort. |
| 613 projects := stringset.New(len(entities)) | 634 projects := stringset.New(len(entities)) |
| 614 for _, ent := range entities { | 635 for _, ent := range entities { |
| 615 projects.Add(ent.ProjectID) | 636 projects.Add(ent.ProjectID) |
| 616 } | 637 } |
| 617 out := projects.ToSlice() | 638 out := projects.ToSlice() |
| 618 sort.Strings(out) | 639 sort.Strings(out) |
| 619 return out, nil | 640 return out, nil |
| 620 } | 641 } |
| 621 | 642 |
| 622 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { | 643 func (e *engineImpl) GetVisibleJobs(c context.Context) ([]*Job, error) { |
| 623 q := ds.NewQuery("Job").Eq("Enabled", true) | 644 q := ds.NewQuery("Job").Eq("Enabled", true) |
| 624 » return e.queryEnabledJobs(c, q) | 645 » return e.queryEnabledVisibleJobs(c, q) |
| 625 } | 646 } |
| 626 | 647 |
| 627 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) { | 648 func (e *engineImpl) GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error) { |
| 628 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) | 649 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
| 629 » return e.queryEnabledJobs(c, q) | 650 » return e.queryEnabledVisibleJobs(c, q) |
| 630 } | 651 } |
| 631 | 652 |
| 632 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) { | 653 func (e *engineImpl) queryEnabledVisibleJobs(c context.Context, q *ds.Query) ([] *Job, error) { |
| 633 entities := []*Job{} | 654 entities := []*Job{} |
| 634 if err := ds.GetAll(c, q, &entities); err != nil { | 655 if err := ds.GetAll(c, q, &entities); err != nil { |
| 635 return nil, transient.Tag.Apply(err) | 656 return nil, transient.Tag.Apply(err) |
| 636 } | 657 } |
| 637 // Non-ancestor query used, need to recheck filters. | 658 // Non-ancestor query used, need to recheck filters. |
| 638 filtered := make([]*Job, 0, len(entities)) | 659 filtered := make([]*Job, 0, len(entities)) |
| 639 for _, job := range entities { | 660 for _, job := range entities { |
| 640 » » if job.Enabled { | 661 » » if !job.Enabled { |
| 662 » » » continue | |
| 663 » » } | |
| 664 » » // TODO(tandrii): improve batch ACLs check here to take advantag e of likely | |
| 665 » » // shared ACLs between most jobs of the same project. | |
| 666 » » if ok, err := job.Acls.IsReader(c); err != nil { | |
| 667 » » » return nil, transient.Tag.Apply(err) | |
| 668 » » } else if ok { | |
| 641 filtered = append(filtered, job) | 669 filtered = append(filtered, job) |
| 642 } | 670 } |
| 643 } | 671 } |
| 644 return filtered, nil | 672 return filtered, nil |
| 645 } | 673 } |
| 646 | 674 |
| 647 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { | 675 func (e *engineImpl) GetVisibleJob(c context.Context, jobID string) (*Job, error ) { |
| 676 » job, err := e.getJob(c, jobID) | |
| 677 » if err != nil { | |
| 678 » » return nil, err | |
| 679 » } else if job == nil { | |
| 680 » » return nil, ErrNoSuchJob | |
| 681 » } | |
| 682 » if ok, err := job.Acls.IsReader(c); err != nil { | |
| 683 » » return nil, err | |
| 684 » } else if !ok { | |
| 685 » » return nil, ErrNoSuchJob | |
| 686 » } | |
| 687 » return job, err | |
|
Vadim Sh.
2017/08/04 23:07:26
nit: return job, nil
tandrii(chromium)
2017/08/07 12:48:17
Done.
| |
| 688 } | |
| 689 | |
| 690 func (e *engineImpl) getOwnedJob(c context.Context, jobID string) (*Job, error) { | |
| 691 » job, err := e.getJob(c, jobID) | |
| 692 » if err != nil { | |
| 693 » » return nil, err | |
| 694 » } else if job == nil { | |
| 695 » » return nil, ErrNoSuchJob | |
| 696 » } | |
| 697 | |
| 698 » switch owner, err := job.Acls.IsOwner(c); { | |
| 699 » case err != nil: | |
| 700 » » return nil, err | |
| 701 » case owner: | |
| 702 » » return job, nil | |
| 703 » } | |
| 704 | |
| 705 » // Not owner, but maybe reader? Give nicer error in such case. | |
| 706 » switch reader, err := job.Acls.IsReader(c); { | |
| 707 » case err != nil: | |
| 708 » » return nil, err | |
| 709 » case reader: | |
| 710 » » return nil, ErrNoOwnerPermission | |
| 711 » default: | |
| 712 » » return nil, ErrNoSuchJob | |
| 713 » } | |
| 714 } | |
| 715 | |
| 716 func (e *engineImpl) getJob(c context.Context, jobID string) (*Job, error) { | |
| 648 job := &Job{JobID: jobID} | 717 job := &Job{JobID: jobID} |
| 649 switch err := ds.Get(c, job); { | 718 switch err := ds.Get(c, job); { |
| 650 case err == nil: | 719 case err == nil: |
| 651 return job, nil | 720 return job, nil |
| 652 case err == ds.ErrNoSuchEntity: | 721 case err == ds.ErrNoSuchEntity: |
| 653 return nil, nil | 722 return nil, nil |
| 654 default: | 723 default: |
| 655 return nil, transient.Tag.Apply(err) | 724 return nil, transient.Tag.Apply(err) |
| 656 } | 725 } |
| 657 } | 726 } |
| 658 | 727 |
| 659 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { | 728 func (e *engineImpl) PublicAPI() Engine { |
| 729 » return e | |
| 730 } | |
| 731 | |
| 732 func (e *engineImpl) ListVisibleInvocations(c context.Context, jobID string, pag eSize int, cursor string) ([]*Invocation, string, error) { | |
| 733 » if job, err := e.GetVisibleJob(c, jobID); err != nil { | |
|
Vadim Sh.
2017/08/04 23:07:27
strictly speaking, it would be better if we put AC
tandrii(chromium)
2017/08/07 12:48:17
First, since API and UI QPS is indeed negligible,
Vadim Sh.
2017/08/07 22:53:21
Sure, it was just a general thought. I don't think
| |
| 734 » » return nil, "", err | |
| 735 » } else if job == nil { | |
| 736 » » // Either no Job or no access, both imply returning no invocatio ns. | |
| 737 » » // return []*Invocation{}, "", nil | |
| 738 » » return nil, "", ErrNoSuchInvocation | |
| 739 » } | |
| 740 | |
| 660 if pageSize == 0 || pageSize > 500 { | 741 if pageSize == 0 || pageSize > 500 { |
| 661 pageSize = 500 | 742 pageSize = 500 |
| 662 } | 743 } |
| 663 | 744 |
| 664 // Deserialize the cursor. | 745 // Deserialize the cursor. |
| 665 var cursorObj ds.Cursor | 746 var cursorObj ds.Cursor |
| 666 if cursor != "" { | 747 if cursor != "" { |
| 667 var err error | 748 var err error |
| 668 cursorObj, err = ds.DecodeCursor(c, cursor) | 749 cursorObj, err = ds.DecodeCursor(c, cursor) |
| 669 if err != nil { | 750 if err != nil { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 681 } | 762 } |
| 682 | 763 |
| 683 // Fetch pageSize worth of invocations, then grab the cursor. | 764 // Fetch pageSize worth of invocations, then grab the cursor. |
| 684 out := make([]*Invocation, 0, pageSize) | 765 out := make([]*Invocation, 0, pageSize) |
| 685 var newCursor string | 766 var newCursor string |
| 686 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { | 767 err := ds.Run(c, q, func(obj *Invocation, getCursor ds.CursorCB) error { |
| 687 out = append(out, obj) | 768 out = append(out, obj) |
| 688 if len(out) < pageSize { | 769 if len(out) < pageSize { |
| 689 return nil | 770 return nil |
| 690 } | 771 } |
| 691 » » c, err := getCursor() | 772 » » cur, err := getCursor() |
| 692 if err != nil { | 773 if err != nil { |
| 693 return err | 774 return err |
| 694 } | 775 } |
| 695 » » newCursor = c.String() | 776 » » newCursor = cur.String() |
| 696 return ds.Stop | 777 return ds.Stop |
| 697 }) | 778 }) |
| 698 if err != nil { | 779 if err != nil { |
| 699 return nil, "", transient.Tag.Apply(err) | 780 return nil, "", transient.Tag.Apply(err) |
| 700 } | 781 } |
| 701 return out, newCursor, nil | 782 return out, newCursor, nil |
| 702 } | 783 } |
| 703 | 784 |
| 704 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { | 785 func (e *engineImpl) GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { |
| 786 » switch _, err := e.GetVisibleJob(c, jobID); { | |
| 787 » case err == ErrNoSuchJob: | |
| 788 » » return nil, ErrNoSuchInvocation | |
| 789 » case err != nil: | |
| 790 » » return nil, err | |
| 791 » default: | |
| 792 » » return e.getInvocation(c, jobID, invID) | |
| 793 » } | |
| 794 } | |
| 795 | |
| 796 func (e *engineImpl) getInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { | |
| 705 inv := &Invocation{ | 797 inv := &Invocation{ |
| 706 ID: invID, | 798 ID: invID, |
| 707 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), | 799 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), |
| 708 } | 800 } |
| 709 switch err := ds.Get(c, inv); { | 801 switch err := ds.Get(c, inv); { |
| 710 case err == nil: | 802 case err == nil: |
| 711 return inv, nil | 803 return inv, nil |
| 712 case err == ds.ErrNoSuchEntity: | 804 case err == ds.ErrNoSuchEntity: |
| 713 return nil, nil | 805 return nil, nil |
| 714 default: | 806 default: |
| 715 return nil, transient.Tag.Apply(err) | 807 return nil, transient.Tag.Apply(err) |
| 716 } | 808 } |
| 717 } | 809 } |
| 718 | 810 |
| 719 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ ]*Invocation, error) { | 811 func (e *engineImpl) GetVisibleInvocationsByNonce(c context.Context, invNonce in t64) ([]*Invocation, error) { |
| 720 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) | 812 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
| 721 entities := []*Invocation{} | 813 entities := []*Invocation{} |
| 722 if err := ds.GetAll(c, q, &entities); err != nil { | 814 if err := ds.GetAll(c, q, &entities); err != nil { |
| 723 return nil, transient.Tag.Apply(err) | 815 return nil, transient.Tag.Apply(err) |
| 724 } | 816 } |
| 817 if len(entities) > 0 { | |
| 818 // All Invocations with the same nonce must belong to the same J ob. | |
| 819 switch _, err := e.GetVisibleJob(c, entities[0].JobKey.StringID( )); { | |
| 820 case err == ErrNoSuchJob: | |
| 821 return []*Invocation{}, nil | |
| 822 case err != nil: | |
| 823 return nil, err | |
| 824 } | |
| 825 } | |
| 725 return entities, nil | 826 return entities, nil |
| 726 } | 827 } |
| 727 | 828 |
| 728 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error { | 829 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error { |
| 729 // JobID -> *Job map. | 830 // JobID -> *Job map. |
| 730 existing, err := e.getProjectJobs(c, projectID) | 831 existing, err := e.getProjectJobs(c, projectID) |
| 731 if err != nil { | 832 if err != nil { |
| 732 return err | 833 return err |
| 733 } | 834 } |
| 734 // JobID -> new definition revision map. | 835 // JobID -> new definition revision map. |
| (...skipping 309 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1044 | 1145 |
| 1045 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error { | 1146 func (e *engineImpl) executeInvAction(c context.Context, payload *actionTaskPayl oad, retryCount int) error { |
| 1046 switch { | 1147 switch { |
| 1047 case payload.InvTimer != nil: | 1148 case payload.InvTimer != nil: |
| 1048 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer) | 1149 return e.invocationTimerTick(c, payload.JobID, payload.InvID, pa yload.InvTimer) |
| 1049 default: | 1150 default: |
| 1050 return fmt.Errorf("unexpected invocation action kind %q", payloa d) | 1151 return fmt.Errorf("unexpected invocation action kind %q", payloa d) |
| 1051 } | 1152 } |
| 1052 } | 1153 } |
| 1053 | 1154 |
| 1054 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere dBy identity.Identity) (int64, error) { | 1155 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string) (int64, error) { |
| 1156 » // First, we check ACLs. | |
| 1157 » if _, err := e.getOwnedJob(c, jobID); err != nil { | |
| 1158 » » return 0, err | |
| 1159 » } | |
| 1160 | |
| 1055 var err error | 1161 var err error |
| 1056 var invNonce int64 | 1162 var invNonce int64 |
| 1057 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror { | 1163 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror { |
| 1058 if isNew { | 1164 if isNew { |
| 1059 err = ErrNoSuchJob | 1165 err = ErrNoSuchJob |
| 1060 return errSkipPut | 1166 return errSkipPut |
| 1061 } | 1167 } |
| 1062 if !job.Enabled { | 1168 if !job.Enabled { |
| 1063 err = errors.New("the job is disabled") | 1169 err = errors.New("the job is disabled") |
| 1064 return errSkipPut | 1170 return errSkipPut |
| 1065 } | 1171 } |
| 1066 invNonce = 0 | 1172 invNonce = 0 |
| 1067 return e.rollSM(c, job, func(sm *StateMachine) error { | 1173 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1068 » » » if err := sm.OnManualInvocation(triggeredBy); err != nil { | 1174 » » » if err := sm.OnManualInvocation(auth.CurrentIdentity(c)) ; err != nil { |
| 1069 return err | 1175 return err |
| 1070 } | 1176 } |
| 1071 invNonce = sm.State.InvocationNonce | 1177 invNonce = sm.State.InvocationNonce |
| 1072 return nil | 1178 return nil |
| 1073 }) | 1179 }) |
| 1074 }) | 1180 }) |
| 1075 if err == nil { | 1181 if err == nil { |
| 1076 err = err2 | 1182 err = err2 |
| 1077 } | 1183 } |
| 1078 return invNonce, err | 1184 return invNonce, err |
| 1079 } | 1185 } |
| 1080 | 1186 |
| 1081 func (e *engineImpl) PauseJob(c context.Context, jobID string, who identity.Iden tity) error { | 1187 func (e *engineImpl) PauseJob(c context.Context, jobID string) error { |
| 1082 » return e.setPausedFlag(c, jobID, true, who) | 1188 » // First, we check ACLs. |
| 1189 » if _, err := e.getOwnedJob(c, jobID); err != nil { | |
|
Vadim Sh.
2017/08/04 23:07:26
why not move it into 'setPausedFlag'?
tandrii(chromium)
2017/08/07 12:48:17
Good point. Done.
| |
| 1190 » » return err | |
| 1191 » } | |
| 1192 » return e.setPausedFlag(c, jobID, true, auth.CurrentIdentity(c)) | |
| 1083 } | 1193 } |
| 1084 | 1194 |
| 1085 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide ntity) error { | 1195 func (e *engineImpl) ResumeJob(c context.Context, jobID string) error { |
| 1086 » return e.setPausedFlag(c, jobID, false, who) | 1196 » // First, we check ACLs. |
| 1197 » if _, err := e.getOwnedJob(c, jobID); err != nil { | |
| 1198 » » return err | |
| 1199 » } | |
| 1200 » return e.setPausedFlag(c, jobID, false, auth.CurrentIdentity(c)) | |
| 1087 } | 1201 } |
| 1088 | 1202 |
| 1089 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { | 1203 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { |
| 1090 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { | 1204 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { |
| 1091 if isNew || !job.Enabled { | 1205 if isNew || !job.Enabled { |
| 1092 return ErrNoSuchJob | 1206 return ErrNoSuchJob |
| 1093 } | 1207 } |
| 1094 if job.Paused == paused { | 1208 if job.Paused == paused { |
| 1095 return errSkipPut | 1209 return errSkipPut |
| 1096 } | 1210 } |
| 1097 if paused { | 1211 if paused { |
| 1098 logging.Warningf(c, "Job is paused by %s", who) | 1212 logging.Warningf(c, "Job is paused by %s", who) |
| 1099 } else { | 1213 } else { |
| 1100 logging.Warningf(c, "Job is resumed by %s", who) | 1214 logging.Warningf(c, "Job is resumed by %s", who) |
| 1101 } | 1215 } |
| 1102 job.Paused = paused | 1216 job.Paused = paused |
| 1103 return e.rollSM(c, job, func(sm *StateMachine) error { | 1217 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1104 sm.OnScheduleChange() | 1218 sm.OnScheduleChange() |
| 1105 return nil | 1219 return nil |
| 1106 }) | 1220 }) |
| 1107 }) | 1221 }) |
| 1108 } | 1222 } |
| 1109 | 1223 |
| 1110 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4, who identity.Identity) error { | 1224 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4) error { |
| 1225 » if _, err := e.getOwnedJob(c, jobID); err != nil { | |
| 1226 » » return err | |
| 1227 » } | |
| 1228 » return e.abortInvocation(c, jobID, invID) | |
| 1229 } | |
| 1230 | |
| 1231 func (e *engineImpl) abortInvocation(c context.Context, jobID string, invID int6 4) error { | |
| 1111 c = logging.SetField(c, "JobID", jobID) | 1232 c = logging.SetField(c, "JobID", jobID) |
| 1112 c = logging.SetField(c, "InvID", invID) | 1233 c = logging.SetField(c, "InvID", invID) |
| 1113 | 1234 |
| 1114 var inv *Invocation | 1235 var inv *Invocation |
| 1115 var err error | 1236 var err error |
| 1116 » switch inv, err = e.GetInvocation(c, jobID, invID); { | 1237 » switch inv, err = e.getInvocation(c, jobID, invID); { |
| 1117 case err != nil: | 1238 case err != nil: |
| 1118 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1239 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1119 return err | 1240 return err |
| 1120 case inv == nil: | 1241 case inv == nil: |
| 1121 logging.Errorf(c, "The invocation doesn't exist") | 1242 logging.Errorf(c, "The invocation doesn't exist") |
| 1122 return ErrNoSuchInvocation | 1243 return ErrNoSuchInvocation |
| 1123 case inv.Status.Final(): | 1244 case inv.Status.Final(): |
| 1124 return nil | 1245 return nil |
| 1125 } | 1246 } |
| 1126 | 1247 |
| 1127 ctl, err := e.controllerForInvocation(c, inv) | 1248 ctl, err := e.controllerForInvocation(c, inv) |
| 1128 if err != nil { | 1249 if err != nil { |
| 1129 logging.Errorf(c, "Cannot get controller - %s", err) | 1250 logging.Errorf(c, "Cannot get controller - %s", err) |
| 1130 return err | 1251 return err |
| 1131 } | 1252 } |
| 1132 | 1253 |
| 1133 » ctl.DebugLog("Invocation is manually aborted by %q", who) | 1254 » ctl.DebugLog("Invocation is manually aborted by %q", auth.CurrentUser(c) ) |
| 1134 if err = ctl.manager.AbortTask(c, ctl); err != nil { | 1255 if err = ctl.manager.AbortTask(c, ctl); err != nil { |
| 1135 logging.Errorf(c, "Failed to abort the task - %s", err) | 1256 logging.Errorf(c, "Failed to abort the task - %s", err) |
| 1136 return err | 1257 return err |
| 1137 } | 1258 } |
| 1138 | 1259 |
| 1139 ctl.State().Status = task.StatusAborted | 1260 ctl.State().Status = task.StatusAborted |
| 1140 if err = ctl.Save(c); err != nil { | 1261 if err = ctl.Save(c); err != nil { |
| 1141 logging.Errorf(c, "Failed to save the invocation - %s", err) | 1262 logging.Errorf(c, "Failed to save the invocation - %s", err) |
| 1142 return err | 1263 return err |
| 1143 } | 1264 } |
| 1144 return nil | 1265 return nil |
| 1145 } | 1266 } |
| 1146 | 1267 |
| 1147 // AbortJob resets the job to scheduled state, aborting a currently pending or | 1268 // AbortJob resets the job to scheduled state, aborting a currently pending or |
| 1148 // running invocation (if any). | 1269 // running invocation (if any). |
| 1149 // | 1270 // |
| 1150 // Returns nil if the job is not currently running. | 1271 // Returns nil if the job is not currently running. |
| 1151 func (e *engineImpl) AbortJob(c context.Context, jobID string, who identity.Iden tity) error { | 1272 func (e *engineImpl) AbortJob(c context.Context, jobID string) error { |
| 1152 » // First we switch the job to the default state and disassociate the run ning | 1273 » // First, we check ACLs. |
| 1274 » if _, err := e.getOwnedJob(c, jobID); err != nil { | |
| 1275 » » return err | |
| 1276 » } | |
| 1277 » // Second, we switch the job to the default state and disassociate the r unning | |
| 1153 // invocation (if any) from the job entity. | 1278 // invocation (if any) from the job entity. |
| 1154 var invID int64 | 1279 var invID int64 |
| 1155 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { | 1280 err := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { |
| 1156 if isNew { | 1281 if isNew { |
| 1157 return errSkipPut // the job was removed, nothing to abo rt | 1282 return errSkipPut // the job was removed, nothing to abo rt |
| 1158 } | 1283 } |
| 1159 invID = job.State.InvocationID | 1284 invID = job.State.InvocationID |
| 1160 return e.rollSM(c, job, func(sm *StateMachine) error { | 1285 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1161 sm.OnManualAbort() | 1286 sm.OnManualAbort() |
| 1162 return nil | 1287 return nil |
| 1163 }) | 1288 }) |
| 1164 }) | 1289 }) |
| 1165 if err != nil { | 1290 if err != nil { |
| 1166 return err | 1291 return err |
| 1167 } | 1292 } |
| 1168 | 1293 |
| 1169 // Now we kill the invocation. We do it separately because it may involv e | 1294 // Now we kill the invocation. We do it separately because it may involv e |
| 1170 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom | 1295 // an RPC to remote service (e.g. to cancel a task) that can't be done f rom |
| 1171 // the transaction. | 1296 // the transaction. |
| 1172 if invID != 0 { | 1297 if invID != 0 { |
| 1173 » » return e.AbortInvocation(c, jobID, invID, who) | 1298 » » return e.abortInvocation(c, jobID, invID) |
| 1174 } | 1299 } |
| 1175 return nil | 1300 return nil |
| 1176 } | 1301 } |
| 1177 | 1302 |
| 1178 // updateJob updates an existing job if its definition has changed, adds | 1303 // updateJob updates an existing job if its definition has changed, adds |
| 1179 // a completely new job or enables a previously disabled job. | 1304 // a completely new job or enables a previously disabled job. |
| 1180 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error { | 1305 func (e *engineImpl) updateJob(c context.Context, def catalog.Definition) error { |
| 1181 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error { | 1306 return e.txn(c, def.JobID, func(c context.Context, job *Job, isNew bool) error { |
| 1182 if !isNew && job.Enabled && job.matches(def) { | 1307 if !isNew && job.Enabled && job.matches(def) { |
| 1183 return errSkipPut | 1308 return errSkipPut |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1310 } | 1435 } |
| 1311 | 1436 |
| 1312 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. | 1437 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. |
| 1313 // | 1438 // |
| 1314 // See also handlePubSubMessage, it is quite similar. | 1439 // See also handlePubSubMessage, it is quite similar. |
| 1315 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error { | 1440 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error { |
| 1316 c = logging.SetField(c, "JobID", jobID) | 1441 c = logging.SetField(c, "JobID", jobID) |
| 1317 c = logging.SetField(c, "InvID", invID) | 1442 c = logging.SetField(c, "InvID", invID) |
| 1318 | 1443 |
| 1319 logging.Infof(c, "Handling invocation timer %q", timer.Name) | 1444 logging.Infof(c, "Handling invocation timer %q", timer.Name) |
| 1320 » inv, err := e.GetInvocation(c, jobID, invID) | 1445 » inv, err := e.getInvocation(c, jobID, invID) |
| 1321 if err != nil { | 1446 if err != nil { |
| 1322 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1447 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1323 return err | 1448 return err |
| 1324 } | 1449 } |
| 1325 if inv == nil { | 1450 if inv == nil { |
| 1326 return ErrNoSuchInvocation | 1451 return ErrNoSuchInvocation |
| 1327 } | 1452 } |
| 1328 | 1453 |
| 1329 // Finished invocations are immutable, skip the message. | 1454 // Finished invocations are immutable, skip the message. |
| 1330 if inv.Status.Final() { | 1455 if inv.Status.Final() { |
| (...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1695 return err | 1820 return err |
| 1696 } | 1821 } |
| 1697 jobID = data["job"] | 1822 jobID = data["job"] |
| 1698 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { | 1823 if invID, err = strconv.ParseInt(data["inv"], 10, 64); err != nil { |
| 1699 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err) | 1824 logging.Errorf(c, "Could not parse 'inv' %q - %s", data["inv"], err) |
| 1700 return err | 1825 return err |
| 1701 } | 1826 } |
| 1702 | 1827 |
| 1703 c = logging.SetField(c, "JobID", jobID) | 1828 c = logging.SetField(c, "JobID", jobID) |
| 1704 c = logging.SetField(c, "InvID", invID) | 1829 c = logging.SetField(c, "InvID", invID) |
| 1705 » inv, err := e.GetInvocation(c, jobID, invID) | 1830 » inv, err := e.getInvocation(c, jobID, invID) |
| 1706 if err != nil { | 1831 if err != nil { |
| 1707 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1832 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1708 return err | 1833 return err |
| 1709 } | 1834 } |
| 1710 if inv == nil { | 1835 if inv == nil { |
| 1711 return ErrNoSuchInvocation | 1836 return ErrNoSuchInvocation |
| 1712 } | 1837 } |
| 1713 | 1838 |
| 1714 // Finished invocations are immutable, skip the message. | 1839 // Finished invocations are immutable, skip the message. |
| 1715 if inv.Status.Final() { | 1840 if inv.Status.Final() { |
| (...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1952 } | 2077 } |
| 1953 if hasFinished { | 2078 if hasFinished { |
| 1954 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { | 2079 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { |
| 1955 sm.OnInvocationDone(saving.ID) | 2080 sm.OnInvocationDone(saving.ID) |
| 1956 return nil | 2081 return nil |
| 1957 }) | 2082 }) |
| 1958 } | 2083 } |
| 1959 return nil | 2084 return nil |
| 1960 }) | 2085 }) |
| 1961 } | 2086 } |
| OLD | NEW |