Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2396)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Respond to code review comments. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..f3b5d79e063b6fa2a1c0435ada4a5c63b51968fd 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -5,23 +5,26 @@
package backend
import (
- "errors"
"fmt"
"net/http"
"time"
"github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/filter/dsQueryBatch"
ds "github.com/luci/gae/service/datastore"
tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/logdog/coordinator"
"github.com/luci/luci-go/appengine/logdog/coordinator/config"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/logdog/svcconfig"
"golang.org/x/net/context"
)
+const archiveTaskVersion = "v1"
+
// archiveTaskQueueName returns the task queue name for archival, or an error
// if it's not configured.
func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
@@ -32,6 +35,10 @@ func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
return q, nil
}
+func archiveTaskNameForHash(hashID string) string {
+ return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
+}
+
// createArchiveTask creates a new archive Task.
func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
desc := logdog.ArchiveTask{
@@ -43,7 +50,7 @@ func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co
return nil, err
}
- t.Name = fmt.Sprintf("archive-%s", ls.HashID())
+ t.Name = archiveTaskNameForHash(ls.HashID())
return t, nil
}
@@ -121,55 +128,97 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
}
q = q.Lte("Updated", now.Add(-threshold))
- // Query and dispatch our tasks.
- var ierr error
- count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) {
- ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
+ // If the log stream has a terminal index, and the time since its last update
+ // is within the maximum archive delay, require this archival to be complete
+ // (no missing LogEntry).
+ //
+ // If we're past maximum archive delay, settle for any (even empty) archival.
+ // This is a failsafe to prevent logs from sitting in limbo forever.
+ maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
+
+ // Perform a query, dispatching tasks in batches.
+ batch := b.getMultiTaskBatchSize()
+
+ ti := tq.Get(c)
+ tasks := make([]*tq.Task, 0, batch)
+ totalScheduledTasks := 0
+ addAndMaybeDispatchTasks := func(task *tq.Task) error {
+ switch task {
+ case nil:
+ if len(tasks) == 0 {
+ return nil
+ }
+
+ default:
+ tasks = append(tasks, task)
+ if len(tasks) < batch {
+ return nil
+ }
+ }
+
+ err := ti.AddMulti(tasks, queueName)
+ if merr, ok := err.(errors.MultiError); ok {
+ for _, e := range merr {
+ log.Warningf(c, "Task add error: %v", e)
+ }
+ }
+ if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "queue": queueName,
+ "numTasks": len(tasks),
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Failed to add tasks to task queue.")
+ return errors.New("failed to add tasks to task queue")
+ }
+
+ totalScheduledTasks += len(tasks)
+ tasks = tasks[:0]
+ return nil
+ }
+
+ err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
+ requireComplete := !now.After(ls.Updated.Add(maxDelay))
+ if !requireComplete {
+ log.Fields{
+ "path": ls.Path(),
+ "id": ls.HashID(),
+ "updatedTimestamp": ls.Updated,
+ "maxDelay": maxDelay,
+ }.Warningf(c, "Identified log stream past maximum archival delay.")
+ } else {
log.Fields{
"id": ls.HashID(),
"updated": ls.Updated.String(),
}.Infof(c, "Identified log stream ready for archival.")
+ }
- // If the log stream has a terminal index, and its Updated time is less than
- // the maximum archive delay, require this archival to be complete (no
- // missing LogEntry).
- //
- // If we're past maximum archive delay, settle for any (even empty) archival.
- // This is a failsafe to prevent logs from sitting in limbo forever.
- maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
- requireComplete := !now.After(ls.Updated.Add(maxDelay))
- if !requireComplete {
- log.Fields{
- "path": ls.Path(),
- "updatedTimestamp": ls.Updated,
- "maxDelay": maxDelay,
- }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.")
- }
-
- task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "path": ls.Path(),
- }.Errorf(c, "Failed to create archive task.")
- return err
- }
+ task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "path": ls.Path(),
+ }.Errorf(c, "Failed to create archive task.")
+ return err
+ }
- taskC <- task
- return nil
- })
+ return addAndMaybeDispatchTasks(task)
})
- if err != nil || ierr != nil {
+ if err != nil {
log.Fields{
- log.ErrorKey: err,
- "queryErr": ierr,
- "taskCount": count,
- }.Errorf(c, "Failed to dispatch archival tasks.")
- return errors.New("failed to dispatch archival tasks")
+ log.ErrorKey: err,
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Outer archive query failed.")
+ return errors.New("outer archive query failed")
+ }
+
+ // Dispatch any remaining enqueued tasks.
+ if err := addAndMaybeDispatchTasks(nil); err != nil {
+ return err
}
log.Fields{
- "taskCount": count,
+ "scheduledTaskCount": totalScheduledTasks,
}.Debugf(c, "Archive sweep completed successfully.")
return nil
}
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698