| Index: scheduler/appengine/engine/cron/machine.go
|
| diff --git a/scheduler/appengine/engine/cron/machine.go b/scheduler/appengine/engine/cron/machine.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..792b7b002e5e0364e3efd391e5bc5c0ad331419a
|
| --- /dev/null
|
| +++ b/scheduler/appengine/engine/cron/machine.go
|
| @@ -0,0 +1,240 @@
|
| +// Copyright 2017 The LUCI Authors.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +package cron
|
| +
|
| +import (
|
| + "fmt"
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/scheduler/appengine/schedule"
|
| +)
|
| +
|
| +// State stores serializable state of the cron machine.
|
| +//
|
| +// Whoever hosts the cron machine is supposed to store this state in some
|
| +// persistent store between events. It's mutated by Machine. So the usage
|
| +// pattern is:
|
| +// * Deserialize State, construct Machine instance with it.
|
| +// * Invoke some Machine method (e.g Enable()) to advance the state.
|
| +// * Acknowledge all actions emitted by the machine (see Machine.Actions).
|
| +// * Serialize the mutated state (available in Machine.State).
|
| +//
|
| +// If appropriate, all of the above should be done in a transaction.
|
| +//
|
| +// Machine assumes that whoever hosts it handles TickLaterAction with following
|
| +// semantics:
|
| +// * A scheduled tick can't be "unscheduled".
|
| +// * A scheduled tick may come more than one time.
|
| +//
|
| +// So the machine just ignores ticks it doesn't expect.
|
| +//
|
| +// It supports "absolute" and "relative" schedules, see 'schedule' package for
|
| +// definitions.
|
| +type State struct {
|
| + // Enabled is true if the cron machine is running.
|
| + //
|
| + // A disabled cron machine ignores all events except 'Enable'.
|
| + Enabled bool
|
| +
|
| + // LastRewind is a time when the cron machine was restarted last time.
|
| + //
|
| + // For relative schedules, it's a time RewindIfNecessary() was called. For
|
| + // absolute schedules it is last time invocation happened (cron machines on
|
| + // absolute schedules auto-rewind themselves).
|
| + LastRewind time.Time
|
| +
|
| + // LastTick is last emitted tick request (or empty struct).
|
| + //
|
| + // It may be scheduled for "distant future" for paused cron machines.
|
| + LastTick TickLaterAction
|
| +}
|
| +
|
| +// IsSuspended returns true if the cron machine is not waiting for a tick.
|
| +//
|
| +// This happens for paused cron machines (they technically are scheduled for
|
| +// a tick in a distant future) and for cron machines on relative schedule that
|
| +// wait for 'RewindIfNecessary' to be called to start ticking again.
|
| +//
|
| +// A disabled cron machine is also considered suspended.
|
| +func (s *State) IsSuspended() bool {
|
| + return !s.Enabled || s.LastTick.When.IsZero() || s.LastTick.When == schedule.DistantFuture
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// Action is a particular action to perform when switching the state.
|
| +//
|
| +// Can be type cast to some concrete *Action struct. Intended to be handled by
|
| +// whoever hosts the cron machine.
|
| +type Action interface {
|
| + IsAction() bool
|
| +}
|
| +
|
| +// TickLaterAction schedules an OnTimerTick call at given moment in time.
|
| +//
|
| +// TickNonce is used by cron machine to skip canceled or repeated ticks.
|
| +type TickLaterAction struct {
|
| + When time.Time
|
| + TickNonce int64
|
| +}
|
| +
|
| +// IsAction makes TickLaterAction implement Action interface.
|
| +func (a TickLaterAction) IsAction() bool { return true }
|
| +
|
| +// StartInvocationAction is emitted when the scheduled moment comes.
|
| +//
|
| +// A handler is expected to call RewindIfNecessary() at some later time to
|
| +// restart the cron machine if it's running on a relative schedule (e.g. "with
|
| +// 10 sec interval"). Cron machines on relative schedules are "one shot". They
|
| +// need to be rewound to start counting time again.
|
| +//
|
| +// Cron machines on absolute schedules (regular crons, like "at 12 AM every
|
| +// day") don't need rewinding, they'll start counting time until next invocation
|
| +// automatically. Calling RewindIfNecessary() for them won't hurt though, it
|
| +// will be noop.
|
| +type StartInvocationAction struct{}
|
| +
|
| +// IsAction makes StartInvocationAction implement Action interface.
|
| +func (a StartInvocationAction) IsAction() bool { return true }
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// Machine advances the state of the cron machine.
|
| +//
|
| +// It gracefully handles various kinds of external events (like pauses and
|
| +// schedule changes) and emits actions that's supposed to handled by whoever
|
| +// hosts it.
|
| +type Machine struct {
|
| + // Inputs.
|
| + Now time.Time // current time
|
| + Schedule *schedule.Schedule // knows when to emit invocation action
|
| + Nonce func() int64 // produces nonces on demand
|
| +
|
| + // Mutated.
|
| + State State // state of the cron machine, mutated by its methods
|
| + Actions []Action // all emitted actions (if any)
|
| +}
|
| +
|
| +// Enable makes the cron machine start counting time.
|
| +//
|
| +// Does nothing if already enabled.
|
| +func (m *Machine) Enable() {
|
| + if !m.State.Enabled {
|
| + m.State = State{Enabled: true, LastRewind: m.Now} // reset state
|
| + m.scheduleTick()
|
| + }
|
| +}
|
| +
|
| +// Disable stops any pending timer ticks, resets state.
|
| +//
|
| +// The cron machine will ignore any events until Enable is called to turn it on.
|
| +func (m *Machine) Disable() {
|
| + m.State = State{Enabled: false}
|
| +}
|
| +
|
| +// RewindIfNecessary is called to restart the cron after it has fired the
|
| +// invocation action.
|
| +//
|
| +// Does nothing if the cron is disabled or already ticking.
|
| +func (m *Machine) RewindIfNecessary() {
|
| + if m.State.Enabled && m.State.LastTick.When.IsZero() {
|
| + m.State.LastRewind = m.Now
|
| + m.scheduleTick()
|
| + }
|
| +}
|
| +
|
| +// OnScheduleChange happens when cron's schedule changes.
|
| +//
|
| +// In particular, it handles switches between absolute and relative schedules.
|
| +func (m *Machine) OnScheduleChange() {
|
| + // Do not touch timers on disabled cron machines.
|
| + if !m.State.Enabled {
|
| + return
|
| + }
|
| +
|
| + // The following condition is true for cron machines on a relative schedule
|
| + // that have already "fired", and currently wait for manual RewindIfNecessary
|
| + // call to start ticking again. When such cron machines switch to an absolute
|
| + // schedule, we need to rewind them right away (since machines on absolute
|
| + // schedules always tick!). If the new schedule is also relative, do nothing:
|
| + // RewindIfNecessary() should be called manually by the host at some later
|
| + // time (as usual for relative schedules).
|
| + if m.State.LastTick.When.IsZero() {
|
| + if m.Schedule.IsAbsolute() {
|
| + m.RewindIfNecessary()
|
| + }
|
| + } else {
|
| + // In this branch, the cron machine has a timer tick scheduled. It means it
|
| + // is either in a relative or absolute schedule, and this schedule may have
|
| + // changed, so we may need to move the tick to reflect the change. Note that
|
| + // we are not resetting LastRewind here, since we want the new schedule to
|
| + // take into account real last RewindIfNecessary call. For example, if the
|
| + // last rewind happened at moment X, current time is Now, and the new
|
| + // schedule is "with 10s interval", we want the tick to happen at "X+10",
|
| + // not "Now+10".
|
| + m.scheduleTick()
|
| + }
|
| +}
|
| +
|
| +// OnTimerTick happens when a scheduled timer tick (added with TickLaterAction)
|
| +// occurs.
|
| +//
|
| +// Returns an error if the tick happened too soon.
|
| +func (m *Machine) OnTimerTick(tickNonce int64) error {
|
| + // Silently skip unexpected, late or canceled ticks. This is fine.
|
| + switch {
|
| + case m.State.IsSuspended():
|
| + return nil
|
| + case m.State.LastTick.TickNonce != tickNonce:
|
| + return nil
|
| + }
|
| +
|
| + // Report error (to trigger a retry) if the tick happened unexpectedly soon.
|
| + // Absolute schedules may report "wrong" next tick time if asked for a next
|
| + // tick before previous one has happened.
|
| + if delay := m.Now.Sub(m.State.LastTick.When); delay < 0 {
|
| + return fmt.Errorf("tick happened %.1f sec before it was expected", -delay.Seconds())
|
| + }
|
| +
|
| + // The scheduled time has come!
|
| + m.Actions = append(m.Actions, StartInvocationAction{})
|
| + m.State.LastTick = TickLaterAction{}
|
| +
|
| + // Start waiting for a new tick right away if on an absolute schedule or just
|
| + // keep the tick state clear for relative schedules: new tick will be set when
|
| + // RewindIfNecessary() is manually called by whoever handles the cron.
|
| + if m.Schedule.IsAbsolute() {
|
| + m.RewindIfNecessary()
|
| + }
|
| +
|
| + return nil
|
| +}
|
| +
|
| +// scheduleTick emits TickLaterAction action according to the schedule, current
|
| +// time, and last time RewindIfNecessary was called.
|
| +//
|
| +// Does nothing if such tick has already been scheduled.
|
| +func (m *Machine) scheduleTick() {
|
| + nextTickTime := m.Schedule.Next(m.Now, m.State.LastRewind)
|
| + if nextTickTime != m.State.LastTick.When {
|
| + m.State.LastTick = TickLaterAction{
|
| + When: nextTickTime,
|
| + TickNonce: m.Nonce(),
|
| + }
|
| + if nextTickTime != schedule.DistantFuture {
|
| + m.Actions = append(m.Actions, m.State.LastTick)
|
| + }
|
| + }
|
| +}
|
|
|