Index: scheduler/appengine/engine/cron/machine.go |
diff --git a/scheduler/appengine/engine/cron/machine.go b/scheduler/appengine/engine/cron/machine.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..792b7b002e5e0364e3efd391e5bc5c0ad331419a |
--- /dev/null |
+++ b/scheduler/appengine/engine/cron/machine.go |
@@ -0,0 +1,240 @@ |
+// Copyright 2017 The LUCI Authors. |
+// |
+// Licensed under the Apache License, Version 2.0 (the "License"); |
+// you may not use this file except in compliance with the License. |
+// You may obtain a copy of the License at |
+// |
+// http://www.apache.org/licenses/LICENSE-2.0 |
+// |
+// Unless required by applicable law or agreed to in writing, software |
+// distributed under the License is distributed on an "AS IS" BASIS, |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+// See the License for the specific language governing permissions and |
+// limitations under the License. |
+ |
+package cron |
+ |
+import ( |
+ "fmt" |
+ "time" |
+ |
+ "github.com/luci/luci-go/scheduler/appengine/schedule" |
+) |
+ |
+// State stores serializable state of the cron machine. |
+// |
+// Whoever hosts the cron machine is supposed to store this state in some |
+// persistent store between events. It's mutated by Machine. So the usage |
+// pattern is: |
+// * Deserialize State, construct Machine instance with it. |
+// * Invoke some Machine method (e.g Enable()) to advance the state. |
+// * Acknowledge all actions emitted by the machine (see Machine.Actions). |
+// * Serialize the mutated state (available in Machine.State). |
+// |
+// If appropriate, all of the above should be done in a transaction. |
+// |
+// Machine assumes that whoever hosts it handles TickLaterAction with following |
+// semantics: |
+// * A scheduled tick can't be "unscheduled". |
+// * A scheduled tick may come more than one time. |
+// |
+// So the machine just ignores ticks it doesn't expect. |
+// |
+// It supports "absolute" and "relative" schedules, see 'schedule' package for |
+// definitions. |
+type State struct { |
+ // Enabled is true if the cron machine is running. |
+ // |
+ // A disabled cron machine ignores all events except 'Enable'. |
+ Enabled bool |
+ |
+ // LastRewind is a time when the cron machine was restarted last time. |
+ // |
+ // For relative schedules, it's a time RewindIfNecessary() was called. For |
+ // absolute schedules it is last time invocation happened (cron machines on |
+ // absolute schedules auto-rewind themselves). |
+ LastRewind time.Time |
+ |
+ // LastTick is last emitted tick request (or empty struct). |
+ // |
+ // It may be scheduled for "distant future" for paused cron machines. |
+ LastTick TickLaterAction |
+} |
+ |
+// IsSuspended returns true if the cron machine is not waiting for a tick. |
+// |
+// This happens for paused cron machines (they technically are scheduled for |
+// a tick in a distant future) and for cron machines on relative schedule that |
+// wait for 'RewindIfNecessary' to be called to start ticking again. |
+// |
+// A disabled cron machine is also considered suspended. |
+func (s *State) IsSuspended() bool { |
+ return !s.Enabled || s.LastTick.When.IsZero() || s.LastTick.When == schedule.DistantFuture |
+} |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+ |
+// Action is a particular action to perform when switching the state. |
+// |
+// Can be type cast to some concrete *Action struct. Intended to be handled by |
+// whoever hosts the cron machine. |
+type Action interface { |
+ IsAction() bool |
+} |
+ |
+// TickLaterAction schedules an OnTimerTick call at given moment in time. |
+// |
+// TickNonce is used by cron machine to skip canceled or repeated ticks. |
+type TickLaterAction struct { |
+ When time.Time |
+ TickNonce int64 |
+} |
+ |
+// IsAction makes TickLaterAction implement Action interface. |
+func (a TickLaterAction) IsAction() bool { return true } |
+ |
+// StartInvocationAction is emitted when the scheduled moment comes. |
+// |
+// A handler is expected to call RewindIfNecessary() at some later time to |
+// restart the cron machine if it's running on a relative schedule (e.g. "with |
+// 10 sec interval"). Cron machines on relative schedules are "one shot". They |
+// need to be rewound to start counting time again. |
+// |
+// Cron machines on absolute schedules (regular crons, like "at 12 AM every |
+// day") don't need rewinding, they'll start counting time until next invocation |
+// automatically. Calling RewindIfNecessary() for them won't hurt though, it |
+// will be noop. |
+type StartInvocationAction struct{} |
+ |
+// IsAction makes StartInvocationAction implement Action interface. |
+func (a StartInvocationAction) IsAction() bool { return true } |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+ |
+// Machine advances the state of the cron machine. |
+// |
+// It gracefully handles various kinds of external events (like pauses and |
+// schedule changes) and emits actions that's supposed to handled by whoever |
+// hosts it. |
+type Machine struct { |
+ // Inputs. |
+ Now time.Time // current time |
+ Schedule *schedule.Schedule // knows when to emit invocation action |
+ Nonce func() int64 // produces nonces on demand |
+ |
+ // Mutated. |
+ State State // state of the cron machine, mutated by its methods |
+ Actions []Action // all emitted actions (if any) |
+} |
+ |
+// Enable makes the cron machine start counting time. |
+// |
+// Does nothing if already enabled. |
+func (m *Machine) Enable() { |
+ if !m.State.Enabled { |
+ m.State = State{Enabled: true, LastRewind: m.Now} // reset state |
+ m.scheduleTick() |
+ } |
+} |
+ |
+// Disable stops any pending timer ticks, resets state. |
+// |
+// The cron machine will ignore any events until Enable is called to turn it on. |
+func (m *Machine) Disable() { |
+ m.State = State{Enabled: false} |
+} |
+ |
+// RewindIfNecessary is called to restart the cron after it has fired the |
+// invocation action. |
+// |
+// Does nothing if the cron is disabled or already ticking. |
+func (m *Machine) RewindIfNecessary() { |
+ if m.State.Enabled && m.State.LastTick.When.IsZero() { |
+ m.State.LastRewind = m.Now |
+ m.scheduleTick() |
+ } |
+} |
+ |
+// OnScheduleChange happens when cron's schedule changes. |
+// |
+// In particular, it handles switches between absolute and relative schedules. |
+func (m *Machine) OnScheduleChange() { |
+ // Do not touch timers on disabled cron machines. |
+ if !m.State.Enabled { |
+ return |
+ } |
+ |
+ // The following condition is true for cron machines on a relative schedule |
+ // that have already "fired", and currently wait for manual RewindIfNecessary |
+ // call to start ticking again. When such cron machines switch to an absolute |
+ // schedule, we need to rewind them right away (since machines on absolute |
+ // schedules always tick!). If the new schedule is also relative, do nothing: |
+ // RewindIfNecessary() should be called manually by the host at some later |
+ // time (as usual for relative schedules). |
+ if m.State.LastTick.When.IsZero() { |
+ if m.Schedule.IsAbsolute() { |
+ m.RewindIfNecessary() |
+ } |
+ } else { |
+ // In this branch, the cron machine has a timer tick scheduled. It means it |
+ // is either in a relative or absolute schedule, and this schedule may have |
+ // changed, so we may need to move the tick to reflect the change. Note that |
+ // we are not resetting LastRewind here, since we want the new schedule to |
+ // take into account real last RewindIfNecessary call. For example, if the |
+ // last rewind happened at moment X, current time is Now, and the new |
+ // schedule is "with 10s interval", we want the tick to happen at "X+10", |
+ // not "Now+10". |
+ m.scheduleTick() |
+ } |
+} |
+ |
+// OnTimerTick happens when a scheduled timer tick (added with TickLaterAction) |
+// occurs. |
+// |
+// Returns an error if the tick happened too soon. |
+func (m *Machine) OnTimerTick(tickNonce int64) error { |
+ // Silently skip unexpected, late or canceled ticks. This is fine. |
+ switch { |
+ case m.State.IsSuspended(): |
+ return nil |
+ case m.State.LastTick.TickNonce != tickNonce: |
+ return nil |
+ } |
+ |
+ // Report error (to trigger a retry) if the tick happened unexpectedly soon. |
+ // Absolute schedules may report "wrong" next tick time if asked for a next |
+ // tick before previous one has happened. |
+ if delay := m.Now.Sub(m.State.LastTick.When); delay < 0 { |
+ return fmt.Errorf("tick happened %.1f sec before it was expected", -delay.Seconds()) |
+ } |
+ |
+ // The scheduled time has come! |
+ m.Actions = append(m.Actions, StartInvocationAction{}) |
+ m.State.LastTick = TickLaterAction{} |
+ |
+ // Start waiting for a new tick right away if on an absolute schedule or just |
+ // keep the tick state clear for relative schedules: new tick will be set when |
+ // RewindIfNecessary() is manually called by whoever handles the cron. |
+ if m.Schedule.IsAbsolute() { |
+ m.RewindIfNecessary() |
+ } |
+ |
+ return nil |
+} |
+ |
+// scheduleTick emits TickLaterAction action according to the schedule, current |
+// time, and last time RewindIfNecessary was called. |
+// |
+// Does nothing if such tick has already been scheduled. |
+func (m *Machine) scheduleTick() { |
+ nextTickTime := m.Schedule.Next(m.Now, m.State.LastRewind) |
+ if nextTickTime != m.State.LastTick.When { |
+ m.State.LastTick = TickLaterAction{ |
+ When: nextTickTime, |
+ TickNonce: m.Nonce(), |
+ } |
+ if nextTickTime != schedule.DistantFuture { |
+ m.Actions = append(m.Actions, m.State.LastTick) |
+ } |
+ } |
+} |