Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 | 11 |
| 12 "github.com/julienschmidt/httprouter" | 12 "github.com/julienschmidt/httprouter" |
| 13 "github.com/luci/gae/filter/dsQueryBatch" | 13 "github.com/luci/gae/filter/dsQueryBatch" |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 "github.com/luci/gae/service/info" | 15 "github.com/luci/gae/service/info" |
| 16 tq "github.com/luci/gae/service/taskqueue" | |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | 17 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/config" | |
| 18 "github.com/luci/luci-go/common/errors" | 20 "github.com/luci/luci-go/common/errors" |
| 19 log "github.com/luci/luci-go/common/logging" | 21 log "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/parallel" | 22 "github.com/luci/luci-go/common/parallel" |
| 21 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 22 ) | 24 ) |
| 23 | 25 |
| 24 const archiveTaskVersion = "v4" | |
| 25 | |
| 26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans | 26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans |
| 27 // for log streams that are ready for archival. | 27 // for log streams that are ready for archival. |
| 28 // | 28 // |
| 29 // This will be called periodically by AppEngine cron. | 29 // This will be called periodically by AppEngine cron. |
| 30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { | 30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| 31 errorWrapper(c, w, func() error { | 31 errorWrapper(c, w, func() error { |
| 32 return b.archiveCron(c) | 32 return b.archiveCron(c) |
| 33 }) | 33 }) |
| 34 } | 34 } |
| 35 | 35 |
| 36 // HandleArchiveScanTask is the handler for the "/archive/cron/scan" endpoint. | |
| 37 // This scans for log streams that are ready for archival for a specific project | |
| 38 // namespace. | |
| 39 // | |
| 40 // This is tasked during a HandleArchiveCron run. | |
| 41 func (b *Backend) HandleArchiveScanTask(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { | |
| 42 errorWrapper(c, w, func() error { | |
| 43 return b.archiveScanTask(c) | |
| 44 }) | |
| 45 } | |
| 46 | |
| 36 func (b *Backend) archiveCron(c context.Context) error { | 47 func (b *Backend) archiveCron(c context.Context) error { |
| 37 » svc := coordinator.GetServices(c) | 48 » log.Debugf(c, "Beginning archive cron project dispatching.") |
| 38 » _, cfg, err := svc.Config(c) | 49 |
| 50 » gcfg, cfg, err := coordinator.GetServices(c).Config(c) | |
| 39 if err != nil { | 51 if err != nil { |
| 40 return fmt.Errorf("failed to load configuration: %v", err) | 52 return fmt.Errorf("failed to load configuration: %v", err) |
| 41 } | 53 } |
| 54 | |
| 55 queueName := cfg.Coordinator.ArchiveScanProjectQueueName | |
| 56 if queueName == "" { | |
| 57 return errors.New(`configuration is missing "archive_scan_project_queue_name"`) | |
| 58 } | |
| 59 | |
| 60 // For each luci-config project that we have registered, execute a namespace | |
| 61 // sweep. | |
| 62 projects, err := gcfg.Projects(c) | |
| 63 if err != nil { | |
| 64 return fmt.Errorf("failed to enumerate projects") | |
| 65 } | |
| 66 | |
| 67 log.Fields{ | |
| 68 "projectCount": len(projects), | |
| 69 }.Debugf(c, "Adding cron tasks for projects.") | |
| 70 err = parallel.FanOutIn(func(taskC chan<- func() error) { | |
| 71 // TODO(dnj): Remove this empty namespace once it's no longer supported. | |
| 72 taskC <- func() error { | |
| 73 ic := c | |
| 74 if err := coordinator.WithProjectNamespace(&ic, ""); err != nil { | |
| 75 return err | |
| 76 } | |
| 77 | |
| 78 if err := tq.Get(c).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil { | |
| 79 log.WithError(err).Errorf(c, "Failed to add non- namesapced scan task.") | |
|
Ryan Tseng
2016/04/28 20:16:03
namespaced
| |
| 80 return err | |
| 81 } | |
| 82 return nil | |
| 83 } | |
| 84 | |
| 85 for _, p := range projects { | |
| 86 p := p | |
| 87 log.Fields{ | |
| 88 "project": p, | |
| 89 }.Debugf(c, "Creating archive scan task for project.") | |
| 90 | |
| 91 taskC <- func() error { | |
| 92 ic := c | |
|
Ryan Tseng
2016/04/28 20:16:03
Maybe add small comment about why the context is being copied.
| |
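The thread above asks for a comment explaining the `ic := c` copy. A minimal, self-contained sketch of that pattern, assuming a hypothetical `withNamespace` stand-in for `coordinator.WithProjectNamespace` (which likewise rewrites the `*context.Context` it is handed):

```go
package main

import (
	"context"
	"fmt"
)

type nsKey struct{}

// withNamespace is a hypothetical stand-in for coordinator.WithProjectNamespace:
// it rewrites the context variable it is handed so it carries a namespace value.
func withNamespace(c *context.Context, ns string) {
	*c = context.WithValue(*c, nsKey{}, ns)
}

func main() {
	root := context.Background()

	for _, project := range []string{"chromium", "v8"} {
		// Copy the context so the namespace applied for this project's task
		// does not leak into the shared parent context.
		ic := root
		withNamespace(&ic, project)
		fmt.Println(ic.Value(nsKey{})) // "chromium", then "v8"
	}

	fmt.Println(root.Value(nsKey{})) // <nil>: the parent was never modified
}
```

Copying the interface value (`ic := c`) keeps each task's namespace local to that task, much as the `p := p` capture keeps the project name local to each closure.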
| 93 if err := coordinator.WithProjectNamespace(&ic, config.ProjectName(p)); err != nil { | |
| 94 return err | |
| 95 } | |
| 96 | |
| 97 if err := tq.Get(ic).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil { | |
| 98 log.Fields{ | |
| 99 log.ErrorKey: err, | |
| 100 "project": p, | |
| 101 }.Errorf(c, "Failed to add scan task for project.") | |
| 102 return err | |
| 103 } | |
| 104 | |
| 105 log.Fields{ | |
| 106 "project": p, | |
| 107 }.Debugf(c, "Created scan task for project.") | |
| 108 return nil | |
| 109 } | |
| 110 } | |
| 111 }) | |
| 112 if err != nil { | |
| 113 return errors.New("failed to create all scan tasks") | |
|
Ryan Tseng
2016/04/28 20:16:03
All? Or just some?
| |
| 114 } | |
| 115 return nil | |
| 116 } | |
| 117 | |
| 118 // archiveScanTask is an individual task that scans a single project namespace | |
| 119 // for expired log streams and dispatches archival requests for them. | |
| 120 func (b *Backend) archiveScanTask(c context.Context) error { | |
| 121 log.Fields{ | |
| 122 "project": coordinator.Project(c), | |
| 123 }.Debugf(c, "Beginning archive scan task.") | |
| 124 | |
| 125 services := coordinator.GetServices(c) | |
| 126 _, cfg, err := services.Config(c) | |
| 127 if err != nil { | |
| 128 return fmt.Errorf("failed to load configuration: %v", err) | |
| 129 } | |
| 42 | 130 |
| 43 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() | 131 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() |
| 44 if archiveDelayMax <= 0 { | 132 if archiveDelayMax <= 0 { |
| 45 return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) | 133 return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) |
| 46 } | 134 } |
| 47 | 135 |
| 48 » ap, err := svc.ArchivalPublisher(c) | 136 » ap, err := services.ArchivalPublisher(c) |
| 49 if err != nil { | 137 if err != nil { |
| 50 return fmt.Errorf("failed to get archival publisher: %v", err) | 138 return fmt.Errorf("failed to get archival publisher: %v", err) |
| 51 } | 139 } |
| 52 | 140 |
| 53 threshold := clock.Now(c).UTC().Add(-archiveDelayMax) | 141 threshold := clock.Now(c).UTC().Add(-archiveDelayMax) |
| 54 log.Fields{ | 142 log.Fields{ |
| 55 "threshold": threshold, | 143 "threshold": threshold, |
| 56 }.Infof(c, "Querying for all streaming logs created before max archival threshold.") | 144 }.Infof(c, "Querying for all streaming logs created before max archival threshold.") |
| 57 | 145 |
| 58 // Query for log streams that were created <= our threshold and that are | 146 // Query for log streams that were created <= our threshold and that are |
| (...skipping 11 matching lines...) | |
| 70 // archival immediately. | 158 // archival immediately. |
| 71 params := coordinator.ArchivalParams{ | 159 params := coordinator.ArchivalParams{ |
| 72 RequestID: info.Get(c).RequestID(), | 160 RequestID: info.Get(c).RequestID(), |
| 73 } | 161 } |
| 74 | 162 |
| 75 // Create archive tasks for our expired log streams in parallel. | 163 // Create archive tasks for our expired log streams in parallel. |
| 76 batch := b.getMultiTaskBatchSize() | 164 batch := b.getMultiTaskBatchSize() |
| 77 var tasked int32 | 165 var tasked int32 |
| 78 var failed int32 | 166 var failed int32 |
| 79 | 167 |
| 168 // | |
|
Ryan Tseng
2016/04/28 20:16:03
?
| |
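The dispatch that follows runs a batched, keys-only query and feeds each key to a bounded pool of workers, counting successes and failures with atomics. A minimal standard-library sketch of that shape, with a hypothetical `archiveStream` standing in for the per-stream transactional body built around `ArchivalParams`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// archiveStream is a hypothetical stand-in for the per-stream transaction that
// re-checks the stream's state and publishes an archival task for it.
func archiveStream(key string) error {
	fmt.Printf("dispatched archival for %s\n", key)
	return nil
}

func main() {
	// Stand-in for the keys yielded by the batched, keys-only datastore query.
	keys := []string{"stream/1", "stream/2", "stream/3", "stream/4"}

	const batch = 2 // plays the role of b.getMultiTaskBatchSize()
	var tasked, failed int32

	// Bounded worker pool: at most `batch` archival dispatches in flight,
	// mirroring parallel.Run(batch, ...) fed from the query callback.
	work := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < batch; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range work {
				if err := archiveStream(key); err != nil {
					atomic.AddInt32(&failed, 1)
					continue
				}
				atomic.AddInt32(&tasked, 1)
			}
		}()
	}
	for _, k := range keys {
		work <- k
	}
	close(work)
	wg.Wait()

	fmt.Printf("tasked=%d failed=%d\n", tasked, failed)
}
```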
| 169 | |
| 80 var ierr error | 170 var ierr error |
| 81 parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { | 171 parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { |
| 82 // Run a batched query across the expired log stream space. | 172 // Run a batched query across the expired log stream space. |
| 83 ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error { | 173 ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error { |
| 84 var ls coordinator.LogStream | 174 var ls coordinator.LogStream |
| 85 ds.PopulateKey(&ls, lsKey) | 175 ds.PopulateKey(&ls, lsKey) |
| 86 | 176 |
| 87 // Archive this log stream in a transaction. | 177 // Archive this log stream in a transaction. |
| 88 taskC <- func() error { | 178 taskC <- func() error { |
| 89 err := ds.Get(c).RunInTransaction(func(c context.Context) error { | 179 err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| (...skipping 57 matching lines...) | |
| 147 }.Errorf(c, "Failed to archive candidate all streams.") | 237 }.Errorf(c, "Failed to archive candidate all streams.") |
| 148 return errors.New("failed to archive all candidate streams") | 238 return errors.New("failed to archive all candidate streams") |
| 149 | 239 |
| 150 default: | 240 default: |
| 151 log.Fields{ | 241 log.Fields{ |
| 152 "archiveCount": tasked, | 242 "archiveCount": tasked, |
| 153 }.Infof(c, "Archive sweep completed successfully.") | 243 }.Infof(c, "Archive sweep completed successfully.") |
| 154 return nil | 244 return nil |
| 155 } | 245 } |
| 156 } | 246 } |
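Zooming out, a minimal standard-library sketch of the dispatch flow this change introduces: the cron handler fans out one task-queue POST to `/archive/cron/scan` per registered project (plus the legacy empty namespace) and fails the request if any enqueue errors. The cron route path, the project list, and `enqueueScanTask` are hypothetical stand-ins; the real code uses `tq.NewPOSTTask`, `parallel.FanOutIn`, and the queue name from `cfg.Coordinator.ArchiveScanProjectQueueName`.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// enqueueScanTask is a hypothetical stand-in for
// tq.Get(ic).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName):
// it would push one POST task onto the named queue for a single project.
func enqueueScanTask(project, queueName string) error {
	fmt.Printf("queue %q: POST /archive/cron/scan for project %q\n", queueName, project)
	return nil
}

// handleArchiveCron mirrors archiveCron's fan-out: one scan task per project,
// dispatched concurrently, reporting failure if any enqueue errors.
func handleArchiveCron(w http.ResponseWriter, _ *http.Request) {
	projects := []string{"", "chromium", "v8"} // "" keeps the non-namespaced sweep
	queueName := "archive-scan-project"        // cfg.Coordinator.ArchiveScanProjectQueueName

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, p := range projects {
		p := p
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := enqueueScanTask(p, queueName); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	if firstErr != nil {
		http.Error(w, "failed to create scan tasks", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	// The cron route path here is illustrative; the actual route wiring lives
	// outside this file.
	http.HandleFunc("/archive/cron/dispatch", handleArchiveCron)
	_ = http.ListenAndServe(":8080", nil)
}
```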