Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1293)

Unified Diff: scheduler/appengine/engine/cron/machine.go

Issue 2980943002: Add cron.Machine state machine. (Closed)
Patch Set: typos Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/engine/cron/demo/main.go ('k') | scheduler/appengine/engine/cron/machine_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/cron/machine.go
diff --git a/scheduler/appengine/engine/cron/machine.go b/scheduler/appengine/engine/cron/machine.go
new file mode 100644
index 0000000000000000000000000000000000000000..792b7b002e5e0364e3efd391e5bc5c0ad331419a
--- /dev/null
+++ b/scheduler/appengine/engine/cron/machine.go
@@ -0,0 +1,240 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cron
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/luci/luci-go/scheduler/appengine/schedule"
+)
+
+// State stores serializable state of the cron machine.
+//
+// Whoever hosts the cron machine is supposed to store this state in some
+// persistent store between events. It's mutated by Machine. So the usage
+// pattern is:
+// * Deserialize State, construct Machine instance with it.
+// * Invoke some Machine method (e.g Enable()) to advance the state.
+// * Acknowledge all actions emitted by the machine (see Machine.Actions).
+// * Serialize the mutated state (available in Machine.State).
+//
+// If appropriate, all of the above should be done in a transaction.
+//
+// Machine assumes that whoever hosts it handles TickLaterAction with following
+// semantics:
+// * A scheduled tick can't be "unscheduled".
+// * A scheduled tick may come more than one time.
+//
+// So the machine just ignores ticks it doesn't expect.
+//
+// It supports "absolute" and "relative" schedules, see 'schedule' package for
+// definitions.
+type State struct {
+ // Enabled is true if the cron machine is running.
+ //
+ // A disabled cron machine ignores all events except 'Enable'.
+ Enabled bool
+
+ // LastRewind is a time when the cron machine was restarted last time.
+ //
+ // For relative schedules, it's a time RewindIfNecessary() was called. For
+ // absolute schedules it is last time invocation happened (cron machines on
+ // absolute schedules auto-rewind themselves).
+ LastRewind time.Time
+
+ // LastTick is last emitted tick request (or empty struct).
+ //
+ // It may be scheduled for "distant future" for paused cron machines.
+ LastTick TickLaterAction
+}
+
+// IsSuspended returns true if the cron machine is not waiting for a tick.
+//
+// This happens for paused cron machines (they technically are scheduled for
+// a tick in a distant future) and for cron machines on relative schedule that
+// wait for 'RewindIfNecessary' to be called to start ticking again.
+//
+// A disabled cron machine is also considered suspended.
+func (s *State) IsSuspended() bool {
+ return !s.Enabled || s.LastTick.When.IsZero() || s.LastTick.When == schedule.DistantFuture
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Action is a particular action to perform when switching the state.
+//
+// Can be type cast to some concrete *Action struct. Intended to be handled by
+// whoever hosts the cron machine.
+type Action interface {
+ IsAction() bool
+}
+
+// TickLaterAction schedules an OnTimerTick call at given moment in time.
+//
+// TickNonce is used by cron machine to skip canceled or repeated ticks.
+type TickLaterAction struct {
+ When time.Time
+ TickNonce int64
+}
+
+// IsAction makes TickLaterAction implement Action interface.
+func (a TickLaterAction) IsAction() bool { return true }
+
+// StartInvocationAction is emitted when the scheduled moment comes.
+//
+// A handler is expected to call RewindIfNecessary() at some later time to
+// restart the cron machine if it's running on a relative schedule (e.g. "with
+// 10 sec interval"). Cron machines on relative schedules are "one shot". They
+// need to be rewound to start counting time again.
+//
+// Cron machines on absolute schedules (regular crons, like "at 12 AM every
+// day") don't need rewinding, they'll start counting time until next invocation
+// automatically. Calling RewindIfNecessary() for them won't hurt though, it
+// will be noop.
+type StartInvocationAction struct{}
+
+// IsAction makes StartInvocationAction implement Action interface.
+func (a StartInvocationAction) IsAction() bool { return true }
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Machine advances the state of the cron machine.
+//
+// It gracefully handles various kinds of external events (like pauses and
+// schedule changes) and emits actions that's supposed to handled by whoever
+// hosts it.
+type Machine struct {
+ // Inputs.
+ Now time.Time // current time
+ Schedule *schedule.Schedule // knows when to emit invocation action
+ Nonce func() int64 // produces nonces on demand
+
+ // Mutated.
+ State State // state of the cron machine, mutated by its methods
+ Actions []Action // all emitted actions (if any)
+}
+
+// Enable makes the cron machine start counting time.
+//
+// Does nothing if already enabled.
+func (m *Machine) Enable() {
+ if !m.State.Enabled {
+ m.State = State{Enabled: true, LastRewind: m.Now} // reset state
+ m.scheduleTick()
+ }
+}
+
+// Disable stops any pending timer ticks, resets state.
+//
+// The cron machine will ignore any events until Enable is called to turn it on.
+func (m *Machine) Disable() {
+ m.State = State{Enabled: false}
+}
+
+// RewindIfNecessary is called to restart the cron after it has fired the
+// invocation action.
+//
+// Does nothing if the cron is disabled or already ticking.
+func (m *Machine) RewindIfNecessary() {
+ if m.State.Enabled && m.State.LastTick.When.IsZero() {
+ m.State.LastRewind = m.Now
+ m.scheduleTick()
+ }
+}
+
+// OnScheduleChange happens when cron's schedule changes.
+//
+// In particular, it handles switches between absolute and relative schedules.
+func (m *Machine) OnScheduleChange() {
+ // Do not touch timers on disabled cron machines.
+ if !m.State.Enabled {
+ return
+ }
+
+ // The following condition is true for cron machines on a relative schedule
+ // that have already "fired", and currently wait for manual RewindIfNecessary
+ // call to start ticking again. When such cron machines switch to an absolute
+ // schedule, we need to rewind them right away (since machines on absolute
+ // schedules always tick!). If the new schedule is also relative, do nothing:
+ // RewindIfNecessary() should be called manually by the host at some later
+ // time (as usual for relative schedules).
+ if m.State.LastTick.When.IsZero() {
+ if m.Schedule.IsAbsolute() {
+ m.RewindIfNecessary()
+ }
+ } else {
+ // In this branch, the cron machine has a timer tick scheduled. It means it
+ // is either in a relative or absolute schedule, and this schedule may have
+ // changed, so we may need to move the tick to reflect the change. Note that
+ // we are not resetting LastRewind here, since we want the new schedule to
+ // take into account real last RewindIfNecessary call. For example, if the
+ // last rewind happened at moment X, current time is Now, and the new
+ // schedule is "with 10s interval", we want the tick to happen at "X+10",
+ // not "Now+10".
+ m.scheduleTick()
+ }
+}
+
+// OnTimerTick happens when a scheduled timer tick (added with TickLaterAction)
+// occurs.
+//
+// Returns an error if the tick happened too soon.
+func (m *Machine) OnTimerTick(tickNonce int64) error {
+ // Silently skip unexpected, late or canceled ticks. This is fine.
+ switch {
+ case m.State.IsSuspended():
+ return nil
+ case m.State.LastTick.TickNonce != tickNonce:
+ return nil
+ }
+
+ // Report error (to trigger a retry) if the tick happened unexpectedly soon.
+ // Absolute schedules may report "wrong" next tick time if asked for a next
+ // tick before previous one has happened.
+ if delay := m.Now.Sub(m.State.LastTick.When); delay < 0 {
+ return fmt.Errorf("tick happened %.1f sec before it was expected", -delay.Seconds())
+ }
+
+ // The scheduled time has come!
+ m.Actions = append(m.Actions, StartInvocationAction{})
+ m.State.LastTick = TickLaterAction{}
+
+ // Start waiting for a new tick right away if on an absolute schedule or just
+ // keep the tick state clear for relative schedules: new tick will be set when
+ // RewindIfNecessary() is manually called by whoever handles the cron.
+ if m.Schedule.IsAbsolute() {
+ m.RewindIfNecessary()
+ }
+
+ return nil
+}
+
+// scheduleTick emits TickLaterAction action according to the schedule, current
+// time, and last time RewindIfNecessary was called.
+//
+// Does nothing if such tick has already been scheduled.
+func (m *Machine) scheduleTick() {
+ nextTickTime := m.Schedule.Next(m.Now, m.State.LastRewind)
+ if nextTickTime != m.State.LastTick.When {
+ m.State.LastTick = TickLaterAction{
+ When: nextTickTime,
+ TickNonce: m.Nonce(),
+ }
+ if nextTickTime != schedule.DistantFuture {
+ m.Actions = append(m.Actions, m.State.LastTick)
+ }
+ }
+}
« no previous file with comments | « scheduler/appengine/engine/cron/demo/main.go ('k') | scheduler/appengine/engine/cron/machine_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698