Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(463)

Unified Diff: logdog/appengine/coordinator/tasks/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: incorporate namespace in task name Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/tasks/archival.go
diff --git a/logdog/appengine/coordinator/tasks/archival.go b/logdog/appengine/coordinator/tasks/archival.go
new file mode 100644
index 0000000000000000000000000000000000000000..8cee444f0ae744336376cffe12d7df40e0be27f2
--- /dev/null
+++ b/logdog/appengine/coordinator/tasks/archival.go
@@ -0,0 +1,180 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tasks
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
+ "github.com/luci/luci-go/logdog/appengine/coordinator"
+
+ "github.com/luci/luci-go/appengine/tq"
+ "github.com/luci/luci-go/common/errors"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/retry/transient"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+)
+
+// CreateArchivalTask adds a task to the task queue to initiate
+// archival on an stream. The task will be delayed by "delay".
+func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag,
+ delay time.Duration, params *coordinator.ArchivalParams) error {
+
+ task := makeArchivalTask(c, id, tag)
+ task.Payload = &logdog.ArchiveDispatchTask{
+ Id: string(id),
+ Tag: tag,
+ SettleDelay: google.NewDuration(params.SettleDelay),
+ CompletePeriod: google.NewDuration(params.CompletePeriod),
+ }
+ task.Delay = delay
+
+ if err := taskDispatcher.AddTask(c, task); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name(),
+ }.Errorf(c, "Failed to add task to task queue.")
Ryan Tseng 2017/08/03 19:30:04 This will cause the same error to log multiple tim
dnj 2017/08/03 21:05:47 Done.
+ return err
+ }
+
+ log.Debugf(c, "Successfully created archival task: %q", task.Name())
+ return nil
+}
+
+// ClearArchiveIncompleteExpiredTask deletes the enqueued task for the specified
+// hash ID.
+//
+// This task will be created by CreateArchiveIncompleteTask.
+func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
Ryan Tseng 2017/08/03 19:30:04 DeleteAllExpiredTasks()?
dnj 2017/08/03 21:05:46 Nope, this clears a single archival tasks for the
+ task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED)
+ log.Debugf(c, "Deleting archival task: %q", task.Name())
+ if err := taskDispatcher.DeleteTask(c, task); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name(),
+ }.Errorf(c, "Failed to delete expired archival task.")
+ return err
+ }
+
+ log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Name())
+ return nil
+}
+
+// handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask.
Ryan Tseng 2017/08/03 19:30:04 What does it do?
dnj 2017/08/03 21:05:47 Updated comment.
+func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCount int) error {
+ adt, ok := payload.(*logdog.ArchiveDispatchTask)
+ if !ok {
+ return errors.Reason("unexpected message type %T", payload).Err()
+ }
+
+ log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q",
+ adt.Tag, execCount, info.GetNamespace(c), adt.Id)
+
+ // Get our archival publisher.
+ svc := coordinator.GetServices(c)
+ ap, err := svc.ArchivalPublisher(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get archival publisher.")
+ return transient.Tag.Apply(err)
+ }
+ defer func() {
+ if err := ap.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to close archival publisher.")
+ }
+ }()
+
+ // Get the log stream.
+ stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)}
+ state := stream.State(c)
+
+ err = ds.RunInTransaction(c, func(c context.Context) error {
+ if err := ds.Get(c, state); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Warningf(c, "Log stream no longer exists.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to load archival log stream.")
Ryan Tseng 2017/08/03 19:30:04 Use errors.Annotate() instead for this transaction
dnj 2017/08/03 21:05:46 Done.
+ return err
+ }
+
+ params := coordinator.ArchivalParams{
+ RequestID: info.RequestID(c),
+ SettleDelay: google.DurationFromProto(adt.SettleDelay),
+ CompletePeriod: google.DurationFromProto(adt.CompletePeriod),
+ }
+ if err = params.PublishTask(c, ap, state); err != nil {
+ if err == coordinator.ErrArchiveTasked {
+ log.Warningf(c, "Archival already tasked, skipping.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return err
+ }
+
+ if err := ds.Put(c, state); err != nil {
+ log.WithError(err).Errorf(c, "Failed to update datastore.")
+ return err
+ }
+
+ return nil
+ }, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return transient.Tag.Apply(err)
+ }
+
+ log.Debugf(c, "Successfully published cleanup archival task.")
+ return nil
+}
+
+func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag) *tq.Task {
+ name := fmt.Sprintf("%s_%s", id, tag.String())
+ return &tq.Task{
+ NamePrefix: name,
+ DeduplicationKey: info.GetNamespace(c),
+ Title: name,
+ }
+}
+
+// errCheckReader is an io.Reader wrapper which tracks whether or not the
Ryan Tseng 2017/08/03 19:30:04 Is this used?
dnj 2017/08/03 21:05:47 No, now that I'm using "tq". Is deleted.
+// underlying io.Reader encountered an error.
+//
+// We use this because we stream our Reader through a json.Decoder, which can
+// also return an error (on invalid JSON), so we can't outright tell whether the
+// error is in I/O (transient) or due to JSON content (non-transient).
+//
+// This lets us differentiate.
+type errCheckReader struct {
+ inner io.Reader
+ wasError bool
+}
+
+func (r *errCheckReader) Read(p []byte) (int, error) {
+ v, err := r.inner.Read(p)
+ if err != nil {
+ r.wasError = true
+ }
+ return v, err
+}

Powered by Google App Engine
This is Rietveld 408576698