Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Unified Diff: appengine/logdog/coordinator/mutations/createArchiveTask.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/mutations/createArchiveTask.go
diff --git a/appengine/logdog/coordinator/mutations/createArchiveTask.go b/appengine/logdog/coordinator/mutations/createArchiveTask.go
new file mode 100644
index 0000000000000000000000000000000000000000..548dcd6289fc18fcef13bf2a1dda34385eac4113
--- /dev/null
+++ b/appengine/logdog/coordinator/mutations/createArchiveTask.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package mutations
+
+import (
+ "fmt"
+ "time"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/tumble"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+ "golang.org/x/net/context"
+)
+
+// CreateArchiveTask is a tumble Mutation that registers a single hierarchy
+// component.
+type CreateArchiveTask struct {
+ // Path is the path of the log stream to create an archive task for.
+ Path types.StreamPath
+
+ // Expiration is the time when an archive task should be forced regardless
+ // of stream termination state.
+ Expiration time.Time
+}
+
+var _ tumble.DelayedMutation = (*CreateArchiveTask)(nil)
+
+// RollForward implements tumble.DelayedMutation.
+func (m *CreateArchiveTask) RollForward(c context.Context) ([]tumble.Mutation, error) {
+ c = log.SetField(c, "path", m.Path)
+
+ svc := coordinator.GetServices(c)
+ ap, err := svc.ArchivalPublisher(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get archival publisher.")
+ return nil, err
+ }
+
+ // Get the log stream.
+ ls := m.logStream()
+ if err := ds.Get(c).Get(ls); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Warningf(c, "Log stream no longer exists.")
+ return nil, nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to load archival log stream.")
+ return nil, err
+ }
+
+ params := coordinator.ArchivalParams{
+ RequestID: info.Get(c).RequestID(),
+ }
+ if err := params.PublishTask(c, ap, ls); err != nil {
+ log.WithError(err).Errorf(c, "Failed to publish archival task.")
+ return nil, err
+ }
+
+ log.Debugf(c, "Successfully published cleanup archival task.")
+ return nil, nil
+}
+
+// Root implements tumble.DelayedMutation.
+func (m *CreateArchiveTask) Root(c context.Context) *ds.Key {
+ return ds.Get(c).KeyForObj(m.logStream())
+}
+
+// ProcessAfter implements tumble.DelayedMutation.
+func (m *CreateArchiveTask) ProcessAfter() time.Time {
+ return m.Expiration
+}
+
+// HighPriority implements tumble.DelayedMutation.
+func (m *CreateArchiveTask) HighPriority() bool {
+ return false
+}
+
+// TaskName returns the task's name, which is derived from its path.
+func (m *CreateArchiveTask) TaskName(di ds.Interface) (*ds.Key, string) {
+ ls := m.logStream()
+ return di.KeyForObj(ls), fmt.Sprintf("archive-expired-%s", ls.HashID)
+}
+
+func (m *CreateArchiveTask) logStream() *coordinator.LogStream {
+ return coordinator.LogStreamFromPath(m.Path)
+}
+
+func init() {
+ tumble.Register((*CreateArchiveTask)(nil))
+}
« no previous file with comments | « appengine/logdog/coordinator/endpoints/services/terminateStream_test.go ('k') | appengine/logdog/coordinator/project.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698