Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
| 16 package demo | 16 package demo |
| 17 | 17 |
| 18 import ( | 18 import ( |
| 19 "fmt" | |
| 19 "net/http" | 20 "net/http" |
| 21 "strconv" | |
| 22 "sync" | |
| 20 "time" | 23 "time" |
| 21 | 24 |
| 22 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
| 23 | 26 |
| 24 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
| 25 "github.com/luci/gae/service/datastore" | 28 "github.com/luci/gae/service/datastore" |
| 29 "github.com/luci/gae/service/info" | |
| 26 | 30 |
| 27 "github.com/luci/luci-go/appengine/gaemiddleware" | 31 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 32 "github.com/luci/luci-go/appengine/memlock" | |
| 28 "github.com/luci/luci-go/common/clock" | 33 "github.com/luci/luci-go/common/clock" |
| 29 "github.com/luci/luci-go/common/data/rand/mathrand" | 34 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 30 "github.com/luci/luci-go/common/logging" | 35 "github.com/luci/luci-go/common/logging" |
| 36 "github.com/luci/luci-go/common/retry/transient" | |
| 31 "github.com/luci/luci-go/server/router" | 37 "github.com/luci/luci-go/server/router" |
| 32 | 38 |
| 33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 39 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| 40 "github.com/luci/luci-go/scheduler/appengine/engine/dsset" | |
| 34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" | 41 "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
| 35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" | 42 "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
| 36 "github.com/luci/luci-go/scheduler/appengine/schedule" | 43 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 37 ) | 44 ) |
| 38 | 45 |
| 39 var tasks = tq.Dispatcher{} | 46 var dispatcher = tq.Dispatcher{} |
| 40 | 47 |
| 41 type CronState struct { | 48 type CronState struct { |
| 42 _extra datastore.PropertyMap `gae:"-,extra"` | 49 _extra datastore.PropertyMap `gae:"-,extra"` |
| 43 | 50 |
| 44 ID string `gae:"$id"` | 51 ID string `gae:"$id"` |
| 45 State cron.State `gae:",noindex"` | 52 State cron.State `gae:",noindex"` |
| 53 | |
| 54 NextInvID int64 `gae:",noindex"` | |
| 55 RunningSet []int64 `gae:",noindex"` | |
| 46 } | 56 } |
| 47 | 57 |
| 48 func (s *CronState) schedule() *schedule.Schedule { | 58 func (s *CronState) schedule() *schedule.Schedule { |
| 49 parsed, err := schedule.Parse(s.ID, 0) | 59 parsed, err := schedule.Parse(s.ID, 0) |
| 50 if err != nil { | 60 if err != nil { |
| 51 panic(err) | 61 panic(err) |
| 52 } | 62 } |
| 53 return parsed | 63 return parsed |
| 54 } | 64 } |
| 55 | 65 |
| 56 // evolve instantiates cron.Machine, calls the callback and submits emitted | 66 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
| 57 // actions. | 67 » return &dsset.Set{ |
| 68 » » ID: "triggers:" + jobID, | |
| 69 » » ShardCount: 8, | |
| 70 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
| 71 » » TombstonesDelay: 3 * time.Minute, | |
| 72 » } | |
| 73 } | |
| 74 | |
| 75 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { | |
| 76 » return &dsset.Set{ | |
| 77 » » ID: "finished:" + jobID, | |
| 78 » » ShardCount: 8, | |
| 79 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
| 80 » » TombstonesDelay: 3 * time.Minute, | |
| 81 » } | |
| 82 } | |
| 83 | |
| 84 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { | |
| 85 » machine := &cron.Machine{ | |
| 86 » » Now: clock.Now(c), | |
| 87 » » Schedule: entity.schedule(), | |
| 88 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
| 89 » » State: entity.State, | |
| 90 » } | |
| 91 | |
| 92 » if err := cb(c, machine); err != nil { | |
| 93 » » return err | |
| 94 » } | |
| 95 | |
| 96 » tasks := []*tq.Task{} | |
| 97 » for _, action := range machine.Actions { | |
| 98 » » switch a := action.(type) { | |
| 99 » » case cron.TickLaterAction: | |
| 100 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNo nce, a.When.Sub(time.Now())) | |
| 101 » » » tasks = append(tasks, &tq.Task{ | |
| 102 » » » » Payload: &internal.TickLaterTask{JobId: entity.I D, TickNonce: a.TickNonce}, | |
| 103 » » » » ETA: a.When, | |
| 104 » » » }) | |
| 105 » » case cron.StartInvocationAction: | |
| 106 » » » tasks = append(tasks, &tq.Task{ | |
| 107 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, | |
| 108 » » » }) | |
| 109 » » default: | |
| 110 » » » panic("unknown action type") | |
| 111 » » } | |
| 112 » } | |
| 113 » if err := dispatcher.AddTasks(c, tasks); err != nil { | |
| 114 » » return err | |
| 115 » } | |
| 116 | |
| 117 » entity.State = machine.State | |
| 118 » return nil | |
| 119 } | |
| 120 | |
| 58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { | 121 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { |
| 59 err := datastore.RunInTransaction(c, func(c context.Context) error { | 122 err := datastore.RunInTransaction(c, func(c context.Context) error { |
| 60 » » entity := CronState{ID: id} | 123 » » entity := &CronState{ID: id} |
| 61 » » if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { | 124 » » if err := datastore.Get(c, entity); err != nil && err != datasto re.ErrNoSuchEntity { |
| 62 return err | 125 return err |
| 63 } | 126 } |
| 64 | 127 » » if err := pokeMachine(c, entity, cb); err != nil { |
| 65 » » machine := &cron.Machine{ | |
| 66 » » » Now: clock.Now(c), | |
| 67 » » » Schedule: entity.schedule(), | |
| 68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
| 69 » » » State: entity.State, | |
| 70 » » } | |
| 71 | |
| 72 » » if err := cb(c, machine); err != nil { | |
| 73 return err | 128 return err |
| 74 } | 129 } |
| 75 | 130 » » return datastore.Put(c, entity) |
| 76 » » for _, action := range machine.Actions { | |
| 77 » » » var task tq.Task | |
| 78 » » » switch a := action.(type) { | |
| 79 » » » case cron.TickLaterAction: | |
| 80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | |
| 81 » » » » task = tq.Task{ | |
| 82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, | |
| 83 » » » » » ETA: a.When, | |
| 84 » » » » } | |
| 85 » » » case cron.StartInvocationAction: | |
| 86 » » » » task = tq.Task{ | |
| 87 » » » » » Payload: &internal.StartInvocationTask{J obId: id}, | |
| 88 » » » » » Delay: time.Second, // give the transa ction time to land | |
| 89 » » » » } | |
| 90 » » » default: | |
| 91 » » » » panic("unknown action type") | |
| 92 » » » } | |
| 93 » » » if err := tasks.AddTask(c, &task); err != nil { | |
| 94 » » » » return err | |
| 95 » » » } | |
| 96 » » } | |
| 97 | |
| 98 » » entity.State = machine.State | |
| 99 » » return datastore.Put(c, &entity) | |
| 100 }, nil) | 131 }, nil) |
| 101 | 132 » return transient.Tag.Apply(err) |
| 102 » if err != nil { | |
| 103 » » logging.Errorf(c, "FAIL - %s", err) | |
| 104 » } | |
| 105 » return err | |
| 106 } | 133 } |
| 107 | 134 |
| 108 func startJob(c context.Context, id string) error { | 135 func startJob(c context.Context, id string) error { |
| 109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 136 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| 110 // Forcefully restart the chain of tasks. | 137 // Forcefully restart the chain of tasks. |
| 111 m.Disable() | 138 m.Disable() |
| 112 m.Enable() | 139 m.Enable() |
| 113 return nil | 140 return nil |
| 114 }) | 141 }) |
| 115 } | 142 } |
| 116 | 143 |
| 144 func addTrigger(c context.Context, jobID, triggerID string) error { | |
| 145 logging.Infof(c, "Triggering %q - %q", jobID, triggerID) | |
| 146 | |
| 147 // Add the trigger request to the pending set. | |
| 148 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerI D}}); err != nil { | |
| 149 return err | |
| 150 } | |
| 151 | |
| 152 // Run a task that examines the pending set and makes decisions. | |
| 153 return kickTriageTask(c, jobID) | |
| 154 } | |
| 155 | |
| 156 func kickTriageTask(c context.Context, jobID string) error { | |
| 157 // Throttle to once per 2 sec (and make sure it is always in the future) . | |
| 158 eta := clock.Now(c).Unix() | |
| 159 eta = (eta/2 + 1) * 2 | |
| 160 return dispatcher.AddTask(c, &tq.Task{ | |
| 161 DeduplicationKey: fmt.Sprintf("triage:%s:%d", jobID, eta), | |
| 162 ETA: time.Unix(eta, 0), | |
| 163 Payload: &internal.TriageTriggersTask{JobId: jobID}, | |
| 164 }) | |
| 165 } | |
| 166 | |
| 117 func handleTick(c context.Context, task proto.Message, execCount int) error { | 167 func handleTick(c context.Context, task proto.Message, execCount int) error { |
| 118 msg := task.(*internal.TickLaterTask) | 168 msg := task.(*internal.TickLaterTask) |
| 119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 169 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { |
| 120 return m.OnTimerTick(msg.TickNonce) | 170 return m.OnTimerTick(msg.TickNonce) |
| 121 }) | 171 }) |
| 122 } | 172 } |
| 123 | 173 |
| 124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r { | 174 func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
| 125 » msg := task.(*internal.StartInvocationTask) | 175 » msg := task.(*internal.TriggerInvocationTask) |
| 126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) | 176 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
| 127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 177 } |
| 128 » » m.RewindIfNecessary() | 178 |
| 129 » » return nil | 179 func handleTriage(c context.Context, task proto.Message, execCount int) error { |
| 180 » msg := task.(*internal.TriageTriggersTask) | |
| 181 » logging.Infof(c, "Triaging requests for %q", msg.JobId) | |
| 182 | |
| 183 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c) , func(context.Context) error { | |
| 184 » » logging.Infof(c, "Got the lock!") | |
| 185 » » return runTriage(c, msg.JobId) | |
| 130 }) | 186 }) |
| 187 return transient.Tag.Apply(err) | |
| 188 } | |
| 189 | |
| 190 func runTriage(c context.Context, jobID string) error { | |
|
tandrii(chromium)
2017/07/21 13:42:52
Do IIC that this is each trigger job could also ch
Vadim Sh.
2017/07/24 00:30:15
Only if it's fine with using stale data. It can't
tandrii(chromium)
2017/07/31 18:58:45
Ah, so the policy is part of the triggerED job, no
| |
| 191 wg := sync.WaitGroup{} | |
| 192 wg.Add(2) | |
| 193 | |
| 194 var triggers []dsset.Item | |
| 195 var triggerTombs []dsset.Tombstone | |
| 196 var triggerErr error | |
| 197 | |
| 198 // Grab all pending requests (and stuff to cleanup). | |
| 199 triggersSet := pendingTriggersSet(c, jobID) | |
| 200 go func() { | |
| 201 defer wg.Done() | |
| 202 triggers, triggerTombs, triggerErr = triggersSet.List(c) | |
| 203 if triggerErr == nil { | |
| 204 logging.Infof(c, "Triggers: %d items, %d tombs to cleanu p", len(triggers), len(triggerTombs)) | |
| 205 triggerErr = triggersSet.CleanupStorage(c, triggerTombs) | |
| 206 } | |
| 207 }() | |
| 208 | |
| 209 var finished []dsset.Item | |
| 210 var finishedTombs []dsset.Tombstone | |
| 211 var finishedErr error | |
| 212 | |
| 213 // Same for recently finished invocations. | |
| 214 finishedSet := recentlyFinishedSet(c, jobID) | |
| 215 go func() { | |
| 216 wg.Done() | |
| 217 finished, finishedTombs, finishedErr = finishedSet.List(c) | |
| 218 if finishedErr == nil { | |
| 219 logging.Infof(c, "Finished: %d items, %d tombs to cleanu p", len(finished), len(finishedTombs)) | |
| 220 finishedErr = finishedSet.CleanupStorage(c, finishedTomb s) | |
| 221 } | |
| 222 }() | |
| 223 | |
| 224 // Do the cleanups first. | |
| 225 wg.Wait() | |
| 226 if triggerErr != nil { | |
| 227 return triggerErr | |
| 228 } | |
| 229 if finishedErr != nil { | |
| 230 return finishedErr | |
| 231 } | |
| 232 | |
| 233 err := datastore.RunInTransaction(c, func(c context.Context) error { | |
| 234 state := &CronState{ID: jobID} | |
| 235 if err := datastore.Get(c, state); err != nil && err != datastor e.ErrNoSuchEntity { | |
| 236 return err | |
| 237 } | |
| 238 | |
| 239 // Tidy RunningSet by removing all recently finished invocations . | |
| 240 if len(finished) != 0 || len(finishedTombs) != 0 { | |
| 241 op, err := finishedSet.BeginPop(c) | |
| 242 if err != nil { | |
| 243 return err | |
| 244 } | |
| 245 op.CleanupTombstones(finishedTombs) | |
| 246 | |
| 247 reallyFinished := map[int64]struct{}{} | |
| 248 for _, itm := range finished { | |
| 249 if op.Pop(itm.ID) { | |
| 250 id, _ := strconv.ParseInt(itm.ID, 10, 64 ) | |
| 251 reallyFinished[id] = struct{}{} | |
| 252 } | |
| 253 } | |
| 254 | |
| 255 if err := op.Submit(); err != nil { | |
| 256 return err | |
| 257 } | |
| 258 | |
| 259 filtered := []int64{} | |
| 260 for _, id := range state.RunningSet { | |
| 261 if _, yep := reallyFinished[id]; yep { | |
| 262 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) | |
| 263 } else { | |
| 264 filtered = append(filtered, id) | |
| 265 } | |
| 266 } | |
| 267 state.RunningSet = filtered | |
| 268 } | |
| 269 | |
| 270 // Launch new invocations for each pending trigger. | |
| 271 | |
| 272 op, err := triggersSet.BeginPop(c) | |
| 273 if err != nil { | |
| 274 return err | |
| 275 } | |
| 276 op.CleanupTombstones(triggerTombs) | |
| 277 | |
| 278 batch := internal.LaunchInvocationsBatchTask{JobId: state.ID} | |
| 279 for _, trigger := range triggers { | |
| 280 if op.Pop(trigger.ID) { | |
| 281 logging.Infof(c, "Launching new launch-%d for tr igger %s", state.NextInvID, trigger.ID) | |
| 282 state.RunningSet = append(state.RunningSet, stat e.NextInvID) | |
| 283 batch.InvId = append(batch.InvId, state.NextInvI D) | |
| 284 state.NextInvID++ | |
| 285 } | |
| 286 } | |
| 287 | |
| 288 if err := op.Submit(); err != nil { | |
| 289 return err | |
| 290 } | |
| 291 | |
| 292 // Transactionally trigger a batch with new invocations. | |
| 293 if len(batch.InvId) != 0 { | |
| 294 if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch }); err != nil { | |
| 295 return err | |
| 296 } | |
| 297 } | |
| 298 | |
| 299 logging.Infof(c, "Running invocations - %v", state.RunningSet) | |
| 300 | |
| 301 // If nothing is running, poke the cron machine. Maybe it wants to start | |
| 302 // something. | |
| 303 if len(state.RunningSet) == 0 { | |
| 304 err := pokeMachine(c, state, func(c context.Context, m * cron.Machine) error { | |
| 305 m.RewindIfNecessary() | |
| 306 return nil | |
| 307 }) | |
| 308 if err != nil { | |
| 309 return err | |
| 310 } | |
| 311 } | |
| 312 | |
| 313 // Done! | |
| 314 return datastore.Put(c, state) | |
| 315 }, nil) | |
| 316 return transient.Tag.Apply(err) | |
| 317 } | |
| 318 | |
| 319 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) err or { | |
| 320 msg := task.(*internal.LaunchInvocationsBatchTask) | |
| 321 logging.Infof(c, "Batch launch for %q", msg.JobId) | |
| 322 | |
| 323 tasks := []*tq.Task{} | |
| 324 for _, invId := range msg.InvId { | |
| 325 logging.Infof(c, "Launching inv-%d", invId) | |
| 326 tasks = append(tasks, &tq.Task{ | |
| 327 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, in vId), | |
| 328 Payload: &internal.LaunchInvocationTask{ | |
| 329 JobId: msg.JobId, | |
| 330 InvId: invId, | |
| 331 }, | |
| 332 }) | |
| 333 } | |
| 334 | |
| 335 return dispatcher.AddTasks(c, tasks) | |
| 336 } | |
| 337 | |
| 338 func handleLaunchTask(c context.Context, task proto.Message, execCount int) erro r { | |
| 339 msg := task.(*internal.LaunchInvocationTask) | |
| 340 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvI d) | |
| 341 | |
| 342 // There can be more stuff here. But we just finish the invocation right away. | |
| 343 | |
| 344 finishedSet := recentlyFinishedSet(c, msg.JobId) | |
| 345 err := finishedSet.Add(c, []dsset.Item{ | |
| 346 {ID: fmt.Sprintf("%d", msg.InvId)}, | |
| 347 }) | |
| 348 if err != nil { | |
| 349 return err | |
| 350 } | |
| 351 | |
| 352 // Kick the triage now that the set of running invocations has been modi fied. | |
| 353 return kickTriageTask(c, msg.JobId) | |
| 131 } | 354 } |
| 132 | 355 |
| 133 func init() { | 356 func init() { |
| 134 r := router.New() | 357 r := router.New() |
| 135 gaemiddleware.InstallHandlers(r) | 358 gaemiddleware.InstallHandlers(r) |
| 136 | 359 |
| 137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil ) | 360 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
| 138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil) | 361 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger , "triggers", nil) |
| 139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) | 362 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "t riages", nil) |
| 363 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBa tchLaunch, "launches", nil) | |
| 364 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTa sk, "invocations", nil) | |
| 365 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) | |
| 140 | 366 |
| 141 // Kick-start a bunch of jobs by visiting: | 367 // Kick-start a bunch of jobs by visiting: |
| 142 // | 368 // |
| 143 // http://localhost:8080/start/with 10s interval | 369 // http://localhost:8080/start/with 10s interval |
| 144 // http://localhost:8080/start/with 5s interval | 370 // http://localhost:8080/start/with 5s interval |
| 145 // http://localhost:8080/start/0 * * * * * * * | 371 // http://localhost:8080/start/0 * * * * * * * |
| 146 // | 372 // |
| 147 // And the look at the logs. | 373 // And the look at the logs. |
| 148 | 374 |
| 149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 375 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
| 150 jobID := c.Params.ByName("JobID") | 376 jobID := c.Params.ByName("JobID") |
| 151 if err := startJob(c.Context, jobID); err != nil { | 377 if err := startJob(c.Context, jobID); err != nil { |
| 152 panic(err) | 378 panic(err) |
| 153 } | 379 } |
| 154 }) | 380 }) |
| 155 | 381 |
| 382 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Contex t) { | |
| 383 jobID := c.Params.ByName("JobID") | |
| 384 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixN ano()) | |
| 385 if err := addTrigger(c.Context, jobID, triggerID); err != nil { | |
| 386 panic(err) | |
| 387 } | |
| 388 }) | |
| 389 | |
| 156 http.DefaultServeMux.Handle("/", r) | 390 http.DefaultServeMux.Handle("/", r) |
| 157 } | 391 } |
| OLD | NEW |