Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(376)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
10 "net/http" 9 "net/http"
11 "time" 10 "time"
12 11
13 "github.com/julienschmidt/httprouter" 12 "github.com/julienschmidt/httprouter"
14 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
15 tq "github.com/luci/gae/service/taskqueue" 14 tq "github.com/luci/gae/service/taskqueue"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 15 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" 16 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
19 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/errors"
20 log "github.com/luci/luci-go/common/logging" 20 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
22 "golang.org/x/net/context" 22 "golang.org/x/net/context"
23 ) 23 )
24 24
25 // archiveTaskQueueName returns the task queue name for archival, or an error 25 // archiveTaskQueueName returns the task queue name for archival, or an error
26 // if it's not configured. 26 // if it's not configured.
27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { 27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
28 q := cfg.GetCoordinator().ArchiveTaskQueue 28 q := cfg.GetCoordinator().ArchiveTaskQueue
29 if q == "" { 29 if q == "" {
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 log.WithError(err).Errorf(c, "Failed to load configuration.") 101 log.WithError(err).Errorf(c, "Failed to load configuration.")
102 return err 102 return err
103 } 103 }
104 104
105 queueName, err := archiveTaskQueueName(cfg) 105 queueName, err := archiveTaskQueueName(cfg)
106 if err != nil { 106 if err != nil {
107 log.Errorf(c, "Failed to get task queue name.") 107 log.Errorf(c, "Failed to get task queue name.")
108 return err 108 return err
109 } 109 }
110 110
111 // If the log stream has a terminal index, and its Updated time is less than
112 // the maximum archive delay, require this archival to be complete (no
113 // missing LogEntry).
114 //
116 // If we're past maximum archive delay, settle for any (even empty) archival.
116 // This is a failsafe to prevent logs from sitting in limbo forever.
117 maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
118
111 now := clock.Now(c).UTC() 119 now := clock.Now(c).UTC()
112 q := ds.NewQuery("LogStream") 120 q := ds.NewQuery("LogStream")
113 121
114 var threshold time.Duration 122 var threshold time.Duration
115 if complete { 123 if complete {
116 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() 124 threshold = cfg.GetCoordinator().ArchiveDelay.Duration()
117 q = q.Eq("State", coordinator.LSTerminated) 125 q = q.Eq("State", coordinator.LSTerminated)
118 } else { 126 } else {
119 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() 127 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration()
120 q = q.Eq("State", coordinator.LSPending) 128 q = q.Eq("State", coordinator.LSPending)
121 } 129 }
122 q = q.Lte("Updated", now.Add(-threshold)) 130 q = q.Lte("Updated", now.Add(-threshold))
123 131
124 » // Query and dispatch our tasks. 132 » // We will enqueue tasks in batches, and there's no need to retrieve more than
125 » var ierr error 133 » // our batch size.
126 » count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { 134 » batch := b.getMultiTaskBatchSize()
127 » » ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { 135 » tasks := make([]*tq.Task, 0, batch)
136 » q = q.Limit(int32(batch))
137
138 » // Perform an iterative query, dispatching tasks with each round.
139 » di := ds.Get(c)
140 » ti := tq.Get(c)
141
142 » totalScheduledTasks := 0
143
144 » var next ds.Cursor
145 » for {
146 » » iterQuery := q
147 » » if next != nil {
148 » » » iterQuery = iterQuery.Start(next)
149 » » » next = nil
150 » » }
151
152 » » err = di.Run(iterQuery, func(ls *coordinator.LogStream, cb ds.CursorCB) error {
iannucci 2016/03/30 19:04:27 I would just loop in the query and dispatch the ta
dnj 2016/03/30 19:10:46 Done.
128 log.Fields{ 153 log.Fields{
129 "id": ls.HashID(), 154 "id": ls.HashID(),
130 "updated": ls.Updated.String(), 155 "updated": ls.Updated.String(),
131 }.Infof(c, "Identified log stream ready for archival.") 156 }.Infof(c, "Identified log stream ready for archival.")
132 157
133 // If the log stream has a terminal index, and its Updated time is less than
134 // the maximum archive delay, require this archival to be complete (no
135 // missing LogEntry).
136 //
137 // If we're past maximum archive delay, settle for any (even empty) archival.
138 // This is a failsafe to prevent logs from sitting in limbo forever.
139 maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
140 requireComplete := !now.After(ls.Updated.Add(maxDelay)) 158 requireComplete := !now.After(ls.Updated.Add(maxDelay))
141 if !requireComplete { 159 if !requireComplete {
142 log.Fields{ 160 log.Fields{
143 "path": ls.Path(), 161 "path": ls.Path(),
144 "updatedTimestamp": ls.Updated, 162 "updatedTimestamp": ls.Updated,
145 "maxDelay": maxDelay, 163 "maxDelay": maxDelay,
146 }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.") 164 }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.")
147 } 165 }
148 166
149 task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) 167 task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
150 if err != nil { 168 if err != nil {
151 log.Fields{ 169 log.Fields{
152 log.ErrorKey: err, 170 log.ErrorKey: err,
153 "path": ls.Path(), 171 "path": ls.Path(),
154 }.Errorf(c, "Failed to create archive task.") 172 }.Errorf(c, "Failed to create archive task.")
155 return err 173 return err
156 } 174 }
157 175
158 » » » taskC <- task 176 » » » tasks = append(tasks, task)
177
178 » » » // If we're at or over our batch size, break and enqueue the tasks.
179 » » » if len(tasks) >= batch {
180 » » » » next, err = cb()
181 » » » » if err != nil {
182 » » » » » return fmt.Errorf("failed to get cursor: %v", err)
183 » » » » }
184 » » » » return ds.Stop
185 » » » }
159 return nil 186 return nil
160 }) 187 })
161 » }) 188 » » if err != nil {
162 » if err != nil || ierr != nil { 189 » » » log.Fields{
163 » » log.Fields{ 190 » » » » log.ErrorKey: err,
164 » » » log.ErrorKey: err, 191 » » » » "scheduledTaskCount": totalScheduledTasks,
165 » » » "queryErr": ierr, 192 » » » }.Errorf(c, "Failed to query for archival tasks.")
166 » » » "taskCount": count, 193 » » » return errors.New("failed to dispatch archival tasks")
167 » » }.Errorf(c, "Failed to dispatch archival tasks.") 194 » » }
168 » » return errors.New("failed to dispatch archival tasks") 195
196 » » // Dispatch the accumulated tasks.
197 » » if len(tasks) > 0 {
198 » » » if err := errors.Filter(ti.AddMulti(tasks, queueName), tq.ErrTaskAlreadyAdded); err != nil {
199 » » » » log.Fields{
200 » » » » » log.ErrorKey: err,
201 » » » » » "queue": queueName,
202 » » » » » "numTasks": len(tasks),
203 » » » » » "scheduledTaskCount": totalScheduledTasks,
204 » » » » }.Errorf(c, "Failed to add tasks to task queue.")
205 » » » » return errors.New("failed to add tasks to task queue")
206 » » » }
207
208 » » » totalScheduledTasks += len(tasks)
209 » » » tasks = tasks[:0]
210 » » }
211
212 » » // If there is no query cursor, we're done.
213 » » if next == nil {
214 » » » break
215 » » }
169 } 216 }
170 217
171 log.Fields{ 218 log.Fields{
172 » » "taskCount": count, 219 » » "scheduledTaskCount": totalScheduledTasks,
173 }.Debugf(c, "Archive sweep completed successfully.") 220 }.Debugf(c, "Archive sweep completed successfully.")
174 return nil 221 return nil
175 } 222 }
OLDNEW
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698