Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package backend
6
7 import (
8 "fmt"
9 "net/http"
10 "sync/atomic"
11
12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/gae/filter/dsQueryBatch"
14 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/gae/service/info"
16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/errors"
19 log "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/parallel"
21 "golang.org/x/net/context"
22 )
23
24 const archiveTaskVersion = "v4"
25
26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans
27 // for log streams that are ready for archival.
28 //
29 // This will be called periodically by AppEngine cron.
30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
31 errorWrapper(c, w, func() error {
32 return b.archiveCron(c)
33 })
34 }
35
36 func (b *Backend) archiveCron(c context.Context) error {
37 svc := coordinator.GetServices(c)
38 _, cfg, err := svc.Config(c)
39 if err != nil {
40 return fmt.Errorf("failed to load configuration: %v", err)
41 }
42
43 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
44 if archiveDelayMax <= 0 {
45 return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
46 }
47
48 ap, err := svc.ArchivalPublisher(c)
49 if err != nil {
50 return fmt.Errorf("failed to get archival publisher: %v", err)
51 }
52
53 threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
54 log.Fields{
55 "threshold": threshold,
56 }.Infof(c, "Querying for all streaming logs created before max archival threshold.")
57
58 // Query for log streams that were created <= our threshold and that are
59 // still in LSStreaming state.
60 //
61 // We order descending because this is already an index that we use for our
62 // "logdog.Logs.Query".
63 q := ds.NewQuery("LogStream").
64 KeysOnly(true).
65 Eq("State", coordinator.LSStreaming).
66 Lte("Created", threshold).
67 Order("-Created", "State")
68
69 // Since these logs are beyond maximum archival delay, we will dispatch
70 // archival immediately.
71 params := coordinator.ArchivalParams{
72 RequestID: info.Get(c).RequestID(),
73 }
74
75 // Create archive tasks for our expired log streams in parallel.
76 batch := b.getMultiTaskBatchSize()
77 var tasked int32
78 var failed int32
79
80 var ierr error
81 parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
82 // Run a batched query across the expired log stream space.
83 ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
84 var ls coordinator.LogStream
85 ds.PopulateKey(&ls, lsKey)
86
87 // Archive this log stream in a transaction.
88 taskC <- func() error {
89 err := ds.Get(c).RunInTransaction(func(c context .Context) error {
90 if err := ds.Get(c).Get(&ls); err != nil {
91 log.WithError(err).Errorf(c, "Fa iled to load stream.")
92 return err
93 }
94
95 log.Fields{
96 "path": ls.Path(),
97 "id": ls.HashID,
98 }.Infof(c, "Identified expired log strea m.")
99
100 if err := params.PublishTask(c, ap, &ls) ; err != nil {
101 if err == coordinator.ErrArchive Tasked {
102 log.Warningf(c, "Archiva l has already been tasked for this stream.")
103 return nil
104 }
105 return err
106 }
107 return ds.Get(c).Put(&ls)
108 }, nil)
109
110 if err != nil {
111 log.Fields{
112 log.ErrorKey: err,
113 "path": ls.Path(),
114 }.Errorf(c, "Failed to archive log strea m.")
115 atomic.AddInt32(&failed, 1)
116 return nil // Nothing will consume it an yway.
117 }
118
119 log.Fields{
120 "path": ls.Path(),
121 "id": ls.HashID,
122 "archiveTopic": cfg.Coordinator.ArchiveT opic,
123 }.Infof(c, "Created archive task.")
124 atomic.AddInt32(&tasked, 1)
125 return nil
126 }
127
128 return nil
129 })
130 }))
131
132 // Return an error code if we experienced any failures. This doesn't rea lly
133 // have an impact, but it will show up as a "!" in the cron UI.
134 switch {
135 case ierr != nil:
136 log.Fields{
137 log.ErrorKey: err,
138 "archiveCount": tasked,
139 }.Errorf(c, "Failed to execute expired tasks query.")
140 return ierr
141
142 case failed > 0:
143 log.Fields{
144 log.ErrorKey: err,
145 "archiveCount": tasked,
146 "failCount": failed,
147 }.Errorf(c, "Failed to archive candidate all streams.")
148 return errors.New("failed to archive all candidate streams")
149
150 default:
151 log.Fields{
152 "archiveCount": tasked,
153 }.Infof(c, "Archive sweep completed successfully.")
154 return nil
155 }
156 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/archivalPublisher.go ('k') | appengine/logdog/coordinator/backend/archiveCron_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698