Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1037)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..c5129c79581a1e2e4f4d64711b5007ead083df1f 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -5,7 +5,6 @@
package backend
import (
- "errors"
"fmt"
"net/http"
"time"
@@ -17,6 +16,7 @@ import (
"github.com/luci/luci-go/appengine/logdog/coordinator/config"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/logdog/svcconfig"
"golang.org/x/net/context"
@@ -108,6 +108,14 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
return err
}
+ // If the log stream has a terminal index, and its Updated time is less than
+ // the maximum archive delay, require this archival to be complete (no
+ // missing LogEntry).
+ //
+ // If we're past maximum archive delay, settle for any (even empty) archival.
+ // This is a failsafe to prevent logs from sitting in limbo forever.
+ maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
+
now := clock.Now(c).UTC()
q := ds.NewQuery("LogStream")
@@ -121,22 +129,32 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
}
q = q.Lte("Updated", now.Add(-threshold))
- // Query and dispatch our tasks.
- var ierr error
- count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) {
- ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
+ // We will enqueue tasks in batches, and there's no need to retrieve more than
+ // our batch size.
+ batch := b.getMultiTaskBatchSize()
+ tasks := make([]*tq.Task, 0, batch)
+ q = q.Limit(int32(batch))
+
+ // Perform an iterative query, dispatching tasks with each round.
+ di := ds.Get(c)
+ ti := tq.Get(c)
+
+ totalScheduledTasks := 0
+
+ var next ds.Cursor
+ for {
+ iterQuery := q
+ if next != nil {
+ iterQuery = iterQuery.Start(next)
+ next = nil
+ }
+
+ err = di.Run(iterQuery, func(ls *coordinator.LogStream, cb ds.CursorCB) error {
iannucci 2016/03/30 19:04:27 I would just loop in the query and dispatch the tasks [comment truncated in page extraction]
dnj 2016/03/30 19:10:46 Done.
log.Fields{
"id": ls.HashID(),
"updated": ls.Updated.String(),
}.Infof(c, "Identified log stream ready for archival.")
- // If the log stream has a terminal index, and its Updated time is less than
- // the maximum archive delay, require this archival to be complete (no
- // missing LogEntry).
- //
- // If we're past maximum archive delay, settle for any (even empty) archival.
- // This is a failsafe to prevent logs from sitting in limbo forever.
- maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
requireComplete := !now.After(ls.Updated.Add(maxDelay))
if !requireComplete {
log.Fields{
@@ -155,21 +173,50 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
return err
}
- taskC <- task
+ tasks = append(tasks, task)
+
+ // If we're at or over our batch size, break and enqueue the tasks.
+ if len(tasks) >= batch {
+ next, err = cb()
+ if err != nil {
+ return fmt.Errorf("failed to get cursor: %v", err)
+ }
+ return ds.Stop
+ }
return nil
})
- })
- if err != nil || ierr != nil {
- log.Fields{
- log.ErrorKey: err,
- "queryErr": ierr,
- "taskCount": count,
- }.Errorf(c, "Failed to dispatch archival tasks.")
- return errors.New("failed to dispatch archival tasks")
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Failed to query for archival tasks.")
+ return errors.New("failed to dispatch archival tasks")
+ }
+
+ // Dispatch the accumulated tasks.
+ if len(tasks) > 0 {
+ if err := errors.Filter(ti.AddMulti(tasks, queueName), tq.ErrTaskAlreadyAdded); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "queue": queueName,
+ "numTasks": len(tasks),
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Failed to add tasks to task queue.")
+ return errors.New("failed to add tasks to task queue")
+ }
+
+ totalScheduledTasks += len(tasks)
+ tasks = tasks[:0]
+ }
+
+ // If there is no query cursor, we're done.
+ if next == nil {
+ break
+ }
}
log.Fields{
- "taskCount": count,
+ "scheduledTaskCount": totalScheduledTasks,
}.Debugf(c, "Archive sweep completed successfully.")
return nil
}
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698