Chromium Code Reviews| Index: scheduler/appengine/engine/cron/machine.go |
| diff --git a/scheduler/appengine/engine/cron/machine.go b/scheduler/appengine/engine/cron/machine.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..6adef9aa07c06a4aaf3db0fdbde5130dfd965790 |
| --- /dev/null |
| +++ b/scheduler/appengine/engine/cron/machine.go |
| @@ -0,0 +1,240 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +package cron |
| + |
| +import ( |
| + "fmt" |
| + "time" |
| + |
| + "github.com/luci/luci-go/scheduler/appengine/schedule" |
| +) |
| + |
| +// State stores serializable state of the cron machine. |
| +// |
| +// Whoever hosts the cron machine is supposed to store this state in some |
| +// persistent store between events. It's mutated by Machine. So the usage |
| +// pattern is: |
| +// * Deserialize State, construct Machine instance with it. |
| +// * Invoke some Machine method (e.g Enable()) to advance the state. |
| +// * Acknowledge all actions emitted by the machine (see Machine.Actions). |
| +// * Serialize the mutated state (available in Machine.State). |
| +// |
| +// If appropriate, all of the above should be done in a transaction. |
| +// |
| +// Machine assumes that whoever hosts it handles TickLaterAction with following |
| +// semantics: |
| +// * A scheduled tick can't be "unscheduled". |
| +// * A scheduled tick may come more than one time. |
| +// |
| +// So the machine just ignores ticks it doesn't expect. |
| +// |
| +// It supports "absolute" and "relative" schedules, see 'schedule' package for |
| +// definitions. |
| +type State struct { |
| + // Enabled is true if the cron machine is running. |
| + // |
| + // A disabled cron machine ignores all events except 'Enable'. |
| + Enabled bool |
| + |
| + // LastRewind is a time when the cron machine was restarted last time. |
| + // |
| + // For relative schedules, it's a time RewindIfNecessary() was called. For |
| + // absolute schedules it is last time invocation happened (cron machines on |
| + // absolute schedules auto-rewind themselves). |
| + LastRewind time.Time |
| + |
| + // LastTick is last emitted tick request (or empty struct). |
| + // |
| + // It may be scheduled for "distant future" for paused cron machines. |
| + LastTick TickLaterAction |
| +} |
| + |
| +// IsSuspended returns true if the cron machine is not waiting for a tick. |
| +// |
| +// This happens for paused cron machines (they technically are scheduled for |
| +// a tick in a distant future) and for cron machines on relative schedule that |
| +// wait for 'RewindIfNecessary' to be called to start ticking again. |
| +// |
| +// A disabled cron machine is also considered suspended. |
| +func (s *State) IsSuspended() bool { |
| + return !s.Enabled || s.LastTick.When.IsZero() || s.LastTick.When == schedule.DistantFuture |
| +} |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| + |
| +// Action is a particular action to perform when switching the state. |
| +// |
| +// Can be type cast to some concrete *Action struct. Intended to be handled by |
| +// whoever hosts the cron machine. |
| +type Action interface { |
| + IsAction() bool |
| +} |
| + |
| +// TickLaterAction schedules an OnTimerTick call at given moment in time. |
| +// |
| +// TickNonce is used by cron machine to skip canceled or repeated ticks. |
| +type TickLaterAction struct { |
| + When time.Time |
| + TickNonce int64 |
| +} |
| + |
| +// IsAction makes TickLaterAction implement Action interface. |
| +func (a TickLaterAction) IsAction() bool { return true } |
| + |
| +// StartInvocationAction is emitted when the scheduled moment comes. |
| +// |
| +// A handler is expected to call RewindIfNecessary() at some later time to |
| +// restart the cron machine if it's running on a relative schedule (e.g. "with |
| +// 10 sec interval"). Cron machines on relative schedules are "one shot". They |
| +// need to be rewound to start counting time again. |
| +// |
| +// Cron machines on absolute schedules (regular crons, like "at 12 AM every |
| +// day") don't need rewinding, they'll start counting time until next invocation |
| +// automatically. |
| +type StartInvocationAction struct{} |
| + |
| +// IsAction makes StartInvocationAction implement Action interface. |
| +func (a StartInvocationAction) IsAction() bool { return true } |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| + |
| +// Machine advances the state of the cron machine. |
| +// |
| +// It gracefully handles various kinds of external events (like pauses and |
| +// schedule changes) and emits actions that's supposed to handled by whoever |
| +// hosts it. |
| +type Machine struct { |
| + // Inputs. |
| + Now time.Time // current time |
| + Schedule *schedule.Schedule // knows when to emit invocation action |
| + Nonce func() int64 // produces nonces on demand |
| + |
| + // Mutated. |
| + State State // state of the cron machine, mutated by its methods |
| + Actions []Action // all emitted actions (if any) |
| +} |
| + |
| +// Enable makes the cron machine start counting time. |
| +// |
| +// Does nothing if already enabled. |
| +func (m *Machine) Enable() { |
| + if !m.State.Enabled { |
| + m.State = State{Enabled: true, LastRewind: m.Now} // reset state |
| + m.scheduleTick() |
| + } |
| +} |
| + |
| +// Disable stops any pending timer ticks, resets state. |
| +// |
| +// The cron machine will ignore any events until Enable is called to turn it on. |
| +func (m *Machine) Disable() { |
| + m.State = State{Enabled: false} |
| +} |
| + |
| +// RewindIfNecessary is called to restart the cron after it has fired the |
| +// invocation action. |
| +// |
| +// Does nothing if the cron is disabled or already ticking. |
| +func (m *Machine) RewindIfNecessary() { |
| + if m.State.Enabled && m.State.LastTick.When.IsZero() { |
| + m.State.LastRewind = m.Now |
| + m.scheduleTick() |
| + } |
| +} |
| + |
| +// OnScheduleChange happens when cron's schedule changes. |
| +// |
| +// In particular, it handles switches between absolute and relative schedules. |
| +func (m *Machine) OnScheduleChange() { |
| + // Do not touch timers on disabled cron machines. |
| + if !m.State.Enabled { |
| + return |
| + } |
| + |
| + // The following condition is true for cron machines on a relative schedule |
| + // that have already "fired", and currently wait for manual RewindIfNecessary |
| + // call to start ticking again. When such cron machines switch to an absolute |
| + // schedule, we need to rewind them right away (since machines on absolute |
| + // schedules always tick!). If the new schedule is also relative, do nothing: |
| + // RewindIfNecessary() should be called manually by the host at some later |
| + // time (as usual for relative schedules). |
| + if m.State.LastTick.When.IsZero() { |
| + if m.Schedule.IsAbsolute() { |
| + m.RewindIfNecessary() |
| + } |
| + } else { |
| + // In this branch, the cron machine has a timer tick scheduled. It means it |
| + // is either in a relative or absolute schedule, and this schedule may have |
| + // changed, so we may need to move the tick to reflect the change. Note that |
| + // we are not resetting LastRewind here, since we want the new schedule to |
| + // take into account real last RewindIfNecessary call. For example, if the |
| + // last rewind happened at moment X, current time is Now, and the new |
| + // schedule is "with 10s interval", we want the tick to happen at "X+10", |
| + // not "Now+10". |
| + m.scheduleTick() |
| + } |
| +} |
| + |
| +// OnTimerTick happens when a scheduled timer tick (added with TickLaterAction) |
| +// occurs. |
| +// |
| +// Returns an error if the tick happened too soon. |
| +func (m *Machine) OnTimerTick(tickNonce int64) error { |
| + // Silently skip unexpected, late or canceled ticks. This is fine. |
| + switch { |
| + case m.State.IsSuspended(): |
| + return nil |
| + case m.State.LastTick.TickNonce != tickNonce: |
| + return nil |
| + } |
| + |
| + // Report error (to trigger a retry) if the tick happened unexpectedly soon. |
| + // Absolute schedules may report "wrong" next tick time if asked for a next |
| + // tick before previous one has happened. |
| + delay := m.Now.Sub(m.State.LastTick.When) |
| + if delay < 0 { |
|
tandrii(chromium)
2017/07/14 18:17:02
nit: combine two lines?
Vadim Sh.
2017/07/14 18:29:12
Ack, will do.
Vadim Sh.
2017/07/14 21:57:21
Done.
|
| + return fmt.Errorf("tick happened %.1f sec before it was expected", -delay.Seconds()) |
| + } |
| + |
| + // The scheduled time has come! |
| + m.Actions = append(m.Actions, StartInvocationAction{}) |
| + m.State.LastTick = TickLaterAction{} |
| + |
| + // Start waiting for a new tick right away if on an absolute schedule or just |
| + // keep the tick state clear for relative schedules: new tick will be set when |
| + // RewindIfNecessary() is manually called by whoever handles the cron. |
| + if m.Schedule.IsAbsolute() { |
| + m.RewindIfNecessary() |
| + } |
| + |
| + return nil |
| +} |
| + |
| +// scheduleTick emits TickLaterAction action according to the schedule, current |
| +// time, and last time Rewind was called. |
| +// |
| +// Does nothing if such tick has already been scheduled. |
| +func (m *Machine) scheduleTick() { |
| + nextTickTime := m.Schedule.Next(m.Now, m.State.LastRewind) |
| + if nextTickTime != m.State.LastTick.When { |
| + m.State.LastTick = TickLaterAction{ |
| + When: nextTickTime, |
| + TickNonce: m.Nonce(), |
| + } |
| + if nextTickTime != schedule.DistantFuture { |
|
tandrii(chromium)
2017/07/14 18:17:02
is this actually necessary for a cron job? AFAIR,
Vadim Sh.
2017/07/14 18:29:12
Maybe not... I'm not sure yet. I'm keeping it so f
tandrii(chromium)
2017/07/14 18:36:08
oops, that was my cache miss. Thanks for reminder.
|
| + m.Actions = append(m.Actions, m.State.LastTick) |
| + } |
| + } |
| +} |