Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2039)

Unified Diff: appengine/logdog/coordinator/archival.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/archival.go
diff --git a/appengine/logdog/coordinator/archival.go b/appengine/logdog/coordinator/archival.go
new file mode 100644
index 0000000000000000000000000000000000000000..d1a1ae3f26f7062bdef2f73528699bfc0ffb9968
--- /dev/null
+++ b/appengine/logdog/coordinator/archival.go
@@ -0,0 +1,84 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinator
+
+import (
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ tq "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ "github.com/luci/luci-go/common/proto/google"
+)
+
+// ArchivalParams is the archival configuration.
+//
+// The zero value of this struct dispatches an archival job that accepts
+// incomplete archival immediately.
+type ArchivalParams struct {
+ // SettleDelay is the amount of time to wait for all of the log entries to be
+ // collected before the archival task can be executed.
+ //
+ // The log entries can be collected out of order, so using a reasonable
+ // collection settle period here will prevent the archivist from wasting time
+ // archiving incomplete logs.
+ SettleDelay time.Duration
+
+ // CompletePeriod is the amount of time after the initial archival task is
+ // executed when the task should fail if the stream is incomplete. After this
+ // period has expired, the archival may complete successfully even if the
+ // stream is missing log entries.
+ CompletePeriod time.Duration
+}
+
+// CreateTask creates a task queue task for the supplied LogStream.
+//
+// This should be run within a transaction on ls. On success, ls's state will
+// be updated to reflect the archival tasking.
+//
+// If the task is created successfully, this will return true.
+func (p *ArchivalParams) CreateTask(ti tq.Interface, ls *LogStream, queue string) (bool, error) {
+ if ls.State >= LSArchiveTasked {
+ // An archival task has already been dispatched for this log stream.
+ return false, nil
+ }
+
+ msg := logdog.ArchiveTask{
+ Path: string(ls.Path()),
+ }
+ if p.CompletePeriod > 0 {
+ msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
+ }
+ task, err := createPullTask(&msg)
+ if err != nil {
+ return false, err
+ }
+
+ // Enqueue the task.
+ task.Delay = p.SettleDelay
+ if err := ti.Add(task, queue); err != nil {
+ return false, err
+ }
+
+ // Update our LogStream's ArchiveState to reflect that an archival task has
+ // been dispatched.
+ ls.State = LSArchiveTasked
+ ls.ArchiveTaskName = task.Name
+ return true, nil
+}
+
+// createPullTask is a generic pull queue task creation method. It is used to
+// instantiate pull queue tasks.
+func createPullTask(msg proto.Message) (*tq.Task, error) {
+ payload, err := proto.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ return &tq.Task{
+ Method: "PULL",
+ Payload: payload,
+ }, nil
+}

Powered by Google App Engine
This is Rietveld 408576698