Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "net/http" | 9 "net/http" |
| 11 "time" | 10 "time" |
| 12 | 11 |
| 13 "github.com/julienschmidt/httprouter" | 12 "github.com/julienschmidt/httprouter" |
| 14 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 15 tq "github.com/luci/gae/service/taskqueue" | 14 tq "github.com/luci/gae/service/taskqueue" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | 15 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 19 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/errors" | |
| 20 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 22 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 23 ) | 23 ) |
| 24 | 24 |
| 25 // archiveTaskQueueName returns the task queue name for archival, or an error | 25 // archiveTaskQueueName returns the task queue name for archival, or an error |
| 26 // if it's not configured. | 26 // if it's not configured. |
| 27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { | 27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { |
| 28 q := cfg.GetCoordinator().ArchiveTaskQueue | 28 q := cfg.GetCoordinator().ArchiveTaskQueue |
| 29 if q == "" { | 29 if q == "" { |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 101 log.WithError(err).Errorf(c, "Failed to load configuration.") | 101 log.WithError(err).Errorf(c, "Failed to load configuration.") |
| 102 return err | 102 return err |
| 103 } | 103 } |
| 104 | 104 |
| 105 queueName, err := archiveTaskQueueName(cfg) | 105 queueName, err := archiveTaskQueueName(cfg) |
| 106 if err != nil { | 106 if err != nil { |
| 107 log.Errorf(c, "Failed to get task queue name.") | 107 log.Errorf(c, "Failed to get task queue name.") |
| 108 return err | 108 return err |
| 109 } | 109 } |
| 110 | 110 |
| 111 // If the log stream has a terminal index, and its Updated time is less than | |
| 112 // the maximum archive delay, require this archival to be complete (no | |
| 113 // missing LogEntry). | |
| 114 // | |
| 115 // If we're past maximum archive delay, settle for any (even empty) archival. | |
| 116 // This is a failsafe to prevent logs from sitting in limbo forever. | |
| 117 maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() | |
| 118 | |
| 111 now := clock.Now(c).UTC() | 119 now := clock.Now(c).UTC() |
| 112 q := ds.NewQuery("LogStream") | 120 q := ds.NewQuery("LogStream") |
| 113 | 121 |
| 114 var threshold time.Duration | 122 var threshold time.Duration |
| 115 if complete { | 123 if complete { |
| 116 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() | 124 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() |
| 117 q = q.Eq("State", coordinator.LSTerminated) | 125 q = q.Eq("State", coordinator.LSTerminated) |
| 118 } else { | 126 } else { |
| 119 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() | 127 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| 120 q = q.Eq("State", coordinator.LSPending) | 128 q = q.Eq("State", coordinator.LSPending) |
| 121 } | 129 } |
| 122 q = q.Lte("Updated", now.Add(-threshold)) | 130 q = q.Lte("Updated", now.Add(-threshold)) |
| 123 | 131 |
| 124 » // Query and dispatch our tasks. | 132 » // We will enqueue tasks in batches, and there's no need to retrieve more than |
| 125 » var ierr error | 133 » // our batch size. |
| 126 » count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { | 134 » batch := b.getMultiTaskBatchSize() |
| 127 » » ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { | 135 » tasks := make([]*tq.Task, 0, batch) |
| 136 » q = q.Limit(int32(batch)) | |
| 137 | |
| 138 » // Perform an iterative query, dispatching tasks with each round. | |
| 139 » di := ds.Get(c) | |
| 140 » ti := tq.Get(c) | |
| 141 | |
| 142 » totalScheduledTasks := 0 | |
| 143 | |
| 144 » var next ds.Cursor | |
| 145 » for { | |
| 146 » » iterQuery := q | |
| 147 » » if next != nil { | |
| 148 » » » iterQuery = iterQuery.Start(next) | |
| 149 » » » next = nil | |
| 150 » » } | |
| 151 | |
| 152 » » err = di.Run(iterQuery, func(ls *coordinator.LogStream, cb ds.Cu rsorCB) error { | |
|
iannucci
2016/03/30 19:04:27
I would just loop in the query and dispatch the ta
dnj
2016/03/30 19:10:46
Done.
| |
| 128 log.Fields{ | 153 log.Fields{ |
| 129 "id": ls.HashID(), | 154 "id": ls.HashID(), |
| 130 "updated": ls.Updated.String(), | 155 "updated": ls.Updated.String(), |
| 131 }.Infof(c, "Identified log stream ready for archival.") | 156 }.Infof(c, "Identified log stream ready for archival.") |
| 132 | 157 |
| 133 // If the log stream has a terminal index, and its Updated time is less than | |
| 134 // the maximum archive delay, require this archival to be complete (no | |
| 135 // missing LogEntry). | |
| 136 // | |
| 137 // If we're past maximum archive delay, settle for any (even empty) archival. | |
| 138 // This is a failsafe to prevent logs from sitting in limbo forever. | |
| 139 maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duratio n() | |
| 140 requireComplete := !now.After(ls.Updated.Add(maxDelay)) | 158 requireComplete := !now.After(ls.Updated.Add(maxDelay)) |
| 141 if !requireComplete { | 159 if !requireComplete { |
| 142 log.Fields{ | 160 log.Fields{ |
| 143 "path": ls.Path(), | 161 "path": ls.Path(), |
| 144 "updatedTimestamp": ls.Updated, | 162 "updatedTimestamp": ls.Updated, |
| 145 "maxDelay": maxDelay, | 163 "maxDelay": maxDelay, |
| 146 }.Warningf(c, "Log stream is past maximum archiv al delay. Dropping completeness requirement.") | 164 }.Warningf(c, "Log stream is past maximum archiv al delay. Dropping completeness requirement.") |
| 147 } | 165 } |
| 148 | 166 |
| 149 task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) | 167 task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) |
| 150 if err != nil { | 168 if err != nil { |
| 151 log.Fields{ | 169 log.Fields{ |
| 152 log.ErrorKey: err, | 170 log.ErrorKey: err, |
| 153 "path": ls.Path(), | 171 "path": ls.Path(), |
| 154 }.Errorf(c, "Failed to create archive task.") | 172 }.Errorf(c, "Failed to create archive task.") |
| 155 return err | 173 return err |
| 156 } | 174 } |
| 157 | 175 |
| 158 » » » taskC <- task | 176 » » » tasks = append(tasks, task) |
| 177 | |
| 178 » » » // If we're at or over our batch size, break and enqueue the tasks. | |
| 179 » » » if len(tasks) >= batch { | |
| 180 » » » » next, err = cb() | |
| 181 » » » » if err != nil { | |
| 182 » » » » » return fmt.Errorf("failed to get cursor: %v", err) | |
| 183 » » » » } | |
| 184 » » » » return ds.Stop | |
| 185 » » » } | |
| 159 return nil | 186 return nil |
| 160 }) | 187 }) |
| 161 » }) | 188 » » if err != nil { |
| 162 » if err != nil || ierr != nil { | 189 » » » log.Fields{ |
| 163 » » log.Fields{ | 190 » » » » log.ErrorKey: err, |
| 164 » » » log.ErrorKey: err, | 191 » » » » "scheduledTaskCount": totalScheduledTasks, |
| 165 » » » "queryErr": ierr, | 192 » » » }.Errorf(c, "Failed to query for archival tasks.") |
| 166 » » » "taskCount": count, | 193 » » » return errors.New("failed to dispatch archival tasks") |
| 167 » » }.Errorf(c, "Failed to dispatch archival tasks.") | 194 » » } |
| 168 » » return errors.New("failed to dispatch archival tasks") | 195 |
| 196 » » // Dispatch the accumulated tasks. | |
| 197 » » if len(tasks) > 0 { | |
| 198 » » » if err := errors.Filter(ti.AddMulti(tasks, queueName), t q.ErrTaskAlreadyAdded); err != nil { | |
| 199 » » » » log.Fields{ | |
| 200 » » » » » log.ErrorKey: err, | |
| 201 » » » » » "queue": queueName, | |
| 202 » » » » » "numTasks": len(tasks), | |
| 203 » » » » » "scheduledTaskCount": totalScheduledTask s, | |
| 204 » » » » }.Errorf(c, "Failed to add tasks to task queue." ) | |
| 205 » » » » return errors.New("failed to add tasks to task q ueue") | |
| 206 » » » } | |
| 207 | |
| 208 » » » totalScheduledTasks += len(tasks) | |
| 209 » » » tasks = tasks[:0] | |
| 210 » » } | |
| 211 | |
| 212 » » // If there is no query cursor, we're done. | |
| 213 » » if next == nil { | |
| 214 » » » break | |
| 215 » » } | |
| 169 } | 216 } |
| 170 | 217 |
| 171 log.Fields{ | 218 log.Fields{ |
| 172 » » "taskCount": count, | 219 » » "scheduledTaskCount": totalScheduledTasks, |
| 173 }.Debugf(c, "Archive sweep completed successfully.") | 220 }.Debugf(c, "Archive sweep completed successfully.") |
| 174 return nil | 221 return nil |
| 175 } | 222 } |
| OLD | NEW |