Chromium Code Reviews| Index: appengine/logdog/coordinator/backend/archiveCron.go |
| diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go |
| index f3b5d79e063b6fa2a1c0435ada4a5c63b51968fd..8c113596f98927ee221fa314b5e9319ec3bbc84d 100644 |
| --- a/appengine/logdog/coordinator/backend/archiveCron.go |
| +++ b/appengine/logdog/coordinator/backend/archiveCron.go |
| @@ -7,7 +7,7 @@ package backend |
| import ( |
| "fmt" |
| "net/http" |
| - "time" |
| + "sync/atomic" |
| "github.com/julienschmidt/httprouter" |
| "github.com/luci/gae/filter/dsQueryBatch" |
| @@ -15,15 +15,15 @@ import ( |
| tq "github.com/luci/gae/service/taskqueue" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| - "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/parallel" |
| "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| "golang.org/x/net/context" |
| ) |
| -const archiveTaskVersion = "v1" |
| +const archiveTaskVersion = "v4" |
| // archiveTaskQueueName returns the task queue name for archival, or an error |
| // if it's not configured. |
| @@ -35,190 +35,123 @@ func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { |
| return q, nil |
| } |
| -func archiveTaskNameForHash(hashID string) string { |
| - return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion) |
| -} |
| - |
| -// createArchiveTask creates a new archive Task. |
| -func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) { |
| - desc := logdog.ArchiveTask{ |
| - Path: string(ls.Path()), |
| - Complete: complete, |
| - } |
| - t, err := createPullTask(&desc) |
| - if err != nil { |
| - return nil, err |
| - } |
| - |
| - t.Name = archiveTaskNameForHash(ls.HashID()) |
| - return t, nil |
| -} |
| - |
| // HandleArchiveCron is the handler for the archive cron endpoint. This scans |
| -// for terminal log streams that are ready for archival. |
| +// for log streams that are ready for archival. |
| // |
| // This will be called periodically by AppEngine cron. |
| func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| errorWrapper(c, w, func() error { |
| - return b.archiveCron(c, true) |
| + return b.archiveCron(c) |
| }) |
| } |
| -// HandleArchiveCronNT is the handler for the archive non-terminal cron |
| -// endpoint. This scans for non-terminal log streams that have not been updated |
| -// in sufficiently long that we're willing to declare them complete and mark |
| -// them terminal. |
| -// |
| -// This will be called periodically by AppEngine cron. |
| -func (b *Backend) HandleArchiveCronNT(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| - errorWrapper(c, w, func() error { |
| - return b.archiveCron(c, false) |
| - }) |
| -} |
| - |
| -// HandleArchiveCronPurge purges all archival tasks from the task queue. |
| -func (b *Backend) HandleArchiveCronPurge(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| - errorWrapper(c, w, func() error { |
| - cfg, err := config.Load(c) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to load configuration.") |
| - return err |
| - } |
| - |
| - queueName, err := archiveTaskQueueName(cfg) |
| - if err != nil { |
| - log.Errorf(c, "Failed to get task queue name.") |
| - return err |
| - } |
| - |
| - if err := tq.Get(c).Purge(queueName); err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "queue": queueName, |
| - }.Errorf(c, "Failed to purge task queue.") |
| - return err |
| - } |
| - return nil |
| - }) |
| -} |
| - |
| -func (b *Backend) archiveCron(c context.Context, complete bool) error { |
| +func (b *Backend) archiveCron(c context.Context) error { |
| cfg, err := config.Load(c) |
| if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to load configuration.") |
| - return err |
| + return fmt.Errorf("failed to load configuration: %v", err) |
| } |
| + ccfg := cfg.GetCoordinator() |
| - queueName, err := archiveTaskQueueName(cfg) |
| - if err != nil { |
| - log.Errorf(c, "Failed to get task queue name.") |
| - return err |
| + if ccfg.ArchiveTaskQueue == "" { |
| + return errors.New("missing archival task queue name") |
| } |
| - now := clock.Now(c).UTC() |
| - q := ds.NewQuery("LogStream") |
| - |
| - var threshold time.Duration |
| - if complete { |
| - threshold = cfg.GetCoordinator().ArchiveDelay.Duration() |
| - q = q.Eq("State", coordinator.LSTerminated) |
| - } else { |
| - threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| - q = q.Eq("State", coordinator.LSPending) |
| + archiveDelayMax := ccfg.ArchiveDelayMax.Duration() |
| + if archiveDelayMax <= 0 { |
| + return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) |
| } |
| - q = q.Lte("Updated", now.Add(-threshold)) |
| - // If the log stream has a terminal index, and its Updated time is less than |
| - // the maximum archive delay, require this archival to be complete (no |
| - // missing LogEntry). |
| - // |
| - // If we're past maximum archive delay, settle for any (even empty) archival. |
| - // This is a failsafe to prevent logs from sitting in limbo forever. |
| - maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| + threshold := clock.Now(c).UTC().Add(-archiveDelayMax) |
| + log.Fields{ |
| + "threshold": threshold, |
| + }.Infof(c, "Querying for all streaming logs created before max archival threshold.") |
| - // Perform a query, dispatching tasks in batches. |
| + // Query for log streams that were created <= our threshold and that are |
| + // still in LSStreaming state. |
| + // |
| + // We order descending because this is already an index that we use for our |
| + // "logdog.Logs.Query". |
| + q := ds.NewQuery("LogStream"). |
| + Eq("State", coordinator.LSStreaming). |
| + Lte("Created", threshold). |
| + Order("-Created", "State") |
| + |
| + // Since these logs are beyond maximum archival delay, we will dispatch |
| + // archival immediately. |
| + params := coordinator.ArchivalParams{} |
| + |
| + // Create archive tasks for our expired log streams in parallel. |
| batch := b.getMultiTaskBatchSize() |
| + var tasked int32 |
| + var failed int32 |
| - ti := tq.Get(c) |
| - tasks := make([]*tq.Task, 0, batch) |
| - totalScheduledTasks := 0 |
| - addAndMaybeDispatchTasks := func(task *tq.Task) error { |
| - switch task { |
| - case nil: |
| - if len(tasks) == 0 { |
| - return nil |
| - } |
| - |
| - default: |
| - tasks = append(tasks, task) |
| - if len(tasks) < batch { |
| + var ierr error |
| + parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { |
| + // Run a batched query across the expired log stream space. |
| + ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error { |
| + log.Fields{ |
| + "path": ls.Path(), |
| + "id": ls.HashID(), |
| + }.Infof(c, "Identified expired log stream.") |
| + |
| + // Archive this log stream in a transaction. |
| + taskC <- func() error { |
| + err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| + createdTask, err := params.CreateTask(tq.Get(c), ls, ccfg.ArchiveTaskQueue) |
|
Vadim Sh.
2016/04/07 01:21:32
nit: you can check ls.State outside the transaction
dnj
2016/04/11 17:20:03
Is this worth doing (and incurring another write c
|
| + if err != nil { |
| + return err |
| + } |
| + |
| + if !createdTask { |
| + return nil |
| + } |
| + return ds.Get(c).Put(ls) |
| + }, nil) |
| + |
| + if err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "path": ls.Path(), |
| + }.Errorf(c, "Failed to archive log stream.") |
| + atomic.AddInt32(&failed, 1) |
| + return nil // Nothing will consume it anyway. |
| + } |
| + |
| + log.Fields{ |
| + "path": ls.Path(), |
| + "id": ls.HashID(), |
| + "archiveTask": ls.ArchiveTaskName, |
| + }.Infof(c, "Created archive task.") |
| + atomic.AddInt32(&tasked, 1) |
| return nil |
| } |
| - } |
| - |
| - err := ti.AddMulti(tasks, queueName) |
| - if merr, ok := err.(errors.MultiError); ok { |
| - for _, e := range merr { |
| - log.Warningf(c, "Task add error: %v", e) |
| - } |
| - } |
| - if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "queue": queueName, |
| - "numTasks": len(tasks), |
| - "scheduledTaskCount": totalScheduledTasks, |
| - }.Errorf(c, "Failed to add tasks to task queue.") |
| - return errors.New("failed to add tasks to task queue") |
| - } |
| - |
| - totalScheduledTasks += len(tasks) |
| - tasks = tasks[:0] |
| - return nil |
| - } |
| - err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error { |
| - requireComplete := !now.After(ls.Updated.Add(maxDelay)) |
| - if !requireComplete { |
| - log.Fields{ |
| - "path": ls.Path(), |
| - "id": ls.HashID(), |
| - "updatedTimestamp": ls.Updated, |
| - "maxDelay": maxDelay, |
| - }.Warningf(c, "Identified log stream past maximum archival delay.") |
| - } else { |
| - log.Fields{ |
| - "id": ls.HashID(), |
| - "updated": ls.Updated.String(), |
| - }.Infof(c, "Identified log stream ready for archival.") |
| - } |
| + return nil |
| + }) |
| + })) |
| - task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) |
| - if err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "path": ls.Path(), |
| - }.Errorf(c, "Failed to create archive task.") |
| - return err |
| - } |
| + // Return an error code if we experienced any failures. This doesn't really |
| + // have an impact, but it will show up as a "!" in the cron UI. |
| + switch { |
| + case ierr != nil: |
| + log.Fields{ |
| + log.ErrorKey: ierr, |
| + "archiveCount": tasked, |
| + }.Errorf(c, "Failed to execute expired tasks query.") |
| + return ierr |
| - return addAndMaybeDispatchTasks(task) |
| - }) |
| - if err != nil { |
| + case failed > 0: |
| log.Fields{ |
| - log.ErrorKey: err, |
| - "scheduledTaskCount": totalScheduledTasks, |
| - }.Errorf(c, "Outer archive query failed.") |
| - return errors.New("outer archive query failed") |
| - } |
| + log.ErrorKey: err, |
| + "archiveCount": tasked, |
| + "failCount": failed, |
| + }.Errorf(c, "Failed to archive all candidate streams.") |
| + return errors.New("failed to archive all candidate streams") |
| - // Dispatch any remaining enqueued tasks. |
| - if err := addAndMaybeDispatchTasks(nil); err != nil { |
| - return err |
| + default: |
| + log.Fields{ |
| + "archiveCount": tasked, |
| + }.Infof(c, "Archive sweep completed successfully.") |
| + return nil |
| } |
| - |
| - log.Fields{ |
| - "scheduledTaskCount": totalScheduledTasks, |
| - }.Debugf(c, "Archive sweep completed successfully.") |
| - return nil |
| } |