| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package mutations |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/gae/service/info" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 14 "github.com/luci/luci-go/appengine/tumble" |
| 15 "github.com/luci/luci-go/common/logdog/types" |
| 16 log "github.com/luci/luci-go/common/logging" |
| 17 "golang.org/x/net/context" |
| 18 ) |
| 19 |
| 20 // CreateArchiveTask is a tumble Mutation that registers a single hierarchy |
| 21 // component. |
| 22 type CreateArchiveTask struct { |
| 23 // Path is the path of the log stream to create an archive task for. |
| 24 Path types.StreamPath |
| 25 |
| 26 // Expiration is the time when an archive task should be forced regardle
ss |
| 27 // of stream termination state. |
| 28 Expiration time.Time |
| 29 } |
| 30 |
| 31 var _ tumble.DelayedMutation = (*CreateArchiveTask)(nil) |
| 32 |
| 33 // RollForward implements tumble.DelayedMutation. |
| 34 func (m *CreateArchiveTask) RollForward(c context.Context) ([]tumble.Mutation, e
rror) { |
| 35 c = log.SetField(c, "path", m.Path) |
| 36 |
| 37 svc := coordinator.GetServices(c) |
| 38 ap, err := svc.ArchivalPublisher(c) |
| 39 if err != nil { |
| 40 log.WithError(err).Errorf(c, "Failed to get archival publisher."
) |
| 41 return nil, err |
| 42 } |
| 43 |
| 44 // Get the log stream. |
| 45 ls := m.logStream() |
| 46 if err := ds.Get(c).Get(ls); err != nil { |
| 47 if err == ds.ErrNoSuchEntity { |
| 48 log.Warningf(c, "Log stream no longer exists.") |
| 49 return nil, nil |
| 50 } |
| 51 |
| 52 log.WithError(err).Errorf(c, "Failed to load archival log stream
.") |
| 53 return nil, err |
| 54 } |
| 55 |
| 56 params := coordinator.ArchivalParams{ |
| 57 RequestID: info.Get(c).RequestID(), |
| 58 } |
| 59 if err := params.PublishTask(c, ap, ls); err != nil { |
| 60 log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| 61 return nil, err |
| 62 } |
| 63 |
| 64 log.Debugf(c, "Successfully published cleanup archival task.") |
| 65 return nil, nil |
| 66 } |
| 67 |
| 68 // Root implements tumble.DelayedMutation. |
| 69 func (m *CreateArchiveTask) Root(c context.Context) *ds.Key { |
| 70 return ds.Get(c).KeyForObj(m.logStream()) |
| 71 } |
| 72 |
| 73 // ProcessAfter implements tumble.DelayedMutation. |
| 74 func (m *CreateArchiveTask) ProcessAfter() time.Time { |
| 75 return m.Expiration |
| 76 } |
| 77 |
| 78 // HighPriority implements tumble.DelayedMutation. |
| 79 func (m *CreateArchiveTask) HighPriority() bool { |
| 80 return false |
| 81 } |
| 82 |
| 83 // TaskName returns the task's name, which is derived from its path. |
| 84 func (m *CreateArchiveTask) TaskName(di ds.Interface) (*ds.Key, string) { |
| 85 ls := m.logStream() |
| 86 return di.KeyForObj(ls), fmt.Sprintf("archive-expired-%s", ls.HashID) |
| 87 } |
| 88 |
| 89 func (m *CreateArchiveTask) logStream() *coordinator.LogStream { |
| 90 return coordinator.LogStreamFromPath(m.Path) |
| 91 } |
| 92 |
| 93 func init() { |
| 94 tumble.Register((*CreateArchiveTask)(nil)) |
| 95 } |
| OLD | NEW |