Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(139)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1874563005: Archivist asserts completeness through keys scan. (Closed) Base URL: https://github.com/luci/luci-go@logdog-storage-keysonly
Patch Set: Remove unused members. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/hex" 9 "encoding/hex"
10 "fmt" 10 "fmt"
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after
165 age := ls.Age.Duration() 165 age := ls.Age.Duration()
166 if age < at.SettleDelay.Duration() { 166 if age < at.SettleDelay.Duration() {
167 log.Fields{ 167 log.Fields{
168 "age": age, 168 "age": age,
169 "settleDelay": at.SettleDelay.Duration(), 169 "settleDelay": at.SettleDelay.Duration(),
170 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.") 170 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.")
171 return false, errors.New("log stream is within settle delay") 171 return false, errors.New("log stream is within settle delay")
172 } 172 }
173 173
174 // Are we required to archive a complete log stream? 174 // Are we required to archive a complete log stream?
175 » complete := (age <= at.CompletePeriod.Duration()) 175 » if age <= at.CompletePeriod.Duration() {
176 » if complete && ls.State.TerminalIndex < 0 { 176 » » tidx := ls.State.TerminalIndex
177 » » log.Warningf(c, "Cannot archive complete stream with no terminal index.") 177
178 » » return false, errors.New("completeness required, but stream has no terminal index") 178 » » if tidx < 0 {
179 » » » log.Warningf(c, "Cannot archive complete stream with no terminal index.")
180 » » » return false, errors.New("completeness required, but str eam has no terminal index")
181 » » }
182
183 » » // If we're requiring completeness, perform a keys-only scan of intermediate
184 » » // storage to ensure that we have all of the records before we b other
185 » » // streaming to storage only to find that we are missing data.
186 » » if err := a.checkComplete(types.StreamPath(at.Path), types.Messa geIndex(tidx)); err != nil {
187 » » » return false, err
188 » » }
179 } 189 }
180 190
181 ar := logdog.ArchiveStreamRequest{ 191 ar := logdog.ArchiveStreamRequest{
182 Path: at.Path, 192 Path: at.Path,
183 } 193 }
184 194
185 // Archive to staging. 195 // Archive to staging.
186 // 196 //
187 // If a non-transient failure occurs here, we will report it to the Arch ivist 197 // If a non-transient failure occurs here, we will report it to the Arch ivist
188 // under the assumption that it will continue occurring. 198 // under the assumption that it will continue occurring.
189 // 199 //
190 // We will handle error creating the plan and executing the plan in the same 200 // We will handle error creating the plan and executing the plan in the same
191 // switch statement below. 201 // switch statement below.
192 staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID()) 202 staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID())
193 if err != nil { 203 if err != nil {
194 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") 204 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.")
195 } else { 205 } else {
196 » » err = staged.stage(c, complete) 206 » » err = staged.stage(c)
197 } 207 }
198 208
199 switch { 209 switch {
200 case errors.IsTransient(err): 210 case errors.IsTransient(err):
201 // If this is a transient error, exit immediately and do not del ete the 211 // If this is a transient error, exit immediately and do not del ete the
202 // archival task. 212 // archival task.
203 log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.") 213 log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
204 return false, err 214 return false, err
205 215
206 case err != nil: 216 case err != nil:
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 if _, err := a.Service.ArchiveStream(c, &ar); err != nil { 262 if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
253 log.WithError(err).Errorf(c, "Failed to report archive state.") 263 log.WithError(err).Errorf(c, "Failed to report archive state.")
254 return false, err 264 return false, err
255 } 265 }
256 266
257 // Archival is complete and acknowledged by Coordinator. Consume the arc hival 267 // Archival is complete and acknowledged by Coordinator. Consume the arc hival
258 // task. 268 // task.
259 return true, nil 269 return true, nil
260 } 270 }
261 271
272 // checkComplete performs a quick scan of intermediate storage to ensure that
273 // all of the log stream's records are available.
274 func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex ) error {
275 sreq := storage.GetRequest{
276 Path: path,
277 KeysOnly: true,
278 }
279
280 nextIndex := types.MessageIndex(0)
281 var ierr error
282 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool {
283 switch {
284 case idx != nextIndex:
285 ierr = fmt.Errorf("missing log entry index %d (next %d)" , nextIndex, idx)
286 return false
287
288 case idx == tidx:
289 // We have hit our terminal index, so all of the log dat a is here!
290 return false
291
292 default:
293 nextIndex++
294 return true
295 }
296 })
297 if ierr != nil {
298 return ierr
299 }
300 if err != nil {
301 return err
302 }
303 return nil
304 }
305
262 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( 306 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
263 *stagedArchival, error) { 307 *stagedArchival, error) {
264 sa := stagedArchival{ 308 sa := stagedArchival{
265 Archivist: a, 309 Archivist: a,
266 path: path, 310 path: path,
267 311
268 terminalIndex: ls.State.TerminalIndex, 312 terminalIndex: ls.State.TerminalIndex,
269 } 313 }
270 314
271 // Deserialize and validate the descriptor protobuf. If this fails, it i s a 315 // Deserialize and validate the descriptor protobuf. If this fails, it i s a
(...skipping 27 matching lines...) Expand all
299 final: a.GSBase.Concat(string(path), name), 343 final: a.GSBase.Concat(string(path), name),
300 } 344 }
301 } 345 }
302 346
303 type stagedArchival struct { 347 type stagedArchival struct {
304 *Archivist 348 *Archivist
305 349
306 path types.StreamPath 350 path types.StreamPath
307 desc logpb.LogStreamDescriptor 351 desc logpb.LogStreamDescriptor
308 352
309 » stream stagingPaths 353 » stream stagingPaths
310 » streamSize int64 354 » index stagingPaths
311 355 » data stagingPaths
312 » index stagingPaths
313 » indexSize int64
314
315 » data stagingPaths
316 » dataSize int64
317 356
318 finalized bool 357 finalized bool
319 terminalIndex int64 358 terminalIndex int64
320 logEntryCount int64 359 logEntryCount int64
321 } 360 }
322 361
323 // stage executes the archival process, archiving to the staged storage paths. 362 // stage executes the archival process, archiving to the staged storage paths.
324 // 363 //
325 // If stage fails, it may return a transient error. 364 // If stage fails, it may return a transient error.
326 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) { 365 func (sa *stagedArchival) stage(c context.Context) (err error) {
327 log.Fields{ 366 log.Fields{
328 "streamURL": sa.stream.staged, 367 "streamURL": sa.stream.staged,
329 "indexURL": sa.index.staged, 368 "indexURL": sa.index.staged,
330 "dataURL": sa.data.staged, 369 "dataURL": sa.data.staged,
331 }.Debugf(c, "Staging log stream...") 370 }.Debugf(c, "Staging log stream...")
332 371
333 // Group any transient errors that occur during cleanup. If we aren't 372 // Group any transient errors that occur during cleanup. If we aren't
334 // returning a non-transient error, return a transient "terr". 373 // returning a non-transient error, return a transient "terr".
335 var terr errors.MultiError 374 var terr errors.MultiError
336 defer func() { 375 defer func() {
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
389 if dataWriter, err = createWriter(sa.data.staged); err != nil { 428 if dataWriter, err = createWriter(sa.data.staged); err != nil {
390 return err 429 return err
391 } 430 }
392 defer closeWriter(dataWriter, sa.data.staged) 431 defer closeWriter(dataWriter, sa.data.staged)
393 432
394 // Read our log entries from intermediate storage. 433 // Read our log entries from intermediate storage.
395 ss := storageSource{ 434 ss := storageSource{
396 Context: c, 435 Context: c,
397 st: sa.Storage, 436 st: sa.Storage,
398 path: sa.path, 437 path: sa.path,
399 contiguous: complete,
400 terminalIndex: types.MessageIndex(sa.terminalIndex), 438 terminalIndex: types.MessageIndex(sa.terminalIndex),
401 lastIndex: -1, 439 lastIndex: -1,
402 } 440 }
403 441
404 m := archive.Manifest{ 442 m := archive.Manifest{
405 Desc: &sa.desc, 443 Desc: &sa.desc,
406 Source: &ss, 444 Source: &ss,
407 LogWriter: streamWriter, 445 LogWriter: streamWriter,
408 IndexWriter: indexWriter, 446 IndexWriter: indexWriter,
409 DataWriter: dataWriter, 447 DataWriter: dataWriter,
410 StreamIndexRange: sa.StreamIndexRange, 448 StreamIndexRange: sa.StreamIndexRange,
411 PrefixIndexRange: sa.PrefixIndexRange, 449 PrefixIndexRange: sa.PrefixIndexRange,
412 ByteRange: sa.ByteRange, 450 ByteRange: sa.ByteRange,
413 451
414 Logger: log.Get(c), 452 Logger: log.Get(c),
415 } 453 }
416 if err = archive.Archive(m); err != nil { 454 if err = archive.Archive(m); err != nil {
417 log.WithError(err).Errorf(c, "Failed to archive log stream.") 455 log.WithError(err).Errorf(c, "Failed to archive log stream.")
418 return 456 return
419 } 457 }
420 458
421 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) { 459 » switch {
422 » » // Fail if we were requested to archive only the complete log. W e consider 460 » case ss.logEntryCount == 0:
423 » » // this a transient error with the expectation that the missing entries will 461 » » // If our last log index was <0, then no logs were archived.
424 » » // show up in future retries. 462 » » log.Warningf(c, "No log entries were archived.")
425 » » switch {
426 » » case complete && ss.hasMissingEntries:
427 » » » log.Errorf(c, "Log stream has missing entries, but compl eteness is required.")
428 » » » err = errors.WrapTransient(errors.New("stream has missin g entries"))
429 » » » return
430 463
431 » » case ss.logEntryCount == 0: 464 » default:
432 » » » // If our last log index was <0, then no logs were archi ved. 465 » » // Update our terminal index.
433 » » » log.Warningf(c, "No log entries were archived.") 466 » » log.Fields{
434 467 » » » "terminalIndex": ss.lastIndex,
435 » » default: 468 » » » "logEntryCount": ss.logEntryCount,
436 » » » // Update our terminal index. 469 » » }.Debugf(c, "Finished archiving log stream.")
437 » » » log.Fields{
438 » » » » "terminalIndex": ss.lastIndex,
439 » » » » "logEntryCount": ss.logEntryCount,
440 » » » » "hasMissingEntries": ss.hasMissingEntries,
441 » » » }.Debugf(c, "Finished archiving log stream.")
442 » » }
443 } 470 }
444 471
445 // Update our state with archival results. 472 // Update our state with archival results.
446 sa.terminalIndex = int64(ss.lastIndex) 473 sa.terminalIndex = int64(ss.lastIndex)
447 sa.logEntryCount = ss.logEntryCount 474 sa.logEntryCount = ss.logEntryCount
448 sa.stream.count = streamWriter.Count() 475 sa.stream.count = streamWriter.Count()
449 sa.index.count = indexWriter.Count() 476 sa.index.count = indexWriter.Count()
450 sa.data.count = dataWriter.Count() 477 sa.data.count = dataWriter.Count()
451 return 478 return
452 } 479 }
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
519 } 546 }
520 } 547 }
521 548
522 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { 549 func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
523 return []*stagingPaths{ 550 return []*stagingPaths{
524 &sa.stream, 551 &sa.stream,
525 &sa.index, 552 &sa.index,
526 &sa.data, 553 &sa.data,
527 } 554 }
528 } 555 }
OLDNEW
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698