Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(861)

Unified Diff: logdog/appengine/coordinator/tasks/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/tasks/routes.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/appengine/coordinator/tasks/archival.go
diff --git a/logdog/appengine/coordinator/tasks/archival.go b/logdog/appengine/coordinator/tasks/archival.go
new file mode 100644
index 0000000000000000000000000000000000000000..e3eece405c6c989a0d83cbe8487bf5068aba75bd
--- /dev/null
+++ b/logdog/appengine/coordinator/tasks/archival.go
@@ -0,0 +1,201 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tasks
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
+ "github.com/luci/luci-go/logdog/appengine/coordinator"
+
+ "github.com/luci/luci-go/appengine/tq"
+ "github.com/luci/luci-go/common/errors"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/retry/transient"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+)
+
+// emptyArchiveDispatchTask is a singleton empty ArchiveDispatchTask.
+//
+// This is used because tq.Task are typed to their Payload, so every task needs
+// a Payload. This is the default Payload value, and is overridden when the
+// Payload needs to actually be populated for publishing.
+var emptyArchiveDispatchTask = &logdog.ArchiveDispatchTask{}
+
+// CreateArchivalTask adds a task to the task queue to initiate
+// archival on an stream. The task will be delayed by "delay".
+func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag,
+ delay time.Duration, params *coordinator.ArchivalParams) error {
+
+ task := makeArchivalTask(c, id, tag)
+ task.Payload = &logdog.ArchiveDispatchTask{
+ Id: string(id),
+ Tag: tag,
+ SettleDelay: google.NewDuration(params.SettleDelay),
+ CompletePeriod: google.NewDuration(params.CompletePeriod),
+ }
+ task.Delay = delay
+
+ // Add the task outside of a transaction, since it's a named task.
+ //
+ // This introduces two possibilities:
+ // - During transaction retries, the task may be added multiple times. This
+ // is fine, since it will naturally deduplicate.
+ // - If the overall transaction fails, the task may be added for a log stream
+ // that never exists. We handle this in the handler by warning (but not
+ // failing) on non-existent log streams.
+ if err := taskDispatcher.AddTask(ds.WithoutTransaction(c), task); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name(),
+ }.Errorf(c, "Failed to add task to task queue.")
+ return errors.Annotate(err, "failed to add task to task queue").Err()
+ }
+
+ log.Debugf(c, "Successfully created archival task: %q", task.Name())
+ return nil
+}
+
+// DeleteArchiveStreamExpiredTask deletes stream EXPIRED task associated with
+// id. If the task has already been deleted, this will do nothing.
+func DeleteArchiveStreamExpiredTask(c context.Context, id coordinator.HashID) error {
+ task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED)
+
+ log.Debugf(c, "Deleting archival task: %q", task.Name())
+ if err := taskDispatcher.DeleteTask(c, task); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "taskName": task.Name(),
+ }.Errorf(c, "Failed to delete expired archival task.")
+ return errors.Annotate(err, "failed to delete expired archival task").Err()
+ }
+
+ log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Name())
+ return nil
+}
+
+// handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask.
+//
+// This task is associated with a log stream and some archival parameters. It
+// will verify that the log stream hasn't had archival dispatched yet. If it
+// has, the task will terminate without further operation.
+//
+// For streams that haven't been archived, this task will transactionally
+// dispatch an archival task to the Archivist fleet and update the stream's
+// status.
+func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCount int) error {
+ adt, ok := payload.(*logdog.ArchiveDispatchTask)
+ if !ok {
+ return errors.Reason("unexpected message type %T", payload).Err()
+ }
+
+ log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q",
+ adt.Tag, execCount, info.GetNamespace(c), adt.Id)
+
+ stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)}
+ state := stream.State(c)
+
+ // Check if we're already archived (non-transactional).
+ if err := ds.Get(c, state); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Warningf(c, "Log stream no longer exists.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to load archival log stream.")
+ return errors.Annotate(err, "failed to load archival log stream").Tag(transient.Tag).Err()
+ }
+ if state.ArchivalState() != coordinator.NotArchived {
+ log.Infof(c, "Log stream archival is already tasked.")
+ return nil
+ }
+
+ // Get our archival publisher.
+ svc := coordinator.GetServices(c)
+ ap, err := svc.ArchivalPublisher(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get archival publisher.")
+ return errors.Annotate(err, "failed to get archival publisher").Tag(transient.Tag).Err()
+ }
+ defer func() {
+ if err := ap.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to close archival publisher.")
+ }
+ }()
+
+ params := coordinator.ArchivalParams{
+ RequestID: info.RequestID(c),
+ SettleDelay: google.DurationFromProto(adt.SettleDelay),
+ CompletePeriod: google.DurationFromProto(adt.CompletePeriod),
+ }
+
+ err = ds.RunInTransaction(c, func(c context.Context) error {
+ // Check if we're already archived (transactional).
+ if err := ds.Get(c, state); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Warningf(c, "(Transactional) Log stream no longer exists.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "(Transactional) Failed to load archival log stream.")
+ return errors.Annotate(err, "failed to load archival stream").Err()
+ }
+ if state.ArchivalState() != coordinator.NotArchived {
+ log.Infof(c, "(Transactional) Log stream archival is already tasked.")
+ return nil
+ }
+
+ if err = params.PublishTask(c, ap, state); err != nil {
+ if err == coordinator.ErrArchiveTasked {
+ log.Warningf(c, "Archival already tasked, skipping.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return errors.Annotate(err, "failed to publish archival task").Err()
+ }
+
+ if err := ds.Put(c, state); err != nil {
+ log.WithError(err).Errorf(c, "Failed to update datastore.")
+ return errors.Annotate(err, "failed to update datastore").Err()
+ }
+
+ return nil
+ }, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return errors.Annotate(err, "failed to publish archival task").Tag(transient.Tag).Err()
+ }
+
+ log.Debugf(c, "Successfully published cleanup archival task.")
+ return nil
+}
+
+func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag) *tq.Task {
+ name := fmt.Sprintf("%s_%s", id, tag)
+ return &tq.Task{
+ Payload: emptyArchiveDispatchTask,
+ NamePrefix: name,
+ DeduplicationKey: info.GetNamespace(c),
+ Title: name,
+ }
+}
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/tasks/routes.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698