Chromium Code Reviews| Index: logdog/appengine/coordinator/backend/archival.go |
| diff --git a/logdog/appengine/coordinator/backend/archival.go b/logdog/appengine/coordinator/backend/archival.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..50eede810645586aab6cad9d6fbdcecda345a0eb |
| --- /dev/null |
| +++ b/logdog/appengine/coordinator/backend/archival.go |
| @@ -0,0 +1,239 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +package backend |
| + |
| +import ( |
| + "encoding/json" |
| + "fmt" |
| + "io" |
| + "net/http" |
| + "time" |
| + |
| + "github.com/luci/luci-go/logdog/appengine/coordinator" |
| + |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/server/router" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/info" |
| + "github.com/luci/gae/service/taskqueue" |
| + |
| + "golang.org/x/net/context" |
| +) |
| + |
// ArchivalTaskTag identifies which kind of archival task queue task is being
// scheduled for a log stream. It is embedded in the task's name and payload.
type ArchivalTaskTag string

const (
	// ArchiveExpired is a tag that is applied to the initially-scheduled
	// emergency cleanup task that will catch streams that never terminate.
	ArchiveExpired ArchivalTaskTag = "expired"

	// ArchiveTerminated is a tag that is applied to the standard terminal cleanup
	// task.
	ArchiveTerminated ArchivalTaskTag = "terminated"
)
| + |
// archivalTaskParams is the data handed in the POST body to the
// handleArchivalTask handler.
//
// It is JSON-marshaled by CreateArchivalTask when the task is enqueued and
// decoded again by handleArchivalTask when the task queue dispatches it.
type archivalTaskParams struct {
	// ID is the hash ID of the LogStream whose archive task is being created.
	//
	// Note that the task will apply to the LogStreamState, not the stream
	// entity itself.
	ID coordinator.HashID `json:"id"`

	// Tag is this task's tag.
	Tag ArchivalTaskTag `json:"tag"`

	// SettleDelay is the settle delay (see ArchivalParams).
	SettleDelay time.Duration `json:"settleDelay,omitempty"`
	// CompletePeriod is the complete period to use (see ArchivalParams).
	CompletePeriod time.Duration `json:"completePeriod,omitempty"`
}
| + |
| +func wrapTask(ctx *router.Context, fn func() bool) { |
| + if !fn() { |
| + log.Errorf(ctx.Context, "Task reported transient failure, rescheduling.") |
| + ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| + return |
| + } |
| +} |
| + |
| +// CreateArchivalTask adds a task to the task queue to initiate |
| +// archival on an stream. The task will be delayed by "delay". |
| +// |
| +// The resulting task will be named, and can be cancelled by |
| +// ClearArchivalTask. |
| +func CreateArchivalTask(c context.Context, id coordinator.HashID, tag ArchivalTaskTag, |
| + delay time.Duration, params *coordinator.ArchivalParams) error { |
| + |
| + atp := archivalTaskParams{ |
| + ID: id, |
| + Tag: tag, |
| + SettleDelay: params.SettleDelay, |
| + CompletePeriod: params.CompletePeriod, |
| + } |
| + payload, err := json.Marshal(&atp) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to generate payload JSON.") |
| + return err |
| + } |
| + |
| + taskName := archivalTaskName(id, tag) |
| + task := taskqueue.NewPOSTTask("/internal/tasks/archival", nil) |
| + task.Name = taskName |
| + task.Payload = payload |
| + task.Delay = delay |
| + if err := taskqueue.Add(c, ArchivalTaskQueue, task); err != nil { |
| + if err == taskqueue.ErrTaskAlreadyAdded { |
| + log.Warningf(c, "Task %q was already added; skipping.", task.Name) |
| + return nil |
| + } |
| + |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "taskName": task.Name, |
| + }.Errorf(c, "Failed to add task to task queue.") |
| + return err |
| + } |
| + |
| + log.Debugf(c, "Successfully created archival task: %q", taskName) |
| + return nil |
| +} |
| + |
// ClearArchiveIncompleteExpiredTask deletes the enqueued "expired" archival
// task for the specified hash ID.
//
// The task being deleted is the named task created by CreateArchivalTask with
// the ArchiveExpired tag; its name is reconstructed here via archivalTaskName.
func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
	task := taskqueue.Task{Name: archivalTaskName(id, ArchiveExpired)}
	log.Debugf(c, "Deleting archival task: %q", task.Name)
	if err := taskqueue.Delete(c, ArchivalTaskQueue, &task); err != nil {
		log.Fields{
			log.ErrorKey: err,
			"taskName":   task.Name,
		}.Errorf(c, "Failed to delete expired archival task.")
		return err
	}
	return nil
}
| + |
| +// handleArchivalTask is an HTTP handler for the archival task. |
| +func handleArchivalTask(ctx *router.Context) { |
| + wrapTask(ctx, func() bool { |
| + c := ctx.Context |
|
dnj
2017/08/03 03:45:55
Note: This logic is largely a clone of the Tumble
|
| + |
| + // Read the request body. |
| + if ctx.Request.Body == nil { |
| + log.Errorf(c, "Request has no body.") |
| + return true // Consume this task. |
| + } |
| + defer ctx.Request.Body.Close() |
| + |
| + // Load the JSON body from the request. |
| + var atp archivalTaskParams |
| + ecr := errCheckReader{ctx.Request.Body, false} |
| + if err := json.NewDecoder(&ecr).Decode(&atp); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to read requet body.") |
| + return !ecr.wasError // If there was an I/O error, return transient. |
| + } |
| + |
| + log.Infof(c, "Scheduling archival for %q task in namespace %q: %q", |
| + atp.Tag, info.GetNamespace(c), atp.ID) |
| + |
| + // Get our archival publisher. |
| + svc := coordinator.GetServices(c) |
| + ap, err := svc.ArchivalPublisher(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get archival publisher.") |
| + return false // Transient |
| + } |
| + defer func() { |
| + if err := ap.Close(); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to close archival publisher.") |
| + } |
| + }() |
| + |
| + // Get the log stream. |
| + stream := coordinator.LogStream{ID: atp.ID} |
| + state := stream.State(c) |
| + |
| + err = ds.RunInTransaction(c, func(c context.Context) error { |
| + if err := ds.Get(c, state); err != nil { |
| + if err == ds.ErrNoSuchEntity { |
| + log.Warningf(c, "Log stream no longer exists.") |
| + return nil |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to load archival log stream.") |
| + return err |
| + } |
| + |
| + params := coordinator.ArchivalParams{ |
| + RequestID: info.RequestID(c), |
| + SettleDelay: atp.SettleDelay, |
| + CompletePeriod: atp.CompletePeriod, |
| + } |
| + if err = params.PublishTask(c, ap, state); err != nil { |
| + if err == coordinator.ErrArchiveTasked { |
| + log.Warningf(c, "Archival already tasked, skipping.") |
| + return nil |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| + return err |
| + } |
| + |
| + if err := ds.Put(c, state); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to update datastore.") |
| + return err |
| + } |
| + |
| + return nil |
| + }, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| + return false // Transient. |
| + } |
| + |
| + log.Debugf(c, "Successfully published cleanup archival task.") |
| + return true |
| + }) |
| +} |
| + |
| +func archivalTaskName(id coordinator.HashID, tag ArchivalTaskTag) string { |
| + return fmt.Sprintf("archive_%s_%s", tag, id) |
| +} |
| + |
| +// errCheckReader is an io.Reader wrapper which tracks whether or not the |
| +// underlying io.Reader encountered an error. |
| +// |
| +// We use this because we stream our Reader through a json.Decoder, which can |
| +// also return an error (on invalid JSON), so we can't outright tell whether the |
| +// error is in I/O (transient) or due to JSON content (non-transient). |
| +// |
| +// This lets us differentiate. |
| +type errCheckReader struct { |
| + inner io.Reader |
| + wasError bool |
| +} |
| + |
| +func (r *errCheckReader) Read(p []byte) (int, error) { |
| + v, err := r.inner.Read(p) |
| + if err != nil { |
| + r.wasError = true |
| + } |
| + return v, err |
| +} |