| Index: appengine/logdog/coordinator/backend/archiveCron.go
|
| diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
|
| index f3b5d79e063b6fa2a1c0435ada4a5c63b51968fd..d06558df20c8e70814d1fd72e01f86bee11ba561 100644
|
| --- a/appengine/logdog/coordinator/backend/archiveCron.go
|
| +++ b/appengine/logdog/coordinator/backend/archiveCron.go
|
| @@ -7,218 +7,150 @@ package backend
|
| import (
|
| "fmt"
|
| "net/http"
|
| - "time"
|
| + "sync/atomic"
|
|
|
| "github.com/julienschmidt/httprouter"
|
| "github.com/luci/gae/filter/dsQueryBatch"
|
| ds "github.com/luci/gae/service/datastore"
|
| - tq "github.com/luci/gae/service/taskqueue"
|
| + "github.com/luci/gae/service/info"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| - "github.com/luci/luci-go/appengine/logdog/coordinator/config"
|
| - "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/errors"
|
| log "github.com/luci/luci-go/common/logging"
|
| - "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| + "github.com/luci/luci-go/common/parallel"
|
| "golang.org/x/net/context"
|
| )
|
|
|
| -const archiveTaskVersion = "v1"
|
| -
|
| -// archiveTaskQueueName returns the task queue name for archival, or an error
|
| -// if it's not configured.
|
| -func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
|
| - q := cfg.GetCoordinator().ArchiveTaskQueue
|
| - if q == "" {
|
| - return "", errors.New("missing archive task queue name")
|
| - }
|
| - return q, nil
|
| -}
|
| -
|
| -func archiveTaskNameForHash(hashID string) string {
|
| - return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
|
| -}
|
| -
|
| -// createArchiveTask creates a new archive Task.
|
| -func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
|
| - desc := logdog.ArchiveTask{
|
| - Path: string(ls.Path()),
|
| - Complete: complete,
|
| - }
|
| - t, err := createPullTask(&desc)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| -
|
| - t.Name = archiveTaskNameForHash(ls.HashID())
|
| - return t, nil
|
| -}
|
| +const archiveTaskVersion = "v4"
|
|
|
| // HandleArchiveCron is the handler for the archive cron endpoint. This scans
|
| -// for terminal log streams that are ready for archival.
|
| +// for log streams that are ready for archival.
|
| //
|
| // This will be called periodically by AppEngine cron.
|
| func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| errorWrapper(c, w, func() error {
|
| - return b.archiveCron(c, true)
|
| - })
|
| -}
|
| -
|
| -// HandleArchiveCronNT is the handler for the archive non-terminal cron
|
| -// endpoint. This scans for non-terminal log streams that have not been updated
|
| -// in sufficiently long that we're willing to declare them complete and mark
|
| -// them terminal.
|
| -//
|
| -// This will be called periodically by AppEngine cron.
|
| -func (b *Backend) HandleArchiveCronNT(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| - errorWrapper(c, w, func() error {
|
| - return b.archiveCron(c, false)
|
| - })
|
| -}
|
| -
|
| -// HandleArchiveCronPurge purges all archival tasks from the task queue.
|
| -func (b *Backend) HandleArchiveCronPurge(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| - errorWrapper(c, w, func() error {
|
| - cfg, err := config.Load(c)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to load configuration.")
|
| - return err
|
| - }
|
| -
|
| - queueName, err := archiveTaskQueueName(cfg)
|
| - if err != nil {
|
| - log.Errorf(c, "Failed to get task queue name.")
|
| - return err
|
| - }
|
| -
|
| - if err := tq.Get(c).Purge(queueName); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "queue": queueName,
|
| - }.Errorf(c, "Failed to purge task queue.")
|
| - return err
|
| - }
|
| - return nil
|
| + return b.archiveCron(c)
|
| })
|
| }
|
|
|
| -func (b *Backend) archiveCron(c context.Context, complete bool) error {
|
| - cfg, err := config.Load(c)
|
| +func (b *Backend) archiveCron(c context.Context) error {
|
| + services := b.GetServices()
|
| + _, cfg, err := services.Config(c)
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to load configuration.")
|
| - return err
|
| + return fmt.Errorf("failed to load configuration: %v", err)
|
| }
|
|
|
| - queueName, err := archiveTaskQueueName(cfg)
|
| - if err != nil {
|
| - log.Errorf(c, "Failed to get task queue name.")
|
| - return err
|
| + archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
|
| + if archiveDelayMax <= 0 {
|
| + return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
|
| }
|
|
|
| - now := clock.Now(c).UTC()
|
| - q := ds.NewQuery("LogStream")
|
| -
|
| - var threshold time.Duration
|
| - if complete {
|
| - threshold = cfg.GetCoordinator().ArchiveDelay.Duration()
|
| - q = q.Eq("State", coordinator.LSTerminated)
|
| - } else {
|
| - threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration()
|
| - q = q.Eq("State", coordinator.LSPending)
|
| + ap, err := services.ArchivalPublisher(c)
|
| + if err != nil {
|
| + return fmt.Errorf("failed to get archival publisher: %v", err)
|
| }
|
| - q = q.Lte("Updated", now.Add(-threshold))
|
|
|
| - // If the log stream has a terminal index, and its Updated time is less than
|
| - // the maximum archive delay, require this archival to be complete (no
|
| - // missing LogEntry).
|
| + threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
|
| + log.Fields{
|
| + "threshold": threshold,
|
| + }.Infof(c, "Querying for all streaming logs created before max archival threshold.")
|
| +
|
| + // Query for log streams that were created <= our threshold and that are
|
| + // still in LSStreaming state.
|
| //
|
| - // If we're past maximum archive delay, settle for any (even empty) archival.
|
| - // This is a failsafe to prevent logs from sitting in limbo forever.
|
| - maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
|
| + // We order descending because this is already an index that we use for our
|
| + // "logdog.Logs.Query".
|
| + q := ds.NewQuery("LogStream").
|
| + KeysOnly(true).
|
| + Eq("State", coordinator.LSStreaming).
|
| + Lte("Created", threshold).
|
| + Order("-Created", "State")
|
| +
|
| + // Since these logs are beyond maximum archival delay, we will dispatch
|
| + // archival immediately.
|
| + params := coordinator.ArchivalParams{
|
| + RequestID: info.Get(c).RequestID(),
|
| + }
|
|
|
| - // Perform a query, dispatching tasks in batches.
|
| + // Create archive tasks for our expired log streams in parallel.
|
| batch := b.getMultiTaskBatchSize()
|
| -
|
| - ti := tq.Get(c)
|
| - tasks := make([]*tq.Task, 0, batch)
|
| - totalScheduledTasks := 0
|
| - addAndMaybeDispatchTasks := func(task *tq.Task) error {
|
| - switch task {
|
| - case nil:
|
| - if len(tasks) == 0 {
|
| + var tasked int32
|
| + var failed int32
|
| +
|
| + var ierr error
|
| + parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
|
| + // Run a batched query across the expired log stream space.
|
| + ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
|
| + var ls coordinator.LogStream
|
| + ds.PopulateKey(&ls, lsKey)
|
| +
|
| + // Archive this log stream in a transaction.
|
| + taskC <- func() error {
|
| + err := ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| + if err := ds.Get(c).Get(&ls); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load stream.")
|
| + return err
|
| + }
|
| +
|
| + log.Fields{
|
| + "path": ls.Path(),
|
| + "id": ls.HashID,
|
| + }.Infof(c, "Identified expired log stream.")
|
| +
|
| + if err := params.PublishTask(c, ap, &ls); err != nil {
|
| + if err == coordinator.ErrArchiveTasked {
|
| + log.Warningf(c, "Archival has already been tasked for this stream.")
|
| + return nil
|
| + }
|
| + return err
|
| + }
|
| + return ds.Get(c).Put(&ls)
|
| + }, nil)
|
| +
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "path": ls.Path(),
|
| + }.Errorf(c, "Failed to archive log stream.")
|
| + atomic.AddInt32(&failed, 1)
|
| + return nil // Nothing will consume it anyway.
|
| + }
|
| +
|
| + log.Fields{
|
| + "path": ls.Path(),
|
| + "id": ls.HashID,
|
| + "archiveTopic": cfg.Coordinator.ArchiveTopic,
|
| + }.Infof(c, "Created archive task.")
|
| + atomic.AddInt32(&tasked, 1)
|
| return nil
|
| }
|
|
|
| - default:
|
| - tasks = append(tasks, task)
|
| - if len(tasks) < batch {
|
| - return nil
|
| - }
|
| - }
|
| + return nil
|
| + })
|
| + }))
|
|
|
| - err := ti.AddMulti(tasks, queueName)
|
| - if merr, ok := err.(errors.MultiError); ok {
|
| - for _, e := range merr {
|
| - log.Warningf(c, "Task add error: %v", e)
|
| - }
|
| - }
|
| - if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "queue": queueName,
|
| - "numTasks": len(tasks),
|
| - "scheduledTaskCount": totalScheduledTasks,
|
| - }.Errorf(c, "Failed to add tasks to task queue.")
|
| - return errors.New("failed to add tasks to task queue")
|
| - }
|
| -
|
| - totalScheduledTasks += len(tasks)
|
| - tasks = tasks[:0]
|
| - return nil
|
| - }
|
| + // Return an error code if we experienced any failures. This doesn't really
|
| + // have an impact, but it will show up as a "!" in the cron UI.
|
| + switch {
|
| + case ierr != nil:
|
| + log.Fields{
|
| + log.ErrorKey: ierr,
|
| + "archiveCount": tasked,
|
| + }.Errorf(c, "Failed to execute expired tasks query.")
|
| + return ierr
|
|
|
| - err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
|
| - requireComplete := !now.After(ls.Updated.Add(maxDelay))
|
| - if !requireComplete {
|
| - log.Fields{
|
| - "path": ls.Path(),
|
| - "id": ls.HashID(),
|
| - "updatedTimestamp": ls.Updated,
|
| - "maxDelay": maxDelay,
|
| - }.Warningf(c, "Identified log stream past maximum archival delay.")
|
| - } else {
|
| - log.Fields{
|
| - "id": ls.HashID(),
|
| - "updated": ls.Updated.String(),
|
| - }.Infof(c, "Identified log stream ready for archival.")
|
| - }
|
| -
|
| - task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
|
| - if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "path": ls.Path(),
|
| - }.Errorf(c, "Failed to create archive task.")
|
| - return err
|
| - }
|
| -
|
| - return addAndMaybeDispatchTasks(task)
|
| - })
|
| - if err != nil {
|
| + case failed > 0:
|
| log.Fields{
|
| - log.ErrorKey: err,
|
| - "scheduledTaskCount": totalScheduledTasks,
|
| - }.Errorf(c, "Outer archive query failed.")
|
| - return errors.New("outer archive query failed")
|
| - }
|
|
| + "archiveCount": tasked,
|
| + "failCount": failed,
|
| + }.Errorf(c, "Failed to archive all candidate streams.")
|
| + return errors.New("failed to archive all candidate streams")
|
|
|
| - // Dispatch any remaining enqueued tasks.
|
| - if err := addAndMaybeDispatchTasks(nil); err != nil {
|
| - return err
|
| + default:
|
| + log.Fields{
|
| + "archiveCount": tasked,
|
| + }.Infof(c, "Archive sweep completed successfully.")
|
| + return nil
|
| }
|
| -
|
| - log.Fields{
|
| - "scheduledTaskCount": totalScheduledTasks,
|
| - }.Debugf(c, "Archive sweep completed successfully.")
|
| - return nil
|
| }
|
|
|