Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(655)

Side by Side Diff: appengine/logdog/coordinator/mutations/createArchiveTask.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package mutations
6
7 import (
8 "fmt"
9 "time"
10
11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/gae/service/info"
13 "github.com/luci/luci-go/appengine/logdog/coordinator"
14 "github.com/luci/luci-go/appengine/tumble"
15 "github.com/luci/luci-go/common/logdog/types"
16 log "github.com/luci/luci-go/common/logging"
17 "golang.org/x/net/context"
18 )
19
20 // CreateArchiveTask is a tumble Mutation that registers a single hierarchy
21 // component.
22 type CreateArchiveTask struct {
23 // Path is the path of the log stream to create an archive task for.
24 Path types.StreamPath
25
26 // Expiration is the time when an archive task should be forced regardle ss
27 // of stream termination state.
28 Expiration time.Time
29 }
30
31 var _ tumble.DelayedMutation = (*CreateArchiveTask)(nil)
32
33 // RollForward implements tumble.DelayedMutation.
34 func (m *CreateArchiveTask) RollForward(c context.Context) ([]tumble.Mutation, e rror) {
35 c = log.SetField(c, "path", m.Path)
36
37 svc := coordinator.GetServices(c)
38 ap, err := svc.ArchivalPublisher(c)
39 if err != nil {
40 log.WithError(err).Errorf(c, "Failed to get archival publisher." )
41 return nil, err
42 }
43
44 // Get the log stream.
45 ls := m.logStream()
46 if err := ds.Get(c).Get(ls); err != nil {
47 if err == ds.ErrNoSuchEntity {
48 log.Warningf(c, "Log stream no longer exists.")
49 return nil, nil
50 }
51
52 log.WithError(err).Errorf(c, "Failed to load archival log stream .")
53 return nil, err
54 }
55
56 params := coordinator.ArchivalParams{
57 RequestID: info.Get(c).RequestID(),
58 }
59 if err := params.PublishTask(c, ap, ls); err != nil {
60 log.WithError(err).Errorf(c, "Failed to publish archival task.")
61 return nil, err
62 }
63
64 log.Debugf(c, "Successfully published cleanup archival task.")
65 return nil, nil
66 }
67
68 // Root implements tumble.DelayedMutation.
69 func (m *CreateArchiveTask) Root(c context.Context) *ds.Key {
70 return ds.Get(c).KeyForObj(m.logStream())
71 }
72
73 // ProcessAfter implements tumble.DelayedMutation.
74 func (m *CreateArchiveTask) ProcessAfter() time.Time {
75 return m.Expiration
76 }
77
78 // HighPriority implements tumble.DelayedMutation.
79 func (m *CreateArchiveTask) HighPriority() bool {
80 return false
81 }
82
83 // TaskName returns the task's name, which is derived from its path.
84 func (m *CreateArchiveTask) TaskName(di ds.Interface) (*ds.Key, string) {
85 ls := m.logStream()
86 return di.KeyForObj(ls), fmt.Sprintf("archive-expired-%s", ls.HashID)
87 }
88
89 func (m *CreateArchiveTask) logStream() *coordinator.LogStream {
90 return coordinator.LogStreamFromPath(m.Path)
91 }
92
93 func init() {
94 tumble.Register((*CreateArchiveTask)(nil))
95 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/services/terminateStream_test.go ('k') | appengine/logdog/coordinator/project.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698