Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
| 16 package demo | 16 package demo |
| 17 | 17 |
| 18 import ( | 18 import ( |
| 19 "fmt" | |
| 19 "net/http" | 20 "net/http" |
| 21 "strconv" | |
| 22 "sync" | |
| 20 "time" | 23 "time" |
| 21 | 24 |
| 22 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
| 23 | 26 |
| 24 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
| 25 "github.com/luci/gae/service/datastore" | 28 "github.com/luci/gae/service/datastore" |
| 29 "github.com/luci/gae/service/info" | |
| 30 "github.com/luci/gae/service/memcache" | |
| 26 | 31 |
| 27 "github.com/luci/luci-go/appengine/gaemiddleware" | 32 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 33 "github.com/luci/luci-go/appengine/memlock" | |
| 28 "github.com/luci/luci-go/common/clock" | 34 "github.com/luci/luci-go/common/clock" |
| 29 "github.com/luci/luci-go/common/data/rand/mathrand" | 35 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 30 "github.com/luci/luci-go/common/logging" | 36 "github.com/luci/luci-go/common/logging" |
| 37 "github.com/luci/luci-go/common/retry/transient" | |
| 31 "github.com/luci/luci-go/server/router" | 38 "github.com/luci/luci-go/server/router" |
| 32 | 39 |
| 33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 40 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| 41 "github.com/luci/luci-go/scheduler/appengine/engine/dsset" | |
| 34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" | 42 "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
| 35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" | 43 "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
| 36 "github.com/luci/luci-go/scheduler/appengine/schedule" | 44 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 37 ) | 45 ) |
| 38 | 46 |
| 39 var tasks = tq.Dispatcher{} | 47 var dispatcher = tq.Dispatcher{} |
| 40 | 48 |
| 41 type CronState struct { | 49 type CronState struct { |
| 42 _extra datastore.PropertyMap `gae:"-,extra"` | 50 _extra datastore.PropertyMap `gae:"-,extra"` |
| 43 | 51 |
| 44 ID string `gae:"$id"` | 52 ID string `gae:"$id"` |
| 45 State cron.State `gae:",noindex"` | 53 State cron.State `gae:",noindex"` |
| 54 | |
| 55 NextInvID int64 `gae:",noindex"` | |
| 56 RunningSet []int64 `gae:",noindex"` | |
| 46 } | 57 } |
| 47 | 58 |
| 48 func (s *CronState) schedule() *schedule.Schedule { | 59 func (s *CronState) schedule() *schedule.Schedule { |
| 49 parsed, err := schedule.Parse(s.ID, 0) | 60 parsed, err := schedule.Parse(s.ID, 0) |
| 50 if err != nil { | 61 if err != nil { |
| 51 panic(err) | 62 panic(err) |
| 52 } | 63 } |
| 53 return parsed | 64 return parsed |
| 54 } | 65 } |
| 55 | 66 |
| 56 // evolve instantiates cron.Machine, calls the callback and submits emitted | 67 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
| 57 // actions. | 68 » return &dsset.Set{ |
| 69 » » ID: "triggers:" + jobID, | |
| 70 » » ShardCount: 8, | |
| 71 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
| 72 » » TombstonesDelay: 15 * time.Minute, | |
| 73 » } | |
| 74 } | |
| 75 | |
| 76 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { | |
| 77 » return &dsset.Set{ | |
| 78 » » ID: "finished:" + jobID, | |
| 79 » » ShardCount: 8, | |
| 80 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
| 81 » » TombstonesDelay: 15 * time.Minute, | |
| 82 » } | |
| 83 } | |
| 84 | |
| 85 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { | |
| 86 » machine := &cron.Machine{ | |
| 87 » » Now: clock.Now(c), | |
| 88 » » Schedule: entity.schedule(), | |
| 89 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
| 90 » » State: entity.State, | |
| 91 » } | |
| 92 | |
| 93 » if err := cb(c, machine); err != nil { | |
| 94 » » return err | |
| 95 » } | |
| 96 | |
| 97 » tasks := []*tq.Task{} | |
| 98 » for _, action := range machine.Actions { | |
| 99 » » switch a := action.(type) { | |
| 100 » » case cron.TickLaterAction: | |
| 101 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNo nce, a.When.Sub(time.Now())) | |
| 102 » » » tasks = append(tasks, &tq.Task{ | |
| 103 » » » » Payload: &internal.TickLaterTask{JobId: entity.I D, TickNonce: a.TickNonce}, | |
| 104 » » » » ETA: a.When, | |
| 105 » » » }) | |
| 106 » » case cron.StartInvocationAction: | |
| 107 » » » tasks = append(tasks, &tq.Task{ | |
| 108 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, | |
| 109 » » » }) | |
| 110 » » default: | |
| 111 » » » panic("unknown action type") | |
| 112 » » } | |
| 113 » } | |
| 114 » if err := dispatcher.AddTasks(c, tasks); err != nil { | |
| 115 » » return err | |
| 116 » } | |
| 117 | |
| 118 » entity.State = machine.State | |
| 119 » return nil | |
| 120 } | |
| 121 | |
| 58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { | 122 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { |
| 59 err := datastore.RunInTransaction(c, func(c context.Context) error { | 123 err := datastore.RunInTransaction(c, func(c context.Context) error { |
| 60 » » entity := CronState{ID: id} | 124 » » entity := &CronState{ID: id} |
|
tandrii(chromium)
2017/07/31 18:58:46
btw, why this change? I think entity object should
| |
| 61 » » if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { | 125 » » if err := datastore.Get(c, entity); err != nil && err != datasto re.ErrNoSuchEntity { |
| 62 return err | 126 return err |
| 63 } | 127 } |
| 64 | 128 » » if err := pokeMachine(c, entity, cb); err != nil { |
| 65 » » machine := &cron.Machine{ | |
| 66 » » » Now: clock.Now(c), | |
| 67 » » » Schedule: entity.schedule(), | |
| 68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
| 69 » » » State: entity.State, | |
| 70 » » } | |
| 71 | |
| 72 » » if err := cb(c, machine); err != nil { | |
| 73 return err | 129 return err |
| 74 } | 130 } |
| 75 | 131 » » return datastore.Put(c, entity) |
| 76 » » for _, action := range machine.Actions { | |
| 77 » » » var task tq.Task | |
| 78 » » » switch a := action.(type) { | |
| 79 » » » case cron.TickLaterAction: | |
| 80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | |
| 81 » » » » task = tq.Task{ | |
| 82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, | |
| 83 » » » » » ETA: a.When, | |
| 84 » » » » } | |
| 85 » » » case cron.StartInvocationAction: | |
| 86 » » » » task = tq.Task{ | |
| 87 » » » » » Payload: &internal.StartInvocationTask{J obId: id}, | |
| 88 » » » » » Delay: time.Second, // give the transa ction time to land | |
| 89 » » » » } | |
| 90 » » » default: | |
| 91 » » » » panic("unknown action type") | |
| 92 » » » } | |
| 93 » » » if err := tasks.AddTask(c, &task); err != nil { | |
| 94 » » » » return err | |
| 95 » » » } | |
| 96 » » } | |
| 97 | |
| 98 » » entity.State = machine.State | |
| 99 » » return datastore.Put(c, &entity) | |
| 100 }, nil) | 132 }, nil) |
| 101 | 133 » return transient.Tag.Apply(err) |
| 102 » if err != nil { | |
| 103 » » logging.Errorf(c, "FAIL - %s", err) | |
| 104 » } | |
| 105 » return err | |
| 106 } | 134 } |
| 107 | 135 |
| 108 func startJob(c context.Context, id string) error { | 136 func startJob(c context.Context, id string) error { |
| 109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 137 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| 110 // Forcefully restart the chain of tasks. | 138 // Forcefully restart the chain of tasks. |
| 111 m.Disable() | 139 m.Disable() |
| 112 m.Enable() | 140 m.Enable() |
| 113 return nil | 141 return nil |
| 114 }) | 142 }) |
| 115 } | 143 } |
| 116 | 144 |
| 145 func addTrigger(c context.Context, jobID, triggerID string) error { | |
| 146 logging.Infof(c, "Triggering %q - %q", jobID, triggerID) | |
| 147 | |
| 148 // Add the trigger request to the pending set. | |
| 149 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerI D}}); err != nil { | |
| 150 return err | |
| 151 } | |
| 152 | |
| 153 // Run a task that examines the pending set and makes decisions. | |
| 154 return kickTriageTask(c, jobID) | |
| 155 } | |
| 156 | |
| 157 func kickTriageTask(c context.Context, jobID string) error { | |
| 158 // Throttle to once per 2 sec (and make sure it is always in the future) . | |
| 159 eta := clock.Now(c).Unix() | |
| 160 eta = (eta/2 + 1) * 2 | |
| 161 dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta) | |
| 162 | |
| 163 // Use cheaper but crappier memcache as a first guard. | |
| 164 itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute) | |
| 165 if memcache.Get(c, itm) == nil { | |
| 166 logging.Infof(c, "The triage task has already been scheduled") | |
| 167 return nil // already added! | |
| 168 } | |
| 169 | |
| 170 err := dispatcher.AddTask(c, &tq.Task{ | |
| 171 DeduplicationKey: dedupKey, | |
| 172 ETA: time.Unix(eta, 0), | |
| 173 Payload: &internal.TriageTriggersTask{JobId: jobID}, | |
| 174 }) | |
| 175 if err != nil { | |
| 176 return err | |
| 177 } | |
| 178 logging.Infof(c, "Scheduled the triage task") | |
| 179 | |
| 180 // Best effort in setting memcache flag. No big deal if it fails. | |
| 181 memcache.Set(c, itm) | |
| 182 return nil | |
| 183 } | |
| 184 | |
| 117 func handleTick(c context.Context, task proto.Message, execCount int) error { | 185 func handleTick(c context.Context, task proto.Message, execCount int) error { |
| 118 msg := task.(*internal.TickLaterTask) | 186 msg := task.(*internal.TickLaterTask) |
| 119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 187 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { |
| 120 return m.OnTimerTick(msg.TickNonce) | 188 return m.OnTimerTick(msg.TickNonce) |
| 121 }) | 189 }) |
| 122 } | 190 } |
| 123 | 191 |
| 124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r { | 192 func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
| 125 » msg := task.(*internal.StartInvocationTask) | 193 » msg := task.(*internal.TriggerInvocationTask) |
| 126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) | 194 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
| 127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 195 } |
| 128 » » m.RewindIfNecessary() | 196 |
| 129 » » return nil | 197 func handleTriage(c context.Context, task proto.Message, execCount int) error { |
| 198 » msg := task.(*internal.TriageTriggersTask) | |
| 199 » logging.Infof(c, "Triaging requests for %q", msg.JobId) | |
| 200 | |
| 201 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c) , func(context.Context) error { | |
| 202 » » logging.Infof(c, "Got the lock!") | |
| 203 » » return runTriage(c, msg.JobId) | |
| 130 }) | 204 }) |
| 205 return transient.Tag.Apply(err) | |
| 206 } | |
| 207 | |
| 208 func runTriage(c context.Context, jobID string) error { | |
| 209 wg := sync.WaitGroup{} | |
| 210 wg.Add(2) | |
| 211 | |
| 212 var triggersList *dsset.Listing | |
| 213 var triggersErr error | |
| 214 | |
| 215 var finishedList *dsset.Listing | |
| 216 var finishedErr error | |
| 217 | |
| 218 // Grab all pending requests (and stuff to cleanup). | |
| 219 triggersSet := pendingTriggersSet(c, jobID) | |
| 220 go func() { | |
| 221 defer wg.Done() | |
| 222 triggersList, triggersErr = triggersSet.List(c) | |
| 223 if triggersErr == nil { | |
| 224 logging.Infof(c, "Triggers: %d items, %d tombs to cleanu p", | |
| 225 len(triggersList.Items), len(triggersList.Tombst ones)) | |
| 226 } | |
| 227 }() | |
| 228 | |
| 229 // Same for recently finished invocations. | |
| 230 finishedSet := recentlyFinishedSet(c, jobID) | |
| 231 go func() { | |
| 232 defer wg.Done() | |
| 233 finishedList, finishedErr = finishedSet.List(c) | |
| 234 if finishedErr == nil { | |
| 235 logging.Infof(c, "Finished: %d items, %d tombs to cleanu p", | |
| 236 len(finishedList.Items), len(finishedList.Tombst ones)) | |
| 237 } | |
| 238 }() | |
| 239 | |
| 240 wg.Wait() | |
| 241 switch { | |
| 242 case triggersErr != nil: | |
| 243 return triggersErr | |
| 244 case finishedErr != nil: | |
| 245 return finishedErr | |
| 246 } | |
| 247 | |
| 248 // Do cleanups first. | |
| 249 if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList. Tombstones); err != nil { | |
| 250 return err | |
| 251 } | |
| 252 | |
| 253 var cleanup []*dsset.Tombstone | |
| 254 err := datastore.RunInTransaction(c, func(c context.Context) error { | |
| 255 state := &CronState{ID: jobID} | |
| 256 if err := datastore.Get(c, state); err != nil && err != datastor e.ErrNoSuchEntity { | |
| 257 return err | |
| 258 } | |
| 259 | |
| 260 popOps := []*dsset.PopOp{} | |
| 261 | |
| 262 // Tidy RunningSet by removing all recently finished invocations . | |
| 263 if !finishedList.Empty() { | |
| 264 op, err := finishedSet.BeginPop(c, finishedList) | |
| 265 if err != nil { | |
| 266 return err | |
| 267 } | |
| 268 popOps = append(popOps, op) | |
| 269 | |
| 270 reallyFinished := map[int64]struct{}{} | |
| 271 for _, itm := range finishedList.Items { | |
| 272 if op.Pop(itm.ID) { | |
| 273 id, _ := strconv.ParseInt(itm.ID, 10, 64 ) | |
| 274 reallyFinished[id] = struct{}{} | |
| 275 } | |
| 276 } | |
| 277 | |
| 278 filtered := []int64{} | |
| 279 for _, id := range state.RunningSet { | |
| 280 if _, yep := reallyFinished[id]; yep { | |
| 281 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) | |
| 282 } else { | |
| 283 filtered = append(filtered, id) | |
| 284 } | |
| 285 } | |
| 286 state.RunningSet = filtered | |
| 287 } | |
| 288 | |
| 289 // Launch new invocations for each pending trigger. | |
| 290 if !triggersList.Empty() { | |
| 291 op, err := triggersSet.BeginPop(c, triggersList) | |
| 292 if err != nil { | |
| 293 return err | |
| 294 } | |
| 295 popOps = append(popOps, op) | |
| 296 | |
| 297 batch := internal.LaunchInvocationsBatchTask{JobId: stat e.ID} | |
| 298 for _, trigger := range triggersList.Items { | |
| 299 if op.Pop(trigger.ID) { | |
| 300 logging.Infof(c, "Launching new launch-% d for trigger %s", state.NextInvID, trigger.ID) | |
| 301 state.RunningSet = append(state.RunningS et, state.NextInvID) | |
| 302 batch.InvId = append(batch.InvId, state. NextInvID) | |
| 303 state.NextInvID++ | |
| 304 } | |
| 305 } | |
| 306 // Transactionally trigger a batch with new invocations. | |
| 307 if len(batch.InvId) != 0 { | |
| 308 if err := dispatcher.AddTask(c, &tq.Task{Payload : &batch}); err != nil { | |
| 309 return err | |
| 310 } | |
| 311 } | |
| 312 } | |
| 313 | |
| 314 // Submit set changes. | |
| 315 var err error | |
| 316 if cleanup, err = dsset.FinishPop(c, popOps...); err != nil { | |
| 317 return err | |
| 318 } | |
| 319 | |
| 320 logging.Infof(c, "Running invocations - %v", state.RunningSet) | |
| 321 | |
| 322 // If nothing is running, poke the cron machine. Maybe it wants to start | |
| 323 // something. | |
| 324 if len(state.RunningSet) == 0 { | |
| 325 err = pokeMachine(c, state, func(c context.Context, m *c ron.Machine) error { | |
| 326 m.RewindIfNecessary() | |
| 327 return nil | |
| 328 }) | |
| 329 if err != nil { | |
| 330 return err | |
| 331 } | |
| 332 } | |
| 333 | |
| 334 // Done! | |
| 335 return datastore.Put(c, state) | |
| 336 }, nil) | |
| 337 | |
| 338 if err == nil && len(cleanup) != 0 { | |
| 339 // Best effort cleanup of storage of consumed items. | |
| 340 logging.Infof(c, "Cleaning up storage of %d items", len(cleanup) ) | |
| 341 if err := dsset.CleanupStorage(c, cleanup); err != nil { | |
| 342 logging.Warningf(c, "Best effort cleanup failed - %s", e rr) | |
| 343 } | |
| 344 } | |
| 345 | |
| 346 return transient.Tag.Apply(err) | |
| 347 } | |
| 348 | |
| 349 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) err or { | |
| 350 msg := task.(*internal.LaunchInvocationsBatchTask) | |
| 351 logging.Infof(c, "Batch launch for %q", msg.JobId) | |
| 352 | |
| 353 tasks := []*tq.Task{} | |
| 354 for _, invId := range msg.InvId { | |
| 355 logging.Infof(c, "Launching inv-%d", invId) | |
| 356 tasks = append(tasks, &tq.Task{ | |
| 357 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, in vId), | |
| 358 Payload: &internal.LaunchInvocationTask{ | |
| 359 JobId: msg.JobId, | |
| 360 InvId: invId, | |
| 361 }, | |
| 362 }) | |
| 363 } | |
| 364 | |
| 365 return dispatcher.AddTasks(c, tasks) | |
| 366 } | |
| 367 | |
| 368 func handleLaunchTask(c context.Context, task proto.Message, execCount int) erro r { | |
| 369 msg := task.(*internal.LaunchInvocationTask) | |
| 370 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvI d) | |
| 371 | |
| 372 // There can be more stuff here. But we just finish the invocation right away. | |
| 373 | |
| 374 finishedSet := recentlyFinishedSet(c, msg.JobId) | |
| 375 err := finishedSet.Add(c, []dsset.Item{ | |
| 376 {ID: fmt.Sprintf("%d", msg.InvId)}, | |
| 377 }) | |
| 378 if err != nil { | |
| 379 return err | |
| 380 } | |
| 381 | |
| 382 // Kick the triage now that the set of running invocations has been modi fied. | |
| 383 return kickTriageTask(c, msg.JobId) | |
| 131 } | 384 } |
| 132 | 385 |
| 133 func init() { | 386 func init() { |
| 134 r := router.New() | 387 r := router.New() |
| 135 gaemiddleware.InstallHandlers(r) | 388 gaemiddleware.InstallHandlers(r) |
| 136 | 389 |
| 137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil ) | 390 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
| 138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil) | 391 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger , "triggers", nil) |
| 139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) | 392 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "t riages", nil) |
| 393 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBa tchLaunch, "launches", nil) | |
| 394 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTa sk, "invocations", nil) | |
| 395 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) | |
| 140 | 396 |
| 141 // Kick-start a bunch of jobs by visiting: | 397 // Kick-start a bunch of jobs by visiting: |
| 142 // | 398 // |
| 143 // http://localhost:8080/start/with 10s interval | 399 // http://localhost:8080/start/with 10s interval |
| 144 // http://localhost:8080/start/with 5s interval | 400 // http://localhost:8080/start/with 5s interval |
| 145 // http://localhost:8080/start/0 * * * * * * * | 401 // http://localhost:8080/start/0 * * * * * * * |
| 146 // | 402 // |
| 147 // And the look at the logs. | 403 // And the look at the logs. |
| 148 | 404 |
| 149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 405 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
| 150 jobID := c.Params.ByName("JobID") | 406 jobID := c.Params.ByName("JobID") |
| 151 if err := startJob(c.Context, jobID); err != nil { | 407 if err := startJob(c.Context, jobID); err != nil { |
| 152 panic(err) | 408 panic(err) |
| 153 } | 409 } |
| 154 }) | 410 }) |
| 155 | 411 |
| 412 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Contex t) { | |
| 413 jobID := c.Params.ByName("JobID") | |
| 414 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixN ano()) | |
| 415 if err := addTrigger(c.Context, jobID, triggerID); err != nil { | |
| 416 panic(err) | |
| 417 } | |
| 418 }) | |
| 419 | |
| 156 http.DefaultServeMux.Handle("/", r) | 420 http.DefaultServeMux.Handle("/", r) |
| 157 } | 421 } |
| OLD | NEW |