Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(790)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Bugfixes, updates, works. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index f24e7233a12bedf243fe707ab18d5d3865d55654..03483901564ed39becfb0a7c49a2112d9ca94b4a 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -13,16 +13,16 @@ import (
"github.com/luci/gae/filter/dsQueryBatch"
ds "github.com/luci/gae/service/datastore"
"github.com/luci/gae/service/info"
+ tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/logdog/coordinator"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/parallel"
"golang.org/x/net/context"
)
-const archiveTaskVersion = "v4"
-
// HandleArchiveCron is the handler for the archive cron endpoint. This scans
// for log streams that are ready for archival.
//
@@ -33,9 +33,97 @@ func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r
})
}
+// HandleArchiveScanTask is the handler for the "/archive/cron/scan" endpoint.
+// This scans for log streams that are ready for archival for a specific project
+// namespace.
+//
+// This is tasked during a HandleArchiveCron run.
+func (b *Backend) HandleArchiveScanTask(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
+ errorWrapper(c, w, func() error {
+ return b.archiveScanTask(c)
+ })
+}
+
func (b *Backend) archiveCron(c context.Context) error {
- svc := coordinator.GetServices(c)
- _, cfg, err := svc.Config(c)
+ log.Debugf(c, "Beginning archive cron project dispatching.")
+
+ gcfg, cfg, err := coordinator.GetServices(c).Config(c)
+ if err != nil {
+ return fmt.Errorf("failed to load configuration: %v", err)
+ }
+
+ queueName := cfg.Coordinator.ArchiveScanProjectQueueName
+ if queueName == "" {
+ return errors.New(`configuration is missing "archive_scan_project_queue_name"`)
+ }
+
+ // For each luci-config project that we have registered, execute a namespace
+ // sweep.
+ projects, err := gcfg.Projects(c)
+ if err != nil {
+ return fmt.Errorf("failed to enumerate projects")
+ }
+
+ log.Fields{
+ "projectCount": len(projects),
+ }.Debugf(c, "Adding cron tasks for projects.")
+ err = parallel.FanOutIn(func(taskC chan<- func() error) {
+ // TODO(dnj): Remove this empty namespace once it's no longer supported.
+ taskC <- func() error {
+ ic := c
+ if err := coordinator.WithProjectNamespace(&ic, ""); err != nil {
+ return err
+ }
+
+ if err := tq.Get(c).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil {
+ log.WithError(err).Errorf(c, "Failed to add non-namesapced scan task.")
Ryan Tseng 2016/04/28 20:16:03 namespaced
+ return err
+ }
+ return nil
+ }
+
+ for _, p := range projects {
+ p := p
+ log.Fields{
+ "project": p,
+ }.Debugf(c, "Creating archive scan task for project.")
+
+ taskC <- func() error {
+ ic := c
Ryan Tseng 2016/04/28 20:16:03 Maybe add small comment about why the context is b
+ if err := coordinator.WithProjectNamespace(&ic, config.ProjectName(p)); err != nil {
+ return err
+ }
+
+ if err := tq.Get(ic).Add(tq.NewPOSTTask("/archive/cron/scan", nil), queueName); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": p,
+ }.Errorf(c, "Failed to add scan task for project.")
+ return err
+ }
+
+ log.Fields{
+ "project": p,
+ }.Debugf(c, "Created scan task for project.")
+ return nil
+ }
+ }
+ })
+ if err != nil {
+ return errors.New("failed to create all scan tasks")
Ryan Tseng 2016/04/28 20:16:03 All? Or just some?
+ }
+ return nil
+}
+
+// archiveScanTask is an individual task that scans a single project namespace
+// for expired log streams and dispatches archival requests for them.
+func (b *Backend) archiveScanTask(c context.Context) error {
+ log.Fields{
+ "project": coordinator.Project(c),
+ }.Debugf(c, "Beginning archive scan task.")
+
+ services := coordinator.GetServices(c)
+ _, cfg, err := services.Config(c)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
@@ -45,7 +133,7 @@ func (b *Backend) archiveCron(c context.Context) error {
return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
}
- ap, err := svc.ArchivalPublisher(c)
+ ap, err := services.ArchivalPublisher(c)
if err != nil {
return fmt.Errorf("failed to get archival publisher: %v", err)
}
@@ -77,6 +165,8 @@ func (b *Backend) archiveCron(c context.Context) error {
var tasked int32
var failed int32
+ //
Ryan Tseng 2016/04/28 20:16:03 ?
+
var ierr error
parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
// Run a batched query across the expired log stream space.

Powered by Google App Engine
This is Rietveld 408576698