Chromium Code Reviews| Index: appengine/logdog/coordinator/backend/archiveCron.go |
| diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go |
| index f24e7233a12bedf243fe707ab18d5d3865d55654..03483901564ed39becfb0a7c49a2112d9ca94b4a 100644 |
| --- a/appengine/logdog/coordinator/backend/archiveCron.go |
| +++ b/appengine/logdog/coordinator/backend/archiveCron.go |
| @@ -13,16 +13,16 @@ import ( |
| "github.com/luci/gae/filter/dsQueryBatch" |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/gae/service/info" |
| + tq "github.com/luci/gae/service/taskqueue" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/config" |
| "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/parallel" |
| "golang.org/x/net/context" |
| ) |
| -const archiveTaskVersion = "v4" |
| - |
| // HandleArchiveCron is the handler for the archive cron endpoint. This scans |
| // for log streams that are ready for archival. |
| // |
| @@ -33,9 +33,97 @@ func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r |
| }) |
| } |
| +// HandleArchiveScanTask is the handler for the "/archive/cron/scan" endpoint. |
| +// This scans for log streams that are ready for archival for a specific project |
| +// namespace. |
| +// |
| +// This is tasked during a HandleArchiveCron run. |
| +func (b *Backend) HandleArchiveScanTask(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| + errorWrapper(c, w, func() error { |
| + return b.archiveScanTask(c) |
| + }) |
| +} |
| + |
| func (b *Backend) archiveCron(c context.Context) error { |
| - svc := coordinator.GetServices(c) |
| - _, cfg, err := svc.Config(c) |
| + log.Debugf(c, "Beginning archive cron project dispatching.") |
| + |
| + gcfg, cfg, err := coordinator.GetServices(c).Config(c) |
| + if err != nil { |
| + return fmt.Errorf("failed to load configuration: %v", err) |
| + } |
| + |
| + queueName := cfg.Coordinator.ArchiveScanProjectQueueName |
| + if queueName == "" { |
| + return errors.New(`configuration is missing "archive_scan_project_queue_name"`) |
| + } |
| + |
| + // For each luci-config project that we have registered, execute a namespace |
| + // sweep. |
| + projects, err := gcfg.Projects(c) |
| + if err != nil { |
| + return fmt.Errorf("failed to enumerate projects") |
| + } |
| + |
| + log.Fields{ |
| + "projectCount": len(projects), |
| + }.Debugf(c, "Adding cron tasks for projects.") |
| + err = parallel.FanOutIn(func(taskC chan<- func() error) { |
| + // TODO(dnj): Remove this empty namespace once it's no longer supported. |
| + taskC <- func() error { |
| + ic := c |
| + if err := coordinator.WithProjectNamespace(&ic, ""); err != nil { |
| + return err |
| + } |
| + |
| + if err := tq.Get(c).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to add non-namesapced scan task.") |
|
Ryan Tseng
2016/04/28 20:16:03
namespaced
|
| + return err |
| + } |
| + return nil |
| + } |
| + |
| + for _, p := range projects { |
| + p := p |
| + log.Fields{ |
| + "project": p, |
| + }.Debugf(c, "Creating archive scan task for project.") |
| + |
| + taskC <- func() error { |
| + ic := c |
|
Ryan Tseng
2016/04/28 20:16:03
Maybe add small comment about why the context is b
|
| + if err := coordinator.WithProjectNamespace(&ic, config.ProjectName(p)); err != nil { |
| + return err |
| + } |
| + |
| + if err := tq.Get(ic).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "project": p, |
| + }.Errorf(c, "Failed to add scan task for project.") |
| + return err |
| + } |
| + |
| + log.Fields{ |
| + "project": p, |
| + }.Debugf(c, "Created scan task for project.") |
| + return nil |
| + } |
| + } |
| + }) |
| + if err != nil { |
| + return errors.New("failed to create all scan tasks") |
|
Ryan Tseng
2016/04/28 20:16:03
All? Or just some?
|
| + } |
| + return nil |
| +} |
| + |
| +// archiveScanTask is an individual task that scans a single project namespace |
| +// for expired log streams and dispatches archival requests for them. |
| +func (b *Backend) archiveScanTask(c context.Context) error { |
| + log.Fields{ |
| + "project": coordinator.Project(c), |
| + }.Debugf(c, "Beginning archive scan task.") |
| + |
| + services := coordinator.GetServices(c) |
| + _, cfg, err := services.Config(c) |
| if err != nil { |
| return fmt.Errorf("failed to load configuration: %v", err) |
| } |
| @@ -45,7 +133,7 @@ func (b *Backend) archiveCron(c context.Context) error { |
| return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) |
| } |
| - ap, err := svc.ArchivalPublisher(c) |
| + ap, err := services.ArchivalPublisher(c) |
| if err != nil { |
| return fmt.Errorf("failed to get archival publisher: %v", err) |
| } |
| @@ -77,6 +165,8 @@ func (b *Backend) archiveCron(c context.Context) error { |
| var tasked int32 |
| var failed int32 |
| + // |
|
Ryan Tseng
2016/04/28 20:16:03
?
|
| + |
| var ierr error |
| parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { |
| // Run a batched query across the expired log stream space. |