| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package engine implements the core logic of the scheduler service. | 5 // Package engine implements the core logic of the scheduler service. |
| 6 package engine | 6 package engine |
| 7 | 7 |
| 8 import ( | 8 import ( |
| 9 "bytes" | 9 "bytes" |
| 10 "encoding/json" | 10 "encoding/json" |
| (...skipping 25 matching lines...) Expand all Loading... |
| 36 "github.com/luci/luci-go/server/auth" | 36 "github.com/luci/luci-go/server/auth" |
| 37 "github.com/luci/luci-go/server/auth/identity" | 37 "github.com/luci/luci-go/server/auth/identity" |
| 38 "github.com/luci/luci-go/server/auth/signing" | 38 "github.com/luci/luci-go/server/auth/signing" |
| 39 "github.com/luci/luci-go/server/tokens" | 39 "github.com/luci/luci-go/server/tokens" |
| 40 | 40 |
| 41 "github.com/luci/luci-go/scheduler/appengine/catalog" | 41 "github.com/luci/luci-go/scheduler/appengine/catalog" |
| 42 "github.com/luci/luci-go/scheduler/appengine/schedule" | 42 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 43 "github.com/luci/luci-go/scheduler/appengine/task" | 43 "github.com/luci/luci-go/scheduler/appengine/task" |
| 44 ) | 44 ) |
| 45 | 45 |
| 46 var ( |
| 47 ErrNoSuchJob = errors.New("no such job") |
| 48 ErrNoSuchInvocation = errors.New("the invocation doesn't exist") |
| 49 ) |
| 50 |
| 46 // Engine manages all scheduler jobs: keeps track of their state, runs state | 51 // Engine manages all scheduler jobs: keeps track of their state, runs state |
| 47 // machine transactions, starts new invocations, etc. A method returns | 52 // machine transactions, starts new invocations, etc. A method returns |
| 48 // errors.Transient if the error is non-fatal and the call should be retried | 53 // errors.Transient if the error is non-fatal and the call should be retried |
| 49 // later. Any other error means that retry won't help. | 54 // later. Any other error means that retry won't help. |
| 50 type Engine interface { | 55 type Engine interface { |
| 51 // GetAllProjects returns a list of all projects that have at least one | 56 // GetAllProjects returns a list of all projects that have at least one |
| 52 // enabled scheduler job. | 57 // enabled scheduler job. |
| 53 GetAllProjects(c context.Context) ([]string, error) | 58 GetAllProjects(c context.Context) ([]string, error) |
| 54 | 59 |
| 55 // GetAllJobs returns a list of all enabled scheduler jobs in no particu
lar | 60 // GetAllJobs returns a list of all enabled scheduler jobs in no particu
lar |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 111 // start an invocation). Normally one nonce corresponds to one Invocatio
n | 116 // start an invocation). Normally one nonce corresponds to one Invocatio
n |
| 112 // entity, but there can be more if job fails to start with a transient
error. | 117 // entity, but there can be more if job fails to start with a transient
error. |
| 113 TriggerInvocation(c context.Context, jobID string, triggeredBy identity.
Identity) (int64, error) | 118 TriggerInvocation(c context.Context, jobID string, triggeredBy identity.
Identity) (int64, error) |
| 114 | 119 |
| 115 // PauseJob replaces job's schedule with "triggered", effectively preven
ting | 120 // PauseJob replaces job's schedule with "triggered", effectively preven
ting |
| 116 // it from running automatically (until it is resumed). Manual invocatio
ns are | 121 // it from running automatically (until it is resumed). Manual invocatio
ns are |
| 117 // still allowed. Does nothing if job is already paused. Any pending or | 122 // still allowed. Does nothing if job is already paused. Any pending or |
| 118 // running invocations are still executed. | 123 // running invocations are still executed. |
| 119 PauseJob(c context.Context, jobID string, who identity.Identity) error | 124 PauseJob(c context.Context, jobID string, who identity.Identity) error |
| 120 | 125 |
| 121 » // ResumeJob resumed paused job. Doesn't nothing if the job is not pause
d. | 126 » // ResumeJob resumes a paused job. Does nothing if the job is not pause
d. |
| 122 ResumeJob(c context.Context, jobID string, who identity.Identity) error | 127 ResumeJob(c context.Context, jobID string, who identity.Identity) error |
| 123 | 128 |
| 124 // AbortInvocation forcefully moves the invocation to failed state. | 129 // AbortInvocation forcefully moves the invocation to failed state. |
| 125 // | 130 // |
| 126 // It opportunistically tries to send "abort" signal to a job runner if
it | 131 // It opportunistically tries to send "abort" signal to a job runner if
it |
| 127 // supports cancellation, but it doesn't wait for reply. It proceeds to | 132 // supports cancellation, but it doesn't wait for reply. It proceeds to |
| 128 // modifying local state in the scheduler service datastore immediately. | 133 // modifying local state in the scheduler service datastore immediately. |
| 129 // | 134 // |
| 130 // AbortInvocation can be used to manually "unstuck" jobs that got stuck
due | 135 // AbortInvocation can be used to manually "unstuck" jobs that got stuck
due |
| 131 // to missing PubSub notifications or other kinds of unexpected conditio
ns. | 136 // to missing PubSub notifications or other kinds of unexpected conditio
ns. |
| (...skipping 902 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1034 default: | 1039 default: |
| 1035 return fmt.Errorf("unexpected invocation action kind %q", payloa
d) | 1040 return fmt.Errorf("unexpected invocation action kind %q", payloa
d) |
| 1036 } | 1041 } |
| 1037 } | 1042 } |
| 1038 | 1043 |
| 1039 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
dBy identity.Identity) (int64, error) { | 1044 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere
dBy identity.Identity) (int64, error) { |
| 1040 var err error | 1045 var err error |
| 1041 var invNonce int64 | 1046 var invNonce int64 |
| 1042 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er
ror { | 1047 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er
ror { |
| 1043 if isNew { | 1048 if isNew { |
| 1044 » » » err = errors.New("no such job") | 1049 » » » err = ErrNoSuchJob |
| 1045 return errSkipPut | 1050 return errSkipPut |
| 1046 } | 1051 } |
| 1047 if !job.Enabled { | 1052 if !job.Enabled { |
| 1048 err = errors.New("the job is disabled") | 1053 err = errors.New("the job is disabled") |
| 1049 return errSkipPut | 1054 return errSkipPut |
| 1050 } | 1055 } |
| 1051 invNonce = 0 | 1056 invNonce = 0 |
| 1052 return e.rollSM(c, job, func(sm *StateMachine) error { | 1057 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1053 if err := sm.OnManualInvocation(triggeredBy); err != nil
{ | 1058 if err := sm.OnManualInvocation(triggeredBy); err != nil
{ |
| 1054 return err | 1059 return err |
| (...skipping 12 matching lines...) Expand all Loading... |
| 1067 return e.setPausedFlag(c, jobID, true, who) | 1072 return e.setPausedFlag(c, jobID, true, who) |
| 1068 } | 1073 } |
| 1069 | 1074 |
| 1070 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide
ntity) error { | 1075 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide
ntity) error { |
| 1071 return e.setPausedFlag(c, jobID, false, who) | 1076 return e.setPausedFlag(c, jobID, false, who) |
| 1072 } | 1077 } |
| 1073 | 1078 |
| 1074 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
who identity.Identity) error { | 1079 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool,
who identity.Identity) error { |
| 1075 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { | 1080 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err
or { |
| 1076 if isNew || !job.Enabled { | 1081 if isNew || !job.Enabled { |
| 1077 » » » return errors.New("no such job") | 1082 » » » return ErrNoSuchJob |
| 1078 } | 1083 } |
| 1079 if job.Paused == paused { | 1084 if job.Paused == paused { |
| 1080 return errSkipPut | 1085 return errSkipPut |
| 1081 } | 1086 } |
| 1082 if paused { | 1087 if paused { |
| 1083 logging.Warningf(c, "Job is paused by %s", who) | 1088 logging.Warningf(c, "Job is paused by %s", who) |
| 1084 } else { | 1089 } else { |
| 1085 logging.Warningf(c, "Job is resumed by %s", who) | 1090 logging.Warningf(c, "Job is resumed by %s", who) |
| 1086 } | 1091 } |
| 1087 job.Paused = paused | 1092 job.Paused = paused |
| 1088 return e.rollSM(c, job, func(sm *StateMachine) error { | 1093 return e.rollSM(c, job, func(sm *StateMachine) error { |
| 1089 sm.OnScheduleChange() | 1094 sm.OnScheduleChange() |
| 1090 return nil | 1095 return nil |
| 1091 }) | 1096 }) |
| 1092 }) | 1097 }) |
| 1093 } | 1098 } |
| 1094 | 1099 |
| 1095 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
4, who identity.Identity) error { | 1100 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6
4, who identity.Identity) error { |
| 1096 c = logging.SetField(c, "JobID", jobID) | 1101 c = logging.SetField(c, "JobID", jobID) |
| 1097 c = logging.SetField(c, "InvID", invID) | 1102 c = logging.SetField(c, "InvID", invID) |
| 1098 | 1103 |
| 1099 var inv *Invocation | 1104 var inv *Invocation |
| 1100 var err error | 1105 var err error |
| 1101 switch inv, err = e.GetInvocation(c, jobID, invID); { | 1106 switch inv, err = e.GetInvocation(c, jobID, invID); { |
| 1102 case err != nil: | 1107 case err != nil: |
| 1103 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1108 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1104 return err | 1109 return err |
| 1105 case inv == nil: | 1110 case inv == nil: |
| 1106 logging.Errorf(c, "The invocation doesn't exist") | 1111 logging.Errorf(c, "The invocation doesn't exist") |
| 1107 » » return errors.New("the invocation doesn't exist") | 1112 » » return ErrNoSuchInvocation |
| 1108 case inv.Status.Final(): | 1113 case inv.Status.Final(): |
| 1109 return nil | 1114 return nil |
| 1110 } | 1115 } |
| 1111 | 1116 |
| 1112 ctl, err := e.controllerForInvocation(c, inv) | 1117 ctl, err := e.controllerForInvocation(c, inv) |
| 1113 if err != nil { | 1118 if err != nil { |
| 1114 logging.Errorf(c, "Cannot get controller - %s", err) | 1119 logging.Errorf(c, "Cannot get controller - %s", err) |
| 1115 return err | 1120 return err |
| 1116 } | 1121 } |
| 1117 | 1122 |
| (...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1301 c = logging.SetField(c, "JobID", jobID) | 1306 c = logging.SetField(c, "JobID", jobID) |
| 1302 c = logging.SetField(c, "InvID", invID) | 1307 c = logging.SetField(c, "InvID", invID) |
| 1303 | 1308 |
| 1304 logging.Infof(c, "Handling invocation timer %q", timer.Name) | 1309 logging.Infof(c, "Handling invocation timer %q", timer.Name) |
| 1305 inv, err := e.GetInvocation(c, jobID, invID) | 1310 inv, err := e.GetInvocation(c, jobID, invID) |
| 1306 if err != nil { | 1311 if err != nil { |
| 1307 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1312 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1308 return err | 1313 return err |
| 1309 } | 1314 } |
| 1310 if inv == nil { | 1315 if inv == nil { |
| 1311 » » return errors.New("the invocation doesn't exist") | 1316 » » return ErrNoSuchInvocation |
| 1312 } | 1317 } |
| 1313 | 1318 |
| 1314 // Finished invocations are immutable, skip the message. | 1319 // Finished invocations are immutable, skip the message. |
| 1315 if inv.Status.Final() { | 1320 if inv.Status.Final() { |
| 1316 logging.Infof(c, "Skipping the timer, the invocation is in final
state %q", inv.Status) | 1321 logging.Infof(c, "Skipping the timer, the invocation is in final
state %q", inv.Status) |
| 1317 return nil | 1322 return nil |
| 1318 } | 1323 } |
| 1319 | 1324 |
| 1320 // Build corresponding controller. | 1325 // Build corresponding controller. |
| 1321 ctl, err := e.controllerForInvocation(c, inv) | 1326 ctl, err := e.controllerForInvocation(c, inv) |
| (...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1686 } | 1691 } |
| 1687 | 1692 |
| 1688 c = logging.SetField(c, "JobID", jobID) | 1693 c = logging.SetField(c, "JobID", jobID) |
| 1689 c = logging.SetField(c, "InvID", invID) | 1694 c = logging.SetField(c, "InvID", invID) |
| 1690 inv, err := e.GetInvocation(c, jobID, invID) | 1695 inv, err := e.GetInvocation(c, jobID, invID) |
| 1691 if err != nil { | 1696 if err != nil { |
| 1692 logging.Errorf(c, "Failed to fetch the invocation - %s", err) | 1697 logging.Errorf(c, "Failed to fetch the invocation - %s", err) |
| 1693 return err | 1698 return err |
| 1694 } | 1699 } |
| 1695 if inv == nil { | 1700 if inv == nil { |
| 1696 » » return errors.New("the invocation doesn't exist") | 1701 » » return ErrNoSuchInvocation |
| 1697 } | 1702 } |
| 1698 | 1703 |
| 1699 // Finished invocations are immutable, skip the message. | 1704 // Finished invocations are immutable, skip the message. |
| 1700 if inv.Status.Final() { | 1705 if inv.Status.Final() { |
| 1701 logging.Infof(c, "Skipping the notification, the invocation is i
n final state %q", inv.Status) | 1706 logging.Infof(c, "Skipping the notification, the invocation is i
n final state %q", inv.Status) |
| 1702 return nil | 1707 return nil |
| 1703 } | 1708 } |
| 1704 | 1709 |
| 1705 // Build corresponding controller. | 1710 // Build corresponding controller. |
| 1706 ctl, err := e.controllerForInvocation(c, inv) | 1711 ctl, err := e.controllerForInvocation(c, inv) |
| (...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1937 } | 1942 } |
| 1938 if hasFinished { | 1943 if hasFinished { |
| 1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { | 1944 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { |
| 1940 sm.OnInvocationDone(saving.ID) | 1945 sm.OnInvocationDone(saving.ID) |
| 1941 return nil | 1946 return nil |
| 1942 }) | 1947 }) |
| 1943 } | 1948 } |
| 1944 return nil | 1949 return nil |
| 1945 }) | 1950 }) |
| 1946 } | 1951 } |
| OLD | NEW |