// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package backend

import (
	"fmt"
	"net/http"
	"sync/atomic"

	"github.com/julienschmidt/httprouter"
	"github.com/luci/gae/filter/dsQueryBatch"
	ds "github.com/luci/gae/service/datastore"
	"github.com/luci/gae/service/info"
	"github.com/luci/luci-go/appengine/logdog/coordinator"
	"github.com/luci/luci-go/common/clock"
	"github.com/luci/luci-go/common/errors"
	log "github.com/luci/luci-go/common/logging"
	"github.com/luci/luci-go/common/parallel"
	"golang.org/x/net/context"
)

const archiveTaskVersion = "v4"

// HandleArchiveCron is the handler for the archive cron endpoint. This scans
// for log streams that are ready for archival.
//
// This will be called periodically by AppEngine cron.
func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	errorWrapper(c, w, func() error {
		return b.archiveCron(c)
	})
}

func (b *Backend) archiveCron(c context.Context) error {
	svc := coordinator.GetServices(c)
	_, cfg, err := svc.Config(c)
	if err != nil {
		return fmt.Errorf("failed to load configuration: %v", err)
	}

	archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
	if archiveDelayMax <= 0 {
		return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
	}

	ap, err := svc.ArchivalPublisher(c)
	if err != nil {
		return fmt.Errorf("failed to get archival publisher: %v", err)
	}

	threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
	log.Fields{
		"threshold": threshold,
	}.Infof(c, "Querying for all streaming logs created before max archival threshold.")

	// Query for log streams that were created <= our threshold and that are
	// still in LSStreaming state.
	//
	// We order descending because this is already an index that we use for our
	// "logdog.Logs.Query".
	q := ds.NewQuery("LogStream").
		KeysOnly(true).
		Eq("State", coordinator.LSStreaming).
		Lte("Created", threshold).
		Order("-Created", "State")
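	// An equality filter on State combined with an inequality filter and sort
	// on Created needs a matching composite index (assumed to be declared in
	// this app's index.yaml).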

	// Since these logs are beyond the maximum archival delay, we will dispatch
	// archival immediately.
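	// (The request ID is carried along with the published archival task;
	// presumably so the task can be traced back to the cron invocation that
	// published it.)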
	params := coordinator.ArchivalParams{
		RequestID: info.Get(c).RequestID(),
	}

	// Create archive tasks for our expired log streams in parallel.
	batch := b.getMultiTaskBatchSize()
	var tasked int32
	var failed int32

	var ierr error
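	// parallel.Run starts `batch` workers that execute the closures sent to
	// taskC; parallel.Ignore discards the per-task errors on the returned
	// channel, since failures are tallied via the `failed` counter instead.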
	parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
		// Run a batched query across the expired log stream space.
		ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
			var ls coordinator.LogStream
			ds.PopulateKey(&ls, lsKey)

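			// The query is keys-only, so only ls's key fields are populated
			// here; the transaction below loads the full entity before
			// mutating it.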
			// Archive this log stream in a transaction.
			taskC <- func() error {
				err := ds.Get(c).RunInTransaction(func(c context.Context) error {
					if err := ds.Get(c).Get(&ls); err != nil {
						log.WithError(err).Errorf(c, "Failed to load stream.")
						return err
					}

					log.Fields{
						"path": ls.Path(),
						"id":   ls.HashID,
					}.Infof(c, "Identified expired log stream.")

					if err := params.PublishTask(c, ap, &ls); err != nil {
						if err == coordinator.ErrArchiveTasked {
							log.Warningf(c, "Archival has already been tasked for this stream.")
							return nil
						}
						return err
					}
					return ds.Get(c).Put(&ls)
				}, nil)

				if err != nil {
					log.Fields{
						log.ErrorKey: err,
						"path":       ls.Path(),
					}.Errorf(c, "Failed to archive log stream.")
					atomic.AddInt32(&failed, 1)
					return nil // Nothing will consume it anyway.
				}

				log.Fields{
					"path":         ls.Path(),
					"id":           ls.HashID,
					"archiveTopic": cfg.Coordinator.ArchiveTopic,
				}.Infof(c, "Created archive task.")
				atomic.AddInt32(&tasked, 1)
				return nil
			}

			return nil
		})
	}))

	// Return an error code if we experienced any failures. This doesn't really
	// have an impact, but it will show up as a "!" in the cron UI.
	switch {
	case ierr != nil:
		log.Fields{
			log.ErrorKey:   ierr,
			"archiveCount": tasked,
		}.Errorf(c, "Failed to execute expired tasks query.")
		return ierr

	case failed > 0:
		log.Fields{
			"archiveCount": tasked,
			"failCount":    failed,
		}.Errorf(c, "Failed to archive all candidate streams.")
		return errors.New("failed to archive all candidate streams")

	default:
		log.Fields{
			"archiveCount": tasked,
		}.Infof(c, "Archive sweep completed successfully.")
		return nil
	}
}
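
For context, a minimal sketch of how this handler might be registered. This is hypothetical wiring, not part of the file above: the route path is invented, and the real coordinator app would derive the request context from its middleware stack rather than from context.Background().

// installCronRoutes is a hypothetical illustration of attaching
// HandleArchiveCron to an httprouter.Router.
func installCronRoutes(b *Backend, r *httprouter.Router) {
	r.GET("/internal/cron/archive", func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
		// Assumption: production code builds this context through the app's
		// middleware (with logging, datastore, etc. installed).
		c := context.Background()
		b.HandleArchiveCron(c, w, req, p)
	})
}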