Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Unified Diff: logdog/appengine/coordinator/backend/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/backend/archival.go
diff --git a/logdog/appengine/coordinator/backend/archival.go b/logdog/appengine/coordinator/backend/archival.go
new file mode 100644
index 0000000000000000000000000000000000000000..50eede810645586aab6cad9d6fbdcecda345a0eb
--- /dev/null
+++ b/logdog/appengine/coordinator/backend/archival.go
@@ -0,0 +1,239 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ "github.com/luci/luci-go/logdog/appengine/coordinator"
+
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/server/router"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/gae/service/taskqueue"
+
+ "golang.org/x/net/context"
+)
+
+type ArchivalTaskTag string
+
+const (
+ // ArchiveExpired is a tag that is applied to the initially-scheduled
+ // emergency cleanup task that will catch streams that never terminate.
+ ArchiveExpired ArchivalTaskTag = "expired"
+
+ // ArchiveTerminated is a tag that is applied to the standard terminal cleanup
+ // task.
+ ArchiveTerminated ArchivalTaskTag = "terminated"
+)
+
+// archivalTaskParams is the data handed in the POST body to the
+// handleArchivalTask handler.
+type archivalTaskParams struct {
+ // ID is the hash ID of the LogStream whose archive task is being created.
+ //
+ // Note that the task will apply to the LogStreamState, not the stream
+ // entity itself.
+ ID coordinator.HashID `json:"id"`
+
+ // Tag is this task's tag.
+ Tag ArchivalTaskTag `json:"tag"`
+
+ // SettleDelay is the settle delay (see ArchivalParams).
+ SettleDelay time.Duration `json:"settleDelay,omitempty"`
+ // CompletePeriod is the complete period to use (see ArchivalParams).
+ CompletePeriod time.Duration `json:"completePeriod,omitempty"`
+}
+
+func wrapTask(ctx *router.Context, fn func() bool) {
+ if !fn() {
+ log.Errorf(ctx.Context, "Task reported transient failure, rescheduling.")
+ ctx.Writer.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+}
+
+// CreateArchivalTask adds a task to the task queue to initiate
+// archival on an stream. The task will be delayed by "delay".
+//
+// The resulting task will be named, and can be cancelled by
+// ClearArchivalTask.
+func CreateArchivalTask(c context.Context, id coordinator.HashID, tag ArchivalTaskTag,
+ delay time.Duration, params *coordinator.ArchivalParams) error {
+
+ atp := archivalTaskParams{
+ ID: id,
+ Tag: tag,
+ SettleDelay: params.SettleDelay,
+ CompletePeriod: params.CompletePeriod,
+ }
+ payload, err := json.Marshal(&atp)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to generate payload JSON.")
+ return err
+ }
+
+ taskName := archivalTaskName(id, tag)
+ task := taskqueue.NewPOSTTask("/internal/tasks/archival", nil)
+ task.Name = taskName
+ task.Payload = payload
+ task.Delay = delay
+ if err := taskqueue.Add(c, ArchivalTaskQueue, task); err != nil {
+ if err == taskqueue.ErrTaskAlreadyAdded {
+ log.Warningf(c, "Task %q was already added; skipping.", task.Name)
+ return nil
+ }
+
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name,
+ }.Errorf(c, "Failed to add task to task queue.")
+ return err
+ }
+
+ log.Debugf(c, "Successfully created archival task: %q", taskName)
+ return nil
+}
+
+// ClearArchiveIncompleteTask deletes the enqueued task for the specified
+// hash ID.
+//
+// This task will be created by CreateArchiveIncompleteTask.
+func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
+ task := taskqueue.Task{Name: archivalTaskName(id, ArchiveExpired)}
+ log.Debugf(c, "Deleting archival task: %q", task.Name)
+ if err := taskqueue.Delete(c, ArchivalTaskQueue, &task); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name,
+ }.Errorf(c, "Failed to delete expired archival task.")
+ return err
+ }
+ return nil
+}
+
+// handleArchivalTask is an HTTP handler for the archival task.
+func handleArchivalTask(ctx *router.Context) {
+ wrapTask(ctx, func() bool {
+ c := ctx.Context
dnj 2017/08/03 03:45:55 Note: This logic is largely a clone of the Tumble
+
+ // Read the request body.
+ if ctx.Request.Body == nil {
+ log.Errorf(c, "Request has no body.")
+ return true // Consume this task.
+ }
+ defer ctx.Request.Body.Close()
+
+ // Load the JSON body from the request.
+ var atp archivalTaskParams
+ ecr := errCheckReader{ctx.Request.Body, false}
+ if err := json.NewDecoder(&ecr).Decode(&atp); err != nil {
+ log.WithError(err).Errorf(c, "Failed to read requet body.")
+ return !ecr.wasError // If there was an I/O error, return transient.
+ }
+
+ log.Infof(c, "Scheduling archival for %q task in namespace %q: %q",
+ atp.Tag, info.GetNamespace(c), atp.ID)
+
+ // Get our archival publisher.
+ svc := coordinator.GetServices(c)
+ ap, err := svc.ArchivalPublisher(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get archival publisher.")
+ return false // Transient
+ }
+ defer func() {
+ if err := ap.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to close archival publisher.")
+ }
+ }()
+
+ // Get the log stream.
+ stream := coordinator.LogStream{ID: atp.ID}
+ state := stream.State(c)
+
+ err = ds.RunInTransaction(c, func(c context.Context) error {
+ if err := ds.Get(c, state); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Warningf(c, "Log stream no longer exists.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to load archival log stream.")
+ return err
+ }
+
+ params := coordinator.ArchivalParams{
+ RequestID: info.RequestID(c),
+ SettleDelay: atp.SettleDelay,
+ CompletePeriod: atp.CompletePeriod,
+ }
+ if err = params.PublishTask(c, ap, state); err != nil {
+ if err == coordinator.ErrArchiveTasked {
+ log.Warningf(c, "Archival already tasked, skipping.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return err
+ }
+
+ if err := ds.Put(c, state); err != nil {
+ log.WithError(err).Errorf(c, "Failed to update datastore.")
+ return err
+ }
+
+ return nil
+ }, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return false // Transient.
+ }
+
+ log.Debugf(c, "Successfully published cleanup archival task.")
+ return true
+ })
+}
+
+func archivalTaskName(id coordinator.HashID, tag ArchivalTaskTag) string {
+ return fmt.Sprintf("archive_%s_%s", tag, id)
+}
+
+// errCheckReader is an io.Reader wrapper which tracks whether or not the
+// underlying io.Reader encountered an error.
+//
+// We use this because we stream our Reader through a json.Decoder, which can
+// also return an error (on invalid JSON), so we can't outright tell whether the
+// error is in I/O (transient) or due to JSON content (non-transient).
+//
+// This lets us differentiate.
+type errCheckReader struct {
+ inner io.Reader
+ wasError bool
+}
+
+func (r *errCheckReader) Read(p []byte) (int, error) {
+ v, err := r.inner.Read(p)
+ if err != nil {
+ r.wasError = true
+ }
+ return v, err
+}

Powered by Google App Engine
This is Rietveld 408576698