Chromium Code Reviews

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 8 months ago
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
deleted file mode 100644
index f24e7233a12bedf243fe707ab18d5d3865d55654..0000000000000000000000000000000000000000
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ /dev/null
@@ -1,156 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package backend
-
-import (
-  "fmt"
-  "net/http"
-  "sync/atomic"
-
-  "github.com/julienschmidt/httprouter"
-  "github.com/luci/gae/filter/dsQueryBatch"
-  ds "github.com/luci/gae/service/datastore"
-  "github.com/luci/gae/service/info"
-  "github.com/luci/luci-go/appengine/logdog/coordinator"
-  "github.com/luci/luci-go/common/clock"
-  "github.com/luci/luci-go/common/errors"
-  log "github.com/luci/luci-go/common/logging"
-  "github.com/luci/luci-go/common/parallel"
-  "golang.org/x/net/context"
-)
-
-const archiveTaskVersion = "v4"
-
-// HandleArchiveCron is the handler for the archive cron endpoint. It scans
-// for log streams that have exceeded the maximum archival delay but are
-// still in a streaming state, and dispatches archival tasks for them.
-//
-// This will be called periodically by AppEngine cron.
-func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
-  errorWrapper(c, w, func() error {
-    return b.archiveCron(c)
-  })
-}
-
-func (b *Backend) archiveCron(c context.Context) error {
-  svc := coordinator.GetServices(c)
-  _, cfg, err := svc.Config(c)
-  if err != nil {
-    return fmt.Errorf("failed to load configuration: %v", err)
-  }
-
-  archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
-  if archiveDelayMax <= 0 {
-    return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
-  }
-
-  ap, err := svc.ArchivalPublisher(c)
-  if err != nil {
-    return fmt.Errorf("failed to get archival publisher: %v", err)
-  }
-
-  threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
-  log.Fields{
-    "threshold": threshold,
-  }.Infof(c, "Querying for all streaming logs created before the max archival threshold.")
-
-  // Query for log streams that were created <= our threshold and that are
-  // still in LSStreaming state.
-  //
-  // We order descending because an index with this ordering already exists
-  // for "logdog.Logs.Query".
-  q := ds.NewQuery("LogStream").
-    KeysOnly(true).
-    Eq("State", coordinator.LSStreaming).
-    Lte("Created", threshold).
-    Order("-Created", "State")
-
-  // Since these logs are beyond the maximum archival delay, we will dispatch
-  // archival immediately.
-  params := coordinator.ArchivalParams{
-    RequestID: info.Get(c).RequestID(),
-  }
-
-  // Create archive tasks for our expired log streams in parallel.
-  batch := b.getMultiTaskBatchSize()
-  var tasked int32
-  var failed int32
-
-  var ierr error
-  parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
-    // Run a batched query across the expired log stream space.
-    ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
-      var ls coordinator.LogStream
-      ds.PopulateKey(&ls, lsKey)
-
-      // Archive this log stream in a transaction.
-      taskC <- func() error {
-        err := ds.Get(c).RunInTransaction(func(c context.Context) error {
-          if err := ds.Get(c).Get(&ls); err != nil {
-            log.WithError(err).Errorf(c, "Failed to load stream.")
-            return err
-          }
-
-          log.Fields{
-            "path": ls.Path(),
-            "id":   ls.HashID,
-          }.Infof(c, "Identified expired log stream.")
-
-          if err := params.PublishTask(c, ap, &ls); err != nil {
-            if err == coordinator.ErrArchiveTasked {
-              log.Warningf(c, "Archival has already been tasked for this stream.")
-              return nil
-            }
-            return err
-          }
-          return ds.Get(c).Put(&ls)
-        }, nil)
-
-        if err != nil {
-          log.Fields{
-            log.ErrorKey: err,
-            "path":       ls.Path(),
-          }.Errorf(c, "Failed to archive log stream.")
-          atomic.AddInt32(&failed, 1)
-          return nil // The failure is counted here; parallel.Ignore discards task errors.
-        }
-
-        log.Fields{
-          "path":         ls.Path(),
-          "id":           ls.HashID,
-          "archiveTopic": cfg.Coordinator.ArchiveTopic,
-        }.Infof(c, "Created archive task.")
-        atomic.AddInt32(&tasked, 1)
-        return nil
-      }
-
-      return nil
-    })
-  }))
-
-  // Return an error if we experienced any failures. This doesn't change the
-  // sweep's behavior, but it will show up as a "!" in the cron UI.
-  switch {
-  case ierr != nil:
-    log.Fields{
-      log.ErrorKey:   ierr,
-      "archiveCount": tasked,
-    }.Errorf(c, "Failed to execute expired tasks query.")
-    return ierr
-
-  case failed > 0:
-    log.Fields{
-      "archiveCount": tasked,
-      "failCount":    failed,
-    }.Errorf(c, "Failed to archive all candidate streams.")
-    return errors.New("failed to archive all candidate streams")
-
-  default:
-    log.Fields{
-      "archiveCount": tasked,
-    }.Infof(c, "Archive sweep completed successfully.")
-    return nil
-  }
-}
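
A note on wiring, for readers unfamiliar with the handler shape: HandleArchiveCron takes httprouter's usual parameters plus a leading context. Below is a minimal sketch of mounting such a handler; the route path and the use of context.Background() are placeholders for illustration, since the real Coordinator derives its context from its middleware stack and errorWrapper translates errors into HTTP status codes.

package main

import (
  "log"
  "net/http"

  "github.com/julienschmidt/httprouter"
  "golang.org/x/net/context"
)

// Backend is a stand-in for the Coordinator's backend type in this sketch.
type Backend struct{}

// HandleArchiveCron mirrors the signature from the diff above; body elided.
func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
}

func main() {
  b := &Backend{}
  r := httprouter.New()

  // "/admin/cron/archive" is an assumed path for illustration. The closure
  // adapts the context-taking method to httprouter's Handle signature.
  r.GET("/admin/cron/archive", func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
    b.HandleArchiveCron(context.Background(), w, req, p)
  })

  log.Fatal(http.ListenAndServe(":8080", r))
}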
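The fan-out in archiveCron uses luci-go's parallel package: parallel.Run starts the generator callback in its own goroutine, executes the func() error tasks sent on the channel with a bounded number of workers, and returns a channel of task errors, which parallel.Ignore drains and discards. Here is a minimal self-contained sketch of that pattern, with invented task bodies:

package main

import (
  "fmt"
  "sync/atomic"

  "github.com/luci/luci-go/common/parallel"
)

func main() {
  var done int32

  // At most 4 tasks run concurrently. Task errors are deliberately
  // discarded, matching the cron sweep, which tracks failures through
  // atomic counters rather than the error channel. Ignore returns only
  // after all tasks have finished.
  parallel.Ignore(parallel.Run(4, func(taskC chan<- func() error) {
    for i := 0; i < 16; i++ {
      taskC <- func() error {
        atomic.AddInt32(&done, 1)
        return nil
      }
    }
  }))

  fmt.Printf("completed %d tasks\n", done)
}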

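The core datastore pattern in the sweep is a cheap keys-only query outside any transaction, followed by a per-entity transaction that re-reads the entity and re-checks its state before mutating it, so a concurrent sweep holding stale keys cannot double-task a stream (PublishTask returns ErrArchiveTasked in that case). A condensed sketch of the same pattern, assuming luci/gae's in-memory datastore implementation (impl/memory) and a hypothetical Stream entity invented for illustration:

package main

import (
  "fmt"

  "github.com/luci/gae/impl/memory"
  ds "github.com/luci/gae/service/datastore"
  "golang.org/x/net/context"
)

// Stream is a hypothetical entity standing in for coordinator.LogStream.
type Stream struct {
  ID    string `gae:"$id"`
  State int // 0 = streaming, 1 = archival tasked (invented values)
}

func main() {
  c := memory.Use(context.Background())
  di := ds.Get(c)

  // Test-only API, assumed available in this luci/gae version: make the
  // in-memory indexes immediately consistent so the query below sees the Put.
  di.Testable().Consistent(true)

  if err := di.Put(&Stream{ID: "s1"}); err != nil {
    panic(err)
  }

  // Keys-only query outside the transaction (cheap), then re-read and
  // re-check each entity inside its own transaction, so a concurrent
  // sweep holding stale keys cannot double-apply the state change.
  q := ds.NewQuery("Stream").KeysOnly(true)
  err := di.Run(q, func(k *ds.Key) error {
    var s Stream
    ds.PopulateKey(&s, k)
    return di.RunInTransaction(func(c context.Context) error {
      txn := ds.Get(c)
      if err := txn.Get(&s); err != nil {
        return err
      }
      if s.State != 0 {
        return nil // already tasked by another sweep; nothing to do
      }
      s.State = 1
      return txn.Put(&s)
    }, nil)
  })
  fmt.Println("sweep finished:", err)
}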