Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Side by Side Diff: appengine/logdog/coordinator/archival.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "time"
9
10 "github.com/golang/protobuf/proto"
11 tq "github.com/luci/gae/service/taskqueue"
12 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
13 "github.com/luci/luci-go/common/proto/google"
14 )
15
16 // ArchivalParams is the archival configuration.
17 //
18 // The zero value of this struct dispatches an archival job that accepts
19 // incomplete archival immediately.
20 type ArchivalParams struct {
21 // SettleDelay is the amount of time to wait for all of the log entries to be
22 // collected before the archival task can be executed.
23 //
24 // The log entries can be collected out of order, so using a reasonable
25 // collection settle period here will prevent the archivist from wasting time
26 // archiving incomplete logs.
27 SettleDelay time.Duration
28
29 // CompletePeriod is the amount of time after the initial archival task is
30 // executed when the task should fail if the stream is incomplete. After this
31 // period has expired, the archival may complete successfully even if th e
32 // stream is missing log entries.
33 CompletePeriod time.Duration
34 }
35
36 // CreateTask creates a task queue task for the supplied LogStream.
37 //
38 // This should be run within a transaction on ls. On success, ls's state will
39 // be updated to reflect the archival tasking.
40 //
41 // If the task is created successfully, this will return true.
42 func (p *ArchivalParams) CreateTask(ti tq.Interface, ls *LogStream, queue string ) (bool, error) {
43 if ls.State >= LSArchiveTasked {
44 // An archival task has already been dispatched for this log str eam.
45 return false, nil
46 }
47
48 msg := logdog.ArchiveTask{
49 Path: string(ls.Path()),
50 }
51 if p.CompletePeriod > 0 {
52 msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
53 }
54 task, err := createPullTask(&msg)
55 if err != nil {
56 return false, err
57 }
58
59 // Enqueue the task.
60 task.Delay = p.SettleDelay
61 if err := ti.Add(task, queue); err != nil {
62 return false, err
63 }
64
65 // Update our LogStream's ArchiveState to reflect that an archival task has
66 // been dispatched.
67 ls.State = LSArchiveTasked
68 ls.ArchiveTaskName = task.Name
69 return true, nil
70 }
71
72 // createPullTask is a generic pull queue task creation method. It is used to
73 // instantiate pull queue tasks.
74 func createPullTask(msg proto.Message) (*tq.Task, error) {
75 payload, err := proto.Marshal(msg)
76 if err != nil {
77 return nil, err
78 }
79
80 return &tq.Task{
81 Method: "PULL",
82 Payload: payload,
83 }, nil
84 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698