| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package coordinator |
| 6 |
| 7 import ( |
| 8 "time" |
| 9 |
| 10 "github.com/golang/protobuf/proto" |
| 11 tq "github.com/luci/gae/service/taskqueue" |
| 12 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 13 "github.com/luci/luci-go/common/proto/google" |
| 14 ) |
| 15 |
| 16 // ArchivalParams is the archival configuration. |
| 17 // |
| 18 // The zero value of this struct dispatches an archival job that accepts |
| 19 // incomplete archival immediately. |
| 20 type ArchivalParams struct { |
| 21 // SettleDelay is the amount of time to wait for all of the log entries
to be |
| 22 // collected before the archival task can be executed. |
| 23 // |
| 24 // The log entries can be collected out of order, so using a reasonable |
| 25 // collection settle period here will prevent the archivist from wasting
time |
| 26 // archiving incomplete logs. |
| 27 SettleDelay time.Duration |
| 28 |
| 29 // CompletePeriod is the amount of time after the initial archival task
is |
| 30 // executed when the task should fail if the stream is incomplete. After
this |
| 31 // period has expired, the archival may complete successfully even if th
e |
| 32 // stream is missing log entries. |
| 33 CompletePeriod time.Duration |
| 34 } |
| 35 |
| 36 // CreateTask creates a task queue task for the supplied LogStream. |
| 37 // |
| 38 // This should be run within a transaction on ls. On success, ls's state will |
| 39 // be updated to reflect the archival tasking. |
| 40 // |
| 41 // If the task is created successfully, this will return true. |
| 42 func (p *ArchivalParams) CreateTask(ti tq.Interface, ls *LogStream, queue string
) (bool, error) { |
| 43 if ls.State >= LSArchiveTasked { |
| 44 // An archival task has already been dispatched for this log str
eam. |
| 45 return false, nil |
| 46 } |
| 47 |
| 48 msg := logdog.ArchiveTask{ |
| 49 Path: string(ls.Path()), |
| 50 } |
| 51 if p.CompletePeriod > 0 { |
| 52 msg.CompletePeriod = google.NewDuration(p.CompletePeriod) |
| 53 } |
| 54 task, err := createPullTask(&msg) |
| 55 if err != nil { |
| 56 return false, err |
| 57 } |
| 58 |
| 59 // Enqueue the task. |
| 60 task.Delay = p.SettleDelay |
| 61 if err := ti.Add(task, queue); err != nil { |
| 62 return false, err |
| 63 } |
| 64 |
| 65 // Update our LogStream's ArchiveState to reflect that an archival task
has |
| 66 // been dispatched. |
| 67 ls.State = LSArchiveTasked |
| 68 ls.ArchiveTaskName = task.Name |
| 69 return true, nil |
| 70 } |
| 71 |
| 72 // createPullTask is a generic pull queue task creation method. It is used to |
| 73 // instantiate pull queue tasks. |
| 74 func createPullTask(msg proto.Message) (*tq.Task, error) { |
| 75 payload, err := proto.Marshal(msg) |
| 76 if err != nil { |
| 77 return nil, err |
| 78 } |
| 79 |
| 80 return &tq.Task{ |
| 81 Method: "PULL", |
| 82 Payload: payload, |
| 83 }, nil |
| 84 } |
| OLD | NEW |