Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Bugfixes, updates, works. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "sync/atomic" 10 "sync/atomic"
11 11
12 "github.com/julienschmidt/httprouter" 12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/gae/filter/dsQueryBatch" 13 "github.com/luci/gae/filter/dsQueryBatch"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/gae/service/info" 15 "github.com/luci/gae/service/info"
16 tq "github.com/luci/gae/service/taskqueue"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 17 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/config"
18 "github.com/luci/luci-go/common/errors" 20 "github.com/luci/luci-go/common/errors"
19 log "github.com/luci/luci-go/common/logging" 21 log "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/parallel" 22 "github.com/luci/luci-go/common/parallel"
21 "golang.org/x/net/context" 23 "golang.org/x/net/context"
22 ) 24 )
23 25
24 const archiveTaskVersion = "v4"
25
26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans 26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans
27 // for log streams that are ready for archival. 27 // for log streams that are ready for archival.
28 // 28 //
29 // This will be called periodically by AppEngine cron. 29 // This will be called periodically by AppEngine cron.
30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { 30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
31 errorWrapper(c, w, func() error { 31 errorWrapper(c, w, func() error {
32 return b.archiveCron(c) 32 return b.archiveCron(c)
33 }) 33 })
34 } 34 }
35 35
36 // HandleArchiveScanTask is the handler for the "/archive/cron/scan" endpoint.
37 // This scans for log streams that are ready for archival for a specific project
38 // namespace.
39 //
40 // This is tasked during a HandleArchiveCron run.
41 func (b *Backend) HandleArchiveScanTask(c context.Context, w http.ResponseWriter , r *http.Request, p httprouter.Params) {
42 errorWrapper(c, w, func() error {
43 return b.archiveScanTask(c)
44 })
45 }
46
36 func (b *Backend) archiveCron(c context.Context) error { 47 func (b *Backend) archiveCron(c context.Context) error {
37 » svc := coordinator.GetServices(c) 48 » log.Debugf(c, "Beginning archive cron project dispatching.")
38 » _, cfg, err := svc.Config(c) 49
50 » gcfg, cfg, err := coordinator.GetServices(c).Config(c)
39 if err != nil { 51 if err != nil {
40 return fmt.Errorf("failed to load configuration: %v", err) 52 return fmt.Errorf("failed to load configuration: %v", err)
41 } 53 }
54
55 queueName := cfg.Coordinator.ArchiveScanProjectQueueName
56 if queueName == "" {
57 return errors.New(`configuration is missing "archive_scan_projec t_queue_name"`)
58 }
59
60 // For each luci-config project that we have registered, execute a names pace
61 // sweep.
62 projects, err := gcfg.Projects(c)
63 if err != nil {
64 return fmt.Errorf("failed to enumerate projects")
65 }
66
67 log.Fields{
68 "projectCount": len(projects),
69 }.Debugf(c, "Adding cron tasks for projects.")
70 err = parallel.FanOutIn(func(taskC chan<- func() error) {
71 // TODO(dnj): Remove this empty namespace once it's no longer su pported.
72 taskC <- func() error {
73 ic := c
74 if err := coordinator.WithProjectNamespace(&ic, ""); err != nil {
75 return err
76 }
77
78 if err := tq.Get(c).Add(tq.NewPOSTTask("/archive/cron/sc an", nil), queueName); err != nil {
79 log.WithError(err).Errorf(c, "Failed to add non- namesapced scan task.")
Ryan Tseng 2016/04/28 20:16:03 namespaced
80 return err
81 }
82 return nil
83 }
84
85 for _, p := range projects {
86 p := p
87 log.Fields{
88 "project": p,
89 }.Debugf(c, "Creating archive scan task for project.")
90
91 taskC <- func() error {
92 ic := c
Ryan Tseng 2016/04/28 20:16:03 Maybe add small comment about why the context is b
93 if err := coordinator.WithProjectNamespace(&ic, config.ProjectName(p)); err != nil {
94 return err
95 }
96
97 if err := tq.Get(ic).Add(tq.NewPOSTTask("/archiv e/cron/scan", nil), queueName); err != nil {
98 log.Fields{
99 log.ErrorKey: err,
100 "project": p,
101 }.Errorf(c, "Failed to add scan task for project.")
102 return err
103 }
104
105 log.Fields{
106 "project": p,
107 }.Debugf(c, "Created scan task for project.")
108 return nil
109 }
110 }
111 })
112 if err != nil {
113 return errors.New("failed to create all scan tasks")
Ryan Tseng 2016/04/28 20:16:03 All? Or just some?
114 }
115 return nil
116 }
117
118 // archiveScanTask is an individual task that scans a single project namespace
119 // for expired log streams and dispatches archival requests for them.
120 func (b *Backend) archiveScanTask(c context.Context) error {
121 log.Fields{
122 "project": coordinator.Project(c),
123 }.Debugf(c, "Beginning archive scan task.")
124
125 services := coordinator.GetServices(c)
126 _, cfg, err := services.Config(c)
127 if err != nil {
128 return fmt.Errorf("failed to load configuration: %v", err)
129 }
42 130
43 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() 131 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
44 if archiveDelayMax <= 0 { 132 if archiveDelayMax <= 0 {
45 return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) 133 return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
46 } 134 }
47 135
48 » ap, err := svc.ArchivalPublisher(c) 136 » ap, err := services.ArchivalPublisher(c)
49 if err != nil { 137 if err != nil {
50 return fmt.Errorf("failed to get archival publisher: %v", err) 138 return fmt.Errorf("failed to get archival publisher: %v", err)
51 } 139 }
52 140
53 threshold := clock.Now(c).UTC().Add(-archiveDelayMax) 141 threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
54 log.Fields{ 142 log.Fields{
55 "threshold": threshold, 143 "threshold": threshold,
56 }.Infof(c, "Querying for all streaming logs created before max archival threshold.") 144 }.Infof(c, "Querying for all streaming logs created before max archival threshold.")
57 145
58 // Query for log streams that were created <= our threshold and that are 146 // Query for log streams that were created <= our threshold and that are
(...skipping 11 matching lines...) Expand all
70 // archival immediately. 158 // archival immediately.
71 params := coordinator.ArchivalParams{ 159 params := coordinator.ArchivalParams{
72 RequestID: info.Get(c).RequestID(), 160 RequestID: info.Get(c).RequestID(),
73 } 161 }
74 162
75 // Create archive tasks for our expired log streams in parallel. 163 // Create archive tasks for our expired log streams in parallel.
76 batch := b.getMultiTaskBatchSize() 164 batch := b.getMultiTaskBatchSize()
77 var tasked int32 165 var tasked int32
78 var failed int32 166 var failed int32
79 167
168 //
Ryan Tseng 2016/04/28 20:16:03 ?
169
80 var ierr error 170 var ierr error
81 parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { 171 parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
82 // Run a batched query across the expired log stream space. 172 // Run a batched query across the expired log stream space.
83 ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error { 173 ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
84 var ls coordinator.LogStream 174 var ls coordinator.LogStream
85 ds.PopulateKey(&ls, lsKey) 175 ds.PopulateKey(&ls, lsKey)
86 176
87 // Archive this log stream in a transaction. 177 // Archive this log stream in a transaction.
88 taskC <- func() error { 178 taskC <- func() error {
89 err := ds.Get(c).RunInTransaction(func(c context .Context) error { 179 err := ds.Get(c).RunInTransaction(func(c context .Context) error {
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 }.Errorf(c, "Failed to archive candidate all streams.") 237 }.Errorf(c, "Failed to archive candidate all streams.")
148 return errors.New("failed to archive all candidate streams") 238 return errors.New("failed to archive all candidate streams")
149 239
150 default: 240 default:
151 log.Fields{ 241 log.Fields{
152 "archiveCount": tasked, 242 "archiveCount": tasked,
153 }.Infof(c, "Archive sweep completed successfully.") 243 }.Infof(c, "Archive sweep completed successfully.")
154 return nil 244 return nil
155 } 245 }
156 } 246 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698