Chromium Code Reviews| Index: appengine/logdog/coordinator/backend/archiveCron.go |
| diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go |
| index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..c5129c79581a1e2e4f4d64711b5007ead083df1f 100644 |
| --- a/appengine/logdog/coordinator/backend/archiveCron.go |
| +++ b/appengine/logdog/coordinator/backend/archiveCron.go |
| @@ -5,7 +5,6 @@ |
| package backend |
| import ( |
| - "errors" |
| "fmt" |
| "net/http" |
| "time" |
| @@ -17,6 +16,7 @@ import ( |
| "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| "golang.org/x/net/context" |
| @@ -108,6 +108,14 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error { |
| return err |
| } |
| + // If the log stream has a terminal index, and its Updated time is less than |
| + // the maximum archive delay, require this archival to be complete (no |
| + // missing LogEntry). |
| + // |
| + // If we're past maximum archive delay, settle for any (even empty) archival. |
| + // This is a failsafe to prevent logs from sitting in limbo forever. |
| + maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| + |
| now := clock.Now(c).UTC() |
| q := ds.NewQuery("LogStream") |
| @@ -121,22 +129,32 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error { |
| } |
| q = q.Lte("Updated", now.Add(-threshold)) |
| - // Query and dispatch our tasks. |
| - var ierr error |
| - count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { |
| - ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { |
| + // We will enqueue tasks in batches, and there's no need to retrieve more than |
| + // our batch size. |
| + batch := b.getMultiTaskBatchSize() |
| + tasks := make([]*tq.Task, 0, batch) |
| + q = q.Limit(int32(batch)) |
| + |
| + // Perform an iterative query, dispatching tasks with each round. |
| + di := ds.Get(c) |
| + ti := tq.Get(c) |
| + |
| + totalScheduledTasks := 0 |
| + |
| + var next ds.Cursor |
| + for { |
| + iterQuery := q |
| + if next != nil { |
| + iterQuery = iterQuery.Start(next) |
| + next = nil |
| + } |
| + |
| + err = di.Run(iterQuery, func(ls *coordinator.LogStream, cb ds.CursorCB) error { |
|
iannucci
2016/03/30 19:04:27
I would just loop in the query and dispatch the tasks inline.
dnj
2016/03/30 19:10:46
Done.
|
| log.Fields{ |
| "id": ls.HashID(), |
| "updated": ls.Updated.String(), |
| }.Infof(c, "Identified log stream ready for archival.") |
| - // If the log stream has a terminal index, and its Updated time is less than |
| - // the maximum archive delay, require this archival to be complete (no |
| - // missing LogEntry). |
| - // |
| - // If we're past maximum archive delay, settle for any (even empty) archival. |
| - // This is a failsafe to prevent logs from sitting in limbo forever. |
| - maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| requireComplete := !now.After(ls.Updated.Add(maxDelay)) |
| if !requireComplete { |
| log.Fields{ |
| @@ -155,21 +173,50 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error { |
| return err |
| } |
| - taskC <- task |
| + tasks = append(tasks, task) |
| + |
| + // If we're at or over our batch size, break and enqueue the tasks. |
| + if len(tasks) >= batch { |
| + next, err = cb() |
| + if err != nil { |
| + return fmt.Errorf("failed to get cursor: %v", err) |
| + } |
| + return ds.Stop |
| + } |
| return nil |
| }) |
| - }) |
| - if err != nil || ierr != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "queryErr": ierr, |
| - "taskCount": count, |
| - }.Errorf(c, "Failed to dispatch archival tasks.") |
| - return errors.New("failed to dispatch archival tasks") |
| + if err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "scheduledTaskCount": totalScheduledTasks, |
| + }.Errorf(c, "Failed to query for archival tasks.") |
| + return errors.New("failed to dispatch archival tasks") |
| + } |
| + |
| + // Dispatch the accumulated tasks. |
| + if len(tasks) > 0 { |
| + if err := errors.Filter(ti.AddMulti(tasks, queueName), tq.ErrTaskAlreadyAdded); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "queue": queueName, |
| + "numTasks": len(tasks), |
| + "scheduledTaskCount": totalScheduledTasks, |
| + }.Errorf(c, "Failed to add tasks to task queue.") |
| + return errors.New("failed to add tasks to task queue") |
| + } |
| + |
| + totalScheduledTasks += len(tasks) |
| + tasks = tasks[:0] |
| + } |
| + |
| + // If there is no query cursor, we're done. |
| + if next == nil { |
| + break |
| + } |
| } |
| log.Fields{ |
| - "taskCount": count, |
| + "scheduledTaskCount": totalScheduledTasks, |
| }.Debugf(c, "Archive sweep completed successfully.") |
| return nil |
| } |