Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1025)

Side by Side Diff: scheduler/appengine/engine/engine.go

Issue 2946343004: scheduler: add rpcs for actions on job and invocation. (Closed)
Patch Set: review Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package engine implements the core logic of the scheduler service. 5 // Package engine implements the core logic of the scheduler service.
6 package engine 6 package engine
7 7
8 import ( 8 import (
9 "bytes" 9 "bytes"
10 "encoding/json" 10 "encoding/json"
(...skipping 25 matching lines...) Expand all
36 "github.com/luci/luci-go/server/auth" 36 "github.com/luci/luci-go/server/auth"
37 "github.com/luci/luci-go/server/auth/identity" 37 "github.com/luci/luci-go/server/auth/identity"
38 "github.com/luci/luci-go/server/auth/signing" 38 "github.com/luci/luci-go/server/auth/signing"
39 "github.com/luci/luci-go/server/tokens" 39 "github.com/luci/luci-go/server/tokens"
40 40
41 "github.com/luci/luci-go/scheduler/appengine/catalog" 41 "github.com/luci/luci-go/scheduler/appengine/catalog"
42 "github.com/luci/luci-go/scheduler/appengine/schedule" 42 "github.com/luci/luci-go/scheduler/appengine/schedule"
43 "github.com/luci/luci-go/scheduler/appengine/task" 43 "github.com/luci/luci-go/scheduler/appengine/task"
44 ) 44 )
45 45
46 var (
47 ErrNoSuchJob = errors.New("no such job")
48 ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
49 )
50
46 // Engine manages all scheduler jobs: keeps track of their state, runs state 51 // Engine manages all scheduler jobs: keeps track of their state, runs state
47 // machine transactions, starts new invocations, etc. A method returns 52 // machine transactions, starts new invocations, etc. A method returns
48 // errors.Transient if the error is non-fatal and the call should be retried 53 // errors.Transient if the error is non-fatal and the call should be retried
49 // later. Any other error means that retry won't help. 54 // later. Any other error means that retry won't help.
50 type Engine interface { 55 type Engine interface {
51 // GetAllProjects returns a list of all projects that have at least one 56 // GetAllProjects returns a list of all projects that have at least one
52 // enabled scheduler job. 57 // enabled scheduler job.
53 GetAllProjects(c context.Context) ([]string, error) 58 GetAllProjects(c context.Context) ([]string, error)
54 59
55 // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar 60 // GetAllJobs returns a list of all enabled scheduler jobs in no particu lar
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
111 // start an invocation). Normally one nonce corresponds to one Invocatio n 116 // start an invocation). Normally one nonce corresponds to one Invocatio n
112 // entity, but there can be more if job fails to start with a transient error. 117 // entity, but there can be more if job fails to start with a transient error.
113 TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error) 118 TriggerInvocation(c context.Context, jobID string, triggeredBy identity. Identity) (int64, error)
114 119
115 // PauseJob replaces job's schedule with "triggered", effectively preven ting 120 // PauseJob replaces job's schedule with "triggered", effectively preven ting
116 // it from running automatically (until it is resumed). Manual invocatio ns are 121 // it from running automatically (until it is resumed). Manual invocatio ns are
117 // still allowed. Does nothing if job is already paused. Any pending or 122 // still allowed. Does nothing if job is already paused. Any pending or
118 // running invocations are still executed. 123 // running invocations are still executed.
119 PauseJob(c context.Context, jobID string, who identity.Identity) error 124 PauseJob(c context.Context, jobID string, who identity.Identity) error
120 125
121 » // ResumeJob resumed paused job. Doesn't nothing if the job is not pause d. 126 » // ResumeJob resumes paused job. Doesn't nothing if the job is not pause d.
122 ResumeJob(c context.Context, jobID string, who identity.Identity) error 127 ResumeJob(c context.Context, jobID string, who identity.Identity) error
123 128
124 // AbortInvocation forcefully moves the invocation to failed state. 129 // AbortInvocation forcefully moves the invocation to failed state.
125 // 130 //
126 // It opportunistically tries to send "abort" signal to a job runner if it 131 // It opportunistically tries to send "abort" signal to a job runner if it
127 // supports cancellation, but it doesn't wait for reply. It proceeds to 132 // supports cancellation, but it doesn't wait for reply. It proceeds to
128 // modifying local state in the scheduler service datastore immediately. 133 // modifying local state in the scheduler service datastore immediately.
129 // 134 //
130 // AbortInvocation can be used to manually "unstuck" jobs that got stuck due 135 // AbortInvocation can be used to manually "unstuck" jobs that got stuck due
131 // to missing PubSub notifications or other kinds of unexpected conditio ns. 136 // to missing PubSub notifications or other kinds of unexpected conditio ns.
(...skipping 902 matching lines...) Expand 10 before | Expand all | Expand 10 after
1034 default: 1039 default:
1035 return fmt.Errorf("unexpected invocation action kind %q", payloa d) 1040 return fmt.Errorf("unexpected invocation action kind %q", payloa d)
1036 } 1041 }
1037 } 1042 }
1038 1043
1039 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere dBy identity.Identity) (int64, error) { 1044 func (e *engineImpl) TriggerInvocation(c context.Context, jobID string, triggere dBy identity.Identity) (int64, error) {
1040 var err error 1045 var err error
1041 var invNonce int64 1046 var invNonce int64
1042 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror { 1047 err2 := e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) er ror {
1043 if isNew { 1048 if isNew {
1044 » » » err = errors.New("no such job") 1049 » » » err = ErrNoSuchJob
1045 return errSkipPut 1050 return errSkipPut
1046 } 1051 }
1047 if !job.Enabled { 1052 if !job.Enabled {
1048 err = errors.New("the job is disabled") 1053 err = errors.New("the job is disabled")
1049 return errSkipPut 1054 return errSkipPut
1050 } 1055 }
1051 invNonce = 0 1056 invNonce = 0
1052 return e.rollSM(c, job, func(sm *StateMachine) error { 1057 return e.rollSM(c, job, func(sm *StateMachine) error {
1053 if err := sm.OnManualInvocation(triggeredBy); err != nil { 1058 if err := sm.OnManualInvocation(triggeredBy); err != nil {
1054 return err 1059 return err
(...skipping 12 matching lines...) Expand all
1067 return e.setPausedFlag(c, jobID, true, who) 1072 return e.setPausedFlag(c, jobID, true, who)
1068 } 1073 }
1069 1074
1070 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide ntity) error { 1075 func (e *engineImpl) ResumeJob(c context.Context, jobID string, who identity.Ide ntity) error {
1071 return e.setPausedFlag(c, jobID, false, who) 1076 return e.setPausedFlag(c, jobID, false, who)
1072 } 1077 }
1073 1078
1074 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error { 1079 func (e *engineImpl) setPausedFlag(c context.Context, jobID string, paused bool, who identity.Identity) error {
1075 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or { 1080 return e.txn(c, jobID, func(c context.Context, job *Job, isNew bool) err or {
1076 if isNew || !job.Enabled { 1081 if isNew || !job.Enabled {
1077 » » » return errors.New("no such job") 1082 » » » return ErrNoSuchJob
1078 } 1083 }
1079 if job.Paused == paused { 1084 if job.Paused == paused {
1080 return errSkipPut 1085 return errSkipPut
1081 } 1086 }
1082 if paused { 1087 if paused {
1083 logging.Warningf(c, "Job is paused by %s", who) 1088 logging.Warningf(c, "Job is paused by %s", who)
1084 } else { 1089 } else {
1085 logging.Warningf(c, "Job is resumed by %s", who) 1090 logging.Warningf(c, "Job is resumed by %s", who)
1086 } 1091 }
1087 job.Paused = paused 1092 job.Paused = paused
1088 return e.rollSM(c, job, func(sm *StateMachine) error { 1093 return e.rollSM(c, job, func(sm *StateMachine) error {
1089 sm.OnScheduleChange() 1094 sm.OnScheduleChange()
1090 return nil 1095 return nil
1091 }) 1096 })
1092 }) 1097 })
1093 } 1098 }
1094 1099
1095 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4, who identity.Identity) error { 1100 func (e *engineImpl) AbortInvocation(c context.Context, jobID string, invID int6 4, who identity.Identity) error {
1096 c = logging.SetField(c, "JobID", jobID) 1101 c = logging.SetField(c, "JobID", jobID)
1097 c = logging.SetField(c, "InvID", invID) 1102 c = logging.SetField(c, "InvID", invID)
1098 1103
1099 var inv *Invocation 1104 var inv *Invocation
1100 var err error 1105 var err error
1101 switch inv, err = e.GetInvocation(c, jobID, invID); { 1106 switch inv, err = e.GetInvocation(c, jobID, invID); {
1102 case err != nil: 1107 case err != nil:
1103 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1108 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1104 return err 1109 return err
1105 case inv == nil: 1110 case inv == nil:
1106 logging.Errorf(c, "The invocation doesn't exist") 1111 logging.Errorf(c, "The invocation doesn't exist")
1107 » » return errors.New("the invocation doesn't exist") 1112 » » return ErrNoSuchInvocation
1108 case inv.Status.Final(): 1113 case inv.Status.Final():
1109 return nil 1114 return nil
1110 } 1115 }
1111 1116
1112 ctl, err := e.controllerForInvocation(c, inv) 1117 ctl, err := e.controllerForInvocation(c, inv)
1113 if err != nil { 1118 if err != nil {
1114 logging.Errorf(c, "Cannot get controller - %s", err) 1119 logging.Errorf(c, "Cannot get controller - %s", err)
1115 return err 1120 return err
1116 } 1121 }
1117 1122
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
1301 c = logging.SetField(c, "JobID", jobID) 1306 c = logging.SetField(c, "JobID", jobID)
1302 c = logging.SetField(c, "InvID", invID) 1307 c = logging.SetField(c, "InvID", invID)
1303 1308
1304 logging.Infof(c, "Handling invocation timer %q", timer.Name) 1309 logging.Infof(c, "Handling invocation timer %q", timer.Name)
1305 inv, err := e.GetInvocation(c, jobID, invID) 1310 inv, err := e.GetInvocation(c, jobID, invID)
1306 if err != nil { 1311 if err != nil {
1307 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1312 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1308 return err 1313 return err
1309 } 1314 }
1310 if inv == nil { 1315 if inv == nil {
1311 » » return errors.New("the invocation doesn't exist") 1316 » » return ErrNoSuchInvocation
1312 } 1317 }
1313 1318
1314 // Finished invocations are immutable, skip the message. 1319 // Finished invocations are immutable, skip the message.
1315 if inv.Status.Final() { 1320 if inv.Status.Final() {
1316 logging.Infof(c, "Skipping the timer, the invocation is in final state %q", inv.Status) 1321 logging.Infof(c, "Skipping the timer, the invocation is in final state %q", inv.Status)
1317 return nil 1322 return nil
1318 } 1323 }
1319 1324
1320 // Build corresponding controller. 1325 // Build corresponding controller.
1321 ctl, err := e.controllerForInvocation(c, inv) 1326 ctl, err := e.controllerForInvocation(c, inv)
(...skipping 364 matching lines...) Expand 10 before | Expand all | Expand 10 after
1686 } 1691 }
1687 1692
1688 c = logging.SetField(c, "JobID", jobID) 1693 c = logging.SetField(c, "JobID", jobID)
1689 c = logging.SetField(c, "InvID", invID) 1694 c = logging.SetField(c, "InvID", invID)
1690 inv, err := e.GetInvocation(c, jobID, invID) 1695 inv, err := e.GetInvocation(c, jobID, invID)
1691 if err != nil { 1696 if err != nil {
1692 logging.Errorf(c, "Failed to fetch the invocation - %s", err) 1697 logging.Errorf(c, "Failed to fetch the invocation - %s", err)
1693 return err 1698 return err
1694 } 1699 }
1695 if inv == nil { 1700 if inv == nil {
1696 » » return errors.New("the invocation doesn't exist") 1701 » » return ErrNoSuchInvocation
1697 } 1702 }
1698 1703
1699 // Finished invocations are immutable, skip the message. 1704 // Finished invocations are immutable, skip the message.
1700 if inv.Status.Final() { 1705 if inv.Status.Final() {
1701 logging.Infof(c, "Skipping the notification, the invocation is i n final state %q", inv.Status) 1706 logging.Infof(c, "Skipping the notification, the invocation is i n final state %q", inv.Status)
1702 return nil 1707 return nil
1703 } 1708 }
1704 1709
1705 // Build corresponding controller. 1710 // Build corresponding controller.
1706 ctl, err := e.controllerForInvocation(c, inv) 1711 ctl, err := e.controllerForInvocation(c, inv)
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after
1937 } 1942 }
1938 if hasFinished { 1943 if hasFinished {
1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { 1944 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or {
1940 sm.OnInvocationDone(saving.ID) 1945 sm.OnInvocationDone(saving.ID)
1941 return nil 1946 return nil
1942 }) 1947 })
1943 } 1948 }
1944 return nil 1949 return nil
1945 }) 1950 })
1946 } 1951 }
OLDNEW
« no previous file with comments | « scheduler/appengine/apiservers/scheduler_test.go ('k') | scheduler/appengine/presentation/acl.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698