| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 165 age := ls.Age.Duration() | 165 age := ls.Age.Duration() |
| 166 if age < at.SettleDelay.Duration() { | 166 if age < at.SettleDelay.Duration() { |
| 167 log.Fields{ | 167 log.Fields{ |
| 168 "age": age, | 168 "age": age, |
| 169 "settleDelay": at.SettleDelay.Duration(), | 169 "settleDelay": at.SettleDelay.Duration(), |
| 170 }.Infof(c, "Log stream is younger than the settle delay. Returni
ng task to queue.") | 170 }.Infof(c, "Log stream is younger than the settle delay. Returni
ng task to queue.") |
| 171 return false, errors.New("log stream is within settle delay") | 171 return false, errors.New("log stream is within settle delay") |
| 172 } | 172 } |
| 173 | 173 |
| 174 // Are we required to archive a complete log stream? | 174 // Are we required to archive a complete log stream? |
| 175 » complete := (age <= at.CompletePeriod.Duration()) | 175 » if age <= at.CompletePeriod.Duration() { |
| 176 » if complete && ls.State.TerminalIndex < 0 { | 176 » » tidx := ls.State.TerminalIndex |
| 177 » » log.Warningf(c, "Cannot archive complete stream with no terminal
index.") | 177 |
| 178 » » return false, errors.New("completeness required, but stream has
no terminal index") | 178 » » if tidx < 0 { |
| 179 » » » log.Warningf(c, "Cannot archive complete stream with no
terminal index.") |
| 180 » » » return false, errors.New("completeness required, but str
eam has no terminal index") |
| 181 » » } |
| 182 |
| 183 » » // If we're requiring completeness, perform a keys-only scan of
intermediate |
| 184 » » // storage to ensure that we have all of the records before we b
other |
| 185 » » // streaming to storage only to find that we are missing data. |
| 186 » » if err := a.checkComplete(types.StreamPath(at.Path), types.Messa
geIndex(tidx)); err != nil { |
| 187 » » » return false, err |
| 188 » » } |
| 179 } | 189 } |
| 180 | 190 |
| 181 ar := logdog.ArchiveStreamRequest{ | 191 ar := logdog.ArchiveStreamRequest{ |
| 182 Path: at.Path, | 192 Path: at.Path, |
| 183 } | 193 } |
| 184 | 194 |
| 185 // Archive to staging. | 195 // Archive to staging. |
| 186 // | 196 // |
| 187 // If a non-transient failure occurs here, we will report it to the Arch
ivist | 197 // If a non-transient failure occurs here, we will report it to the Arch
ivist |
| 188 // under the assumption that it will continue occurring. | 198 // under the assumption that it will continue occurring. |
| 189 // | 199 // |
| 190 // We will handle error creating the plan and executing the plan in the
same | 200 // We will handle error creating the plan and executing the plan in the
same |
| 191 // switch statement below. | 201 // switch statement below. |
| 192 staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta
sk.UniqueID()) | 202 staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta
sk.UniqueID()) |
| 193 if err != nil { | 203 if err != nil { |
| 194 log.WithError(err).Errorf(c, "Failed to create staged archival p
lan.") | 204 log.WithError(err).Errorf(c, "Failed to create staged archival p
lan.") |
| 195 } else { | 205 } else { |
| 196 » » err = staged.stage(c, complete) | 206 » » err = staged.stage(c) |
| 197 } | 207 } |
| 198 | 208 |
| 199 switch { | 209 switch { |
| 200 case errors.IsTransient(err): | 210 case errors.IsTransient(err): |
| 201 // If this is a transient error, exit immediately and do not del
ete the | 211 // If this is a transient error, exit immediately and do not del
ete the |
| 202 // archival task. | 212 // archival task. |
| 203 log.WithError(err).Warningf(c, "TRANSIENT error during archival
operation.") | 213 log.WithError(err).Warningf(c, "TRANSIENT error during archival
operation.") |
| 204 return false, err | 214 return false, err |
| 205 | 215 |
| 206 case err != nil: | 216 case err != nil: |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 if _, err := a.Service.ArchiveStream(c, &ar); err != nil { | 262 if _, err := a.Service.ArchiveStream(c, &ar); err != nil { |
| 253 log.WithError(err).Errorf(c, "Failed to report archive state.") | 263 log.WithError(err).Errorf(c, "Failed to report archive state.") |
| 254 return false, err | 264 return false, err |
| 255 } | 265 } |
| 256 | 266 |
| 257 // Archival is complete and acknowledged by Coordinator. Consume the arc
hival | 267 // Archival is complete and acknowledged by Coordinator. Consume the arc
hival |
| 258 // task. | 268 // task. |
| 259 return true, nil | 269 return true, nil |
| 260 } | 270 } |
| 261 | 271 |
| 272 // checkComplete performs a quick scan of intermediate storage to ensure that |
| 273 // all of the log stream's records are available. |
| 274 func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex
) error { |
| 275 sreq := storage.GetRequest{ |
| 276 Path: path, |
| 277 KeysOnly: true, |
| 278 } |
| 279 |
| 280 nextIndex := types.MessageIndex(0) |
| 281 var ierr error |
| 282 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { |
| 283 switch { |
| 284 case idx != nextIndex: |
| 285 ierr = fmt.Errorf("missing log entry index %d (next %d)"
, nextIndex, idx) |
| 286 return false |
| 287 |
| 288 case idx == tidx: |
| 289 // We have hit our terminal index, so all of the log dat
a is here! |
| 290 return false |
| 291 |
| 292 default: |
| 293 nextIndex++ |
| 294 return true |
| 295 } |
| 296 }) |
| 297 if ierr != nil { |
| 298 return ierr |
| 299 } |
| 300 if err != nil { |
| 301 return err |
| 302 } |
| 303 return nil |
| 304 } |
| 305 |
| 262 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath,
ls *logdog.LoadStreamResponse, uid string) ( | 306 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath,
ls *logdog.LoadStreamResponse, uid string) ( |
| 263 *stagedArchival, error) { | 307 *stagedArchival, error) { |
| 264 sa := stagedArchival{ | 308 sa := stagedArchival{ |
| 265 Archivist: a, | 309 Archivist: a, |
| 266 path: path, | 310 path: path, |
| 267 | 311 |
| 268 terminalIndex: ls.State.TerminalIndex, | 312 terminalIndex: ls.State.TerminalIndex, |
| 269 } | 313 } |
| 270 | 314 |
| 271 // Deserialize and validate the descriptor protobuf. If this fails, it i
s a | 315 // Deserialize and validate the descriptor protobuf. If this fails, it i
s a |
| (...skipping 27 matching lines...) Expand all Loading... |
| 299 final: a.GSBase.Concat(string(path), name), | 343 final: a.GSBase.Concat(string(path), name), |
| 300 } | 344 } |
| 301 } | 345 } |
| 302 | 346 |
| 303 type stagedArchival struct { | 347 type stagedArchival struct { |
| 304 *Archivist | 348 *Archivist |
| 305 | 349 |
| 306 path types.StreamPath | 350 path types.StreamPath |
| 307 desc logpb.LogStreamDescriptor | 351 desc logpb.LogStreamDescriptor |
| 308 | 352 |
| 309 » stream stagingPaths | 353 » stream stagingPaths |
| 310 » streamSize int64 | 354 » index stagingPaths |
| 311 | 355 » data stagingPaths |
| 312 » index stagingPaths | |
| 313 » indexSize int64 | |
| 314 | |
| 315 » data stagingPaths | |
| 316 » dataSize int64 | |
| 317 | 356 |
| 318 finalized bool | 357 finalized bool |
| 319 terminalIndex int64 | 358 terminalIndex int64 |
| 320 logEntryCount int64 | 359 logEntryCount int64 |
| 321 } | 360 } |
| 322 | 361 |
| 323 // stage executes the archival process, archiving to the staged storage paths. | 362 // stage executes the archival process, archiving to the staged storage paths. |
| 324 // | 363 // |
| 325 // If stage fails, it may return a transient error. | 364 // If stage fails, it may return a transient error. |
| 326 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) { | 365 func (sa *stagedArchival) stage(c context.Context) (err error) { |
| 327 log.Fields{ | 366 log.Fields{ |
| 328 "streamURL": sa.stream.staged, | 367 "streamURL": sa.stream.staged, |
| 329 "indexURL": sa.index.staged, | 368 "indexURL": sa.index.staged, |
| 330 "dataURL": sa.data.staged, | 369 "dataURL": sa.data.staged, |
| 331 }.Debugf(c, "Staging log stream...") | 370 }.Debugf(c, "Staging log stream...") |
| 332 | 371 |
| 333 // Group any transient errors that occur during cleanup. If we aren't | 372 // Group any transient errors that occur during cleanup. If we aren't |
| 334 // returning a non-transient error, return a transient "terr". | 373 // returning a non-transient error, return a transient "terr". |
| 335 var terr errors.MultiError | 374 var terr errors.MultiError |
| 336 defer func() { | 375 defer func() { |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 389 if dataWriter, err = createWriter(sa.data.staged); err != nil { | 428 if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| 390 return err | 429 return err |
| 391 } | 430 } |
| 392 defer closeWriter(dataWriter, sa.data.staged) | 431 defer closeWriter(dataWriter, sa.data.staged) |
| 393 | 432 |
| 394 // Read our log entries from intermediate storage. | 433 // Read our log entries from intermediate storage. |
| 395 ss := storageSource{ | 434 ss := storageSource{ |
| 396 Context: c, | 435 Context: c, |
| 397 st: sa.Storage, | 436 st: sa.Storage, |
| 398 path: sa.path, | 437 path: sa.path, |
| 399 contiguous: complete, | |
| 400 terminalIndex: types.MessageIndex(sa.terminalIndex), | 438 terminalIndex: types.MessageIndex(sa.terminalIndex), |
| 401 lastIndex: -1, | 439 lastIndex: -1, |
| 402 } | 440 } |
| 403 | 441 |
| 404 m := archive.Manifest{ | 442 m := archive.Manifest{ |
| 405 Desc: &sa.desc, | 443 Desc: &sa.desc, |
| 406 Source: &ss, | 444 Source: &ss, |
| 407 LogWriter: streamWriter, | 445 LogWriter: streamWriter, |
| 408 IndexWriter: indexWriter, | 446 IndexWriter: indexWriter, |
| 409 DataWriter: dataWriter, | 447 DataWriter: dataWriter, |
| 410 StreamIndexRange: sa.StreamIndexRange, | 448 StreamIndexRange: sa.StreamIndexRange, |
| 411 PrefixIndexRange: sa.PrefixIndexRange, | 449 PrefixIndexRange: sa.PrefixIndexRange, |
| 412 ByteRange: sa.ByteRange, | 450 ByteRange: sa.ByteRange, |
| 413 | 451 |
| 414 Logger: log.Get(c), | 452 Logger: log.Get(c), |
| 415 } | 453 } |
| 416 if err = archive.Archive(m); err != nil { | 454 if err = archive.Archive(m); err != nil { |
| 417 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 455 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 418 return | 456 return |
| 419 } | 457 } |
| 420 | 458 |
| 421 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) { | 459 » switch { |
| 422 » » // Fail if we were requested to archive only the complete log. W
e consider | 460 » case ss.logEntryCount == 0: |
| 423 » » // this a transient error with the expectation that the missing
entries will | 461 » » // If our last log index was <0, then no logs were archived. |
| 424 » » // show up in future retries. | 462 » » log.Warningf(c, "No log entries were archived.") |
| 425 » » switch { | |
| 426 » » case complete && ss.hasMissingEntries: | |
| 427 » » » log.Errorf(c, "Log stream has missing entries, but compl
eteness is required.") | |
| 428 » » » err = errors.WrapTransient(errors.New("stream has missin
g entries")) | |
| 429 » » » return | |
| 430 | 463 |
| 431 » » case ss.logEntryCount == 0: | 464 » default: |
| 432 » » » // If our last log index was <0, then no logs were archi
ved. | 465 » » // Update our terminal index. |
| 433 » » » log.Warningf(c, "No log entries were archived.") | 466 » » log.Fields{ |
| 434 | 467 » » » "terminalIndex": ss.lastIndex, |
| 435 » » default: | 468 » » » "logEntryCount": ss.logEntryCount, |
| 436 » » » // Update our terminal index. | 469 » » }.Debugf(c, "Finished archiving log stream.") |
| 437 » » » log.Fields{ | |
| 438 » » » » "terminalIndex": ss.lastIndex, | |
| 439 » » » » "logEntryCount": ss.logEntryCount, | |
| 440 » » » » "hasMissingEntries": ss.hasMissingEntries, | |
| 441 » » » }.Debugf(c, "Finished archiving log stream.") | |
| 442 » » } | |
| 443 } | 470 } |
| 444 | 471 |
| 445 // Update our state with archival results. | 472 // Update our state with archival results. |
| 446 sa.terminalIndex = int64(ss.lastIndex) | 473 sa.terminalIndex = int64(ss.lastIndex) |
| 447 sa.logEntryCount = ss.logEntryCount | 474 sa.logEntryCount = ss.logEntryCount |
| 448 sa.stream.count = streamWriter.Count() | 475 sa.stream.count = streamWriter.Count() |
| 449 sa.index.count = indexWriter.Count() | 476 sa.index.count = indexWriter.Count() |
| 450 sa.data.count = dataWriter.Count() | 477 sa.data.count = dataWriter.Count() |
| 451 return | 478 return |
| 452 } | 479 } |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 519 } | 546 } |
| 520 } | 547 } |
| 521 | 548 |
| 522 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { | 549 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
| 523 return []*stagingPaths{ | 550 return []*stagingPaths{ |
| 524 &sa.stream, | 551 &sa.stream, |
| 525 &sa.index, | 552 &sa.index, |
| 526 &sa.data, | 553 &sa.data, |
| 527 } | 554 } |
| 528 } | 555 } |
| OLD | NEW |