| Index: appengine/logdog/coordinator/archival.go
|
| diff --git a/appengine/logdog/coordinator/archival.go b/appengine/logdog/coordinator/archival.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d1a1ae3f26f7062bdef2f73528699bfc0ffb9968
|
| --- /dev/null
|
| +++ b/appengine/logdog/coordinator/archival.go
|
| @@ -0,0 +1,84 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package coordinator
|
| +
|
| +import (
|
| + "time"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| + "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| +)
|
| +
|
| +// ArchivalParams is the archival configuration.
|
| +//
|
| +// The zero value of this struct dispatches an archival job that accepts
|
| +// incomplete archival immediately.
|
| +type ArchivalParams struct {
|
| + // SettleDelay is the amount of time to wait for all of the log entries to be
|
| + // collected before the archival task can be executed.
|
| + //
|
| + // The log entries can be collected out of order, so using a reasonable
|
| + // collection settle period here will prevent the archivist from wasting time
|
| + // archiving incomplete logs.
|
| + SettleDelay time.Duration
|
| +
|
| + // CompletePeriod is the amount of time after the initial archival task is
|
| + // executed when the task should fail if the stream is incomplete. After this
|
| + // period has expired, the archival may complete successfully even if the
|
| + // stream is missing log entries.
|
| + CompletePeriod time.Duration
|
| +}
|
| +
|
| +// CreateTask creates a task queue task for the supplied LogStream.
|
| +//
|
| +// This should be run within a transaction on ls. On success, ls's state will
|
| +// be updated to reflect the archival tasking.
|
| +//
|
| +// If the task is created successfully, this will return true.
|
| +func (p *ArchivalParams) CreateTask(ti tq.Interface, ls *LogStream, queue string) (bool, error) {
|
| + if ls.State >= LSArchiveTasked {
|
| + // An archival task has already been dispatched for this log stream.
|
| + return false, nil
|
| + }
|
| +
|
| + msg := logdog.ArchiveTask{
|
| + Path: string(ls.Path()),
|
| + }
|
| + if p.CompletePeriod > 0 {
|
| + msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
|
| + }
|
| + task, err := createPullTask(&msg)
|
| + if err != nil {
|
| + return false, err
|
| + }
|
| +
|
| + // Enqueue the task.
|
| + task.Delay = p.SettleDelay
|
| + if err := ti.Add(task, queue); err != nil {
|
| + return false, err
|
| + }
|
| +
|
| + // Update our LogStream's ArchiveState to reflect that an archival task has
|
| + // been dispatched.
|
| + ls.State = LSArchiveTasked
|
| + ls.ArchiveTaskName = task.Name
|
| + return true, nil
|
| +}
|
| +
|
| +// createPullTask is a generic pull queue task creation method. It is used to
|
| +// instantiate pull queue tasks.
|
| +func createPullTask(msg proto.Message) (*tq.Task, error) {
|
| + payload, err := proto.Marshal(msg)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + return &tq.Task{
|
| + Method: "PULL",
|
| + Payload: payload,
|
| + }, nil
|
| +}
|
|
|