Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(335)

Side by Side Diff: appengine/logdog/coordinator/archival.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "crypto/sha256"
9 "errors"
10 "fmt"
11 "sync/atomic"
12 "time"
13
14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
15 "github.com/luci/luci-go/common/proto/google"
16 "golang.org/x/net/context"
17 )
18
19 // ErrArchiveTasked is returned by ArchivalParams' PublishTask if the supplied
20 // LogStream indicates that it has already had an archival request dispatched.
21 var ErrArchiveTasked = errors.New("archival already tasked for this stream")
22
23 // ArchivalParams is the archival configuration.
24 type ArchivalParams struct {
25 // RequestID is the unique request ID to use as a random base or the
26 // archival key.
27 RequestID string
28
29 // SettleDelay is the amount of settle delay to attach to this request.
30 SettleDelay time.Duration
31
32 // CompletePeriod is the amount of time after the initial archival task is
33 // executed when the task should fail if the stream is incomplete. After this
34 // period has expired, the archival may complete successfully even if th e
35 // stream is missing log entries.
36 CompletePeriod time.Duration
37
38 // keyIndex is atomically incremented each time a request is published t o
39 // differentiate it from previous superfluous requests to the same strea m.
40 // This must be atomically-manipulated, since PublishTask may be called
41 // multiple times for the same stream if executed as part of a transacti on.
42 keyIndex int32
43 }
44
45 // PublishTask creates and dispatches a task queue task for the supplied
46 // LogStream. PublishTask is goroutine-safe.
47 //
48 // This should be run within a transaction on ls. On success, ls's state will
49 // be updated to reflect the archival tasking.
50 //
51 // If the task is created successfully, this will return nil. If the LogStream
52 // already had a task dispatched, it will return ErrArchiveTasked.
53 func (p *ArchivalParams) PublishTask(c context.Context, ap ArchivalPublisher, ls *LogStream) error {
54 if ls.State >= LSArchiveTasked {
55 // An archival task has already been dispatched for this log str eam.
56 return ErrArchiveTasked
57 }
58
59 path := string(ls.Path())
60 msg := logdog.ArchiveTask{
61 Path: path,
62 Key: p.createArchivalKey(path),
63 }
64 if p.SettleDelay > 0 {
65 msg.SettleDelay = google.NewDuration(p.SettleDelay)
66 }
67 if p.CompletePeriod > 0 {
68 msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
69 }
70
71 // Publish an archival request.
72 if err := ap.Publish(c, &msg); err != nil {
73 return err
74 }
75
76 // Update our LogStream's ArchiveState to reflect that an archival task has
77 // been dispatched.
78 ls.State = LSArchiveTasked
79 ls.ArchivalKey = msg.Key
80 return nil
81 }
82
83 // createArchivalKey returns a unique archival request key
84 func (p *ArchivalParams) createArchivalKey(path string) []byte {
85 index := atomic.AddInt32(&p.keyIndex, 1)
86 hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%d", p.RequestID, path, index)))
87 return hash[:]
88 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698