Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2212)

Unified Diff: appengine/logdog/coordinator/archival.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/archival.go
diff --git a/appengine/logdog/coordinator/archival.go b/appengine/logdog/coordinator/archival.go
new file mode 100644
index 0000000000000000000000000000000000000000..931f36a27194f0047ac64566b2c011444a450521
--- /dev/null
+++ b/appengine/logdog/coordinator/archival.go
@@ -0,0 +1,88 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinator
+
+import (
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ "github.com/luci/luci-go/common/proto/google"
+ "golang.org/x/net/context"
+)
+
+// ErrArchiveTasked is returned by ArchivalParams' PublishTask if the supplied
+// LogStream indicates that it has already had an archival request dispatched.
+var ErrArchiveTasked = errors.New("archival already tasked for this stream")
+
+// ArchivalParams is the archival configuration.
+type ArchivalParams struct {
+ // RequestID is the unique request ID to use as a random base or the
+ // archival key.
+ RequestID string
+
+ // SettleDelay is the amount of settle delay to attach to this request.
+ SettleDelay time.Duration
+
+ // CompletePeriod is the amount of time after the initial archival task is
+ // executed when the task should fail if the stream is incomplete. After this
+ // period has expired, the archival may complete successfully even if the
+ // stream is missing log entries.
+ CompletePeriod time.Duration
+
+ // keyIndex is atomically incremented each time a request is published to
+ // differentiate it from previous superfluous requests to the same stream.
+ // This must be atomically-manipulated, since PublishTask may be called
+ // multiple times for the same stream if executed as part of a transaction.
+ keyIndex int32
+}
+
+// PublishTask creates and dispatches a task queue task for the supplied
+// LogStream. PublishTask is goroutine-safe.
+//
+// This should be run within a transaction on ls. On success, ls's state will
+// be updated to reflect the archival tasking.
+//
+// If the task is created successfully, this will return nil. If the LogStream
+// already had a task dispatched, it will return ErrArchiveTasked.
+func (p *ArchivalParams) PublishTask(c context.Context, ap ArchivalPublisher, ls *LogStream) error {
+ if ls.State >= LSArchiveTasked {
+ // An archival task has already been dispatched for this log stream.
+ return ErrArchiveTasked
+ }
+
+ path := string(ls.Path())
+ msg := logdog.ArchiveTask{
+ Path: path,
+ Key: p.createArchivalKey(path),
+ }
+ if p.SettleDelay > 0 {
+ msg.SettleDelay = google.NewDuration(p.SettleDelay)
+ }
+ if p.CompletePeriod > 0 {
+ msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
+ }
+
+ // Publish an archival request.
+ if err := ap.Publish(c, &msg); err != nil {
+ return err
+ }
+
+ // Update our LogStream's ArchiveState to reflect that an archival task has
+ // been dispatched.
+ ls.State = LSArchiveTasked
+ ls.ArchivalKey = msg.Key
+ return nil
+}
+
+// createArchivalKey returns a unique archival request key
+func (p *ArchivalParams) createArchivalKey(path string) []byte {
+ index := atomic.AddInt32(&p.keyIndex, 1)
+ hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%d", p.RequestID, path, index)))
+ return hash[:]
+}
« no previous file with comments | « appengine/cmd/logdog_coordinator/vmuser/queue.yaml ('k') | appengine/logdog/coordinator/archivalPublisher.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698