Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 97 // consumed. This may be called multiple times with no additional effect . | 97 // consumed. This may be called multiple times with no additional effect . |
| 98 Consume() | 98 Consume() |
| 99 | 99 |
| 100 // AssertLease asserts that the lease for this Task is still held. | 100 // AssertLease asserts that the lease for this Task is still held. |
| 101 // | 101 // |
| 102 // On failure, it will return an error. If successful, the Archivist may | 102 // On failure, it will return an error. If successful, the Archivist may |
| 103 // assume that it holds the lease longer. | 103 // assume that it holds the lease longer. |
| 104 AssertLease(context.Context) error | 104 AssertLease(context.Context) error |
| 105 } | 105 } |
| 106 | 106 |
| 107 // Settings retrieve the settings for an archival. | |
|
nodir
2016/05/19 15:56:29
s/retrieve/store
SettingsLoader retrieves them; Se
dnj (Google)
2016/05/19 16:34:51
Done.
| |
| 108 type Settings struct { | |
| 109 // GSBase is the base Google Storage path. This includes the bucket name | |
| 110 // and any associated path. | |
| 111 GSBase gs.Path | |
| 112 // GSStagingBase is the base Google Storage path for archive staging. Th is | |
| 113 // includes the bucket name and any associated path. | |
| 114 GSStagingBase gs.Path | |
|
nodir
2016/05/19 15:56:29
rebase?
dnj (Google)
2016/05/19 16:34:51
Acknowledged.
| |
| 115 | |
| 116 // AlwaysCreateBinary if true, means that a binary should be archived | |
|
nodir
2016/05/19 15:56:29
nit: comma before if?
dnj (Google)
2016/05/19 16:34:51
Done.
| |
| 117 // regardless of whether a specific binary file extension has been suppl ied | |
| 118 // with the log stream. | |
| 119 AlwaysCreateBinary bool | |
| 120 | |
| 121 // IndexStreamRange is the maximum number of stream indexes in between i ndex | |
| 122 // entries. See archive.Manifest for more information. | |
| 123 IndexStreamRange int | |
| 124 // IndexPrefixRange is the maximum number of prefix indexes in between i ndex | |
| 125 // entries. See archive.Manifest for more information. | |
| 126 IndexPrefixRange int | |
| 127 // IndexByteRange is the maximum number of stream data bytes in between index | |
| 128 // entries. See archive.Manifest for more information. | |
| 129 IndexByteRange int | |
| 130 } | |
| 131 | |
| 132 // SettingsLoader returns archival Settings for a given project. | |
| 133 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) | |
| 134 | |
| 107 // Archivist is a stateless configuration capable of archiving individual log | 135 // Archivist is a stateless configuration capable of archiving individual log |
| 108 // streams. | 136 // streams. |
| 109 type Archivist struct { | 137 type Archivist struct { |
| 110 // Service is the client to use to communicate with Coordinator's Servic es | 138 // Service is the client to use to communicate with Coordinator's Servic es |
| 111 // endpoint. | 139 // endpoint. |
| 112 Service logdog.ServicesClient | 140 Service logdog.ServicesClient |
| 113 | 141 |
| 142 // SettingsLoader loads archival settings for a specific project. | |
| 143 SettingsLoader SettingsLoader | |
| 144 | |
| 114 // Storage is the archival source Storage instance. | 145 // Storage is the archival source Storage instance. |
| 115 Storage storage.Storage | 146 Storage storage.Storage |
| 116 // GSClient is the Google Storage client to for archive generation. | 147 // GSClient is the Google Storage client to for archive generation. |
| 117 GSClient gs.Client | 148 GSClient gs.Client |
| 118 | |
| 119 // GSBase is the base Google Storage path. This includes the bucket name | |
| 120 // and any associated path. | |
| 121 GSBase gs.Path | |
| 122 // GSStagingBase is the base Google Storage path for archive staging. Th is | |
| 123 // includes the bucket name and any associated path. | |
| 124 GSStagingBase gs.Path | |
| 125 | |
| 126 // PrefixIndexRange is the maximum number of stream indexes in between i ndex | |
| 127 // entries. See archive.Manifest for more information. | |
| 128 StreamIndexRange int | |
| 129 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex | |
| 130 // entries. See archive.Manifest for more information. | |
| 131 PrefixIndexRange int | |
| 132 // ByteRange is the maximum number of stream data bytes in between index | |
| 133 // entries. See archive.Manifest for more information. | |
| 134 ByteRange int | |
| 135 } | 149 } |
| 136 | 150 |
| 137 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 151 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
| 138 // to during archival. This should be greater than the maximum LogEntry size. | 152 // to during archival. This should be greater than the maximum LogEntry size. |
| 139 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 153 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| 140 | 154 |
| 141 // ArchiveTask processes and executes a single log stream archive task. | 155 // ArchiveTask processes and executes a single log stream archive task. |
| 142 // | 156 // |
| 143 // During processing, the Task's Consume method may be called to indicate that | 157 // During processing, the Task's Consume method may be called to indicate that |
| 144 // it should be consumed. | 158 // it should be consumed. |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 245 // retry later. | 259 // retry later. |
| 246 age := ls.Age.Duration() | 260 age := ls.Age.Duration() |
| 247 if age < at.SettleDelay.Duration() { | 261 if age < at.SettleDelay.Duration() { |
| 248 log.Fields{ | 262 log.Fields{ |
| 249 "age": age, | 263 "age": age, |
| 250 "settleDelay": at.SettleDelay.Duration(), | 264 "settleDelay": at.SettleDelay.Duration(), |
| 251 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.") | 265 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.") |
| 252 return statusErr(errors.New("log stream is within settle delay") ) | 266 return statusErr(errors.New("log stream is within settle delay") ) |
| 253 } | 267 } |
| 254 | 268 |
| 269 // Load archival settings for this project. | |
| 270 settings, err := a.loadSettings(c, config.ProjectName(at.Project)) | |
| 271 if err != nil { | |
| 272 log.Fields{ | |
| 273 log.ErrorKey: err, | |
| 274 "project": at.Project, | |
| 275 }.Errorf(c, "Failed to load settings for project.") | |
| 276 return err | |
| 277 } | |
| 278 | |
| 255 ar := logdog.ArchiveStreamRequest{ | 279 ar := logdog.ArchiveStreamRequest{ |
| 256 Project: at.Project, | 280 Project: at.Project, |
| 257 Id: at.Id, | 281 Id: at.Id, |
| 258 } | 282 } |
| 259 | 283 |
| 260 // Build our staged archival plan. This doesn't actually do any archivin g. | 284 // Build our staged archival plan. This doesn't actually do any archivin g. |
| 261 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), l s, task.UniqueID()) | 285 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), s ettings, ls, task.UniqueID()) |
| 262 if err != nil { | 286 if err != nil { |
| 263 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") | 287 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") |
| 264 return err | 288 return err |
| 265 } | 289 } |
| 266 | 290 |
| 267 // Are we required to archive a complete log stream? | 291 // Are we required to archive a complete log stream? |
| 268 if age <= at.CompletePeriod.Duration() { | 292 if age <= at.CompletePeriod.Duration() { |
| 269 // If we're requiring completeness, perform a keys-only scan of intermediate | 293 // If we're requiring completeness, perform a keys-only scan of intermediate |
| 270 // storage to ensure that we have all of the records before we b other | 294 // storage to ensure that we have all of the records before we b other |
| 271 // streaming to storage only to find that we are missing data. | 295 // streaming to storage only to find that we are missing data. |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 348 log.WithError(err).Errorf(c, "Failed to report archive state.") | 372 log.WithError(err).Errorf(c, "Failed to report archive state.") |
| 349 return err | 373 return err |
| 350 } | 374 } |
| 351 | 375 |
| 352 // Archival is complete and acknowledged by Coordinator. Consume the arc hival | 376 // Archival is complete and acknowledged by Coordinator. Consume the arc hival |
| 353 // task. | 377 // task. |
| 354 task.Consume() | 378 task.Consume() |
| 355 return nil | 379 return nil |
| 356 } | 380 } |
| 357 | 381 |
| 382 // loadSettings loads and validates archival settings. | |
| 383 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) { | |
| 384 if a.SettingsLoader == nil { | |
| 385 panic("no settings loader configured") | |
|
nodir
2016/05/19 15:56:29
I am glad to see you didn't ban panics altogether
dnj (Google)
2016/05/19 16:34:51
IMO panics have their place. I think explicit erro
| |
| 386 } | |
| 387 | |
| 388 st, err := a.SettingsLoader(c, project) | |
| 389 switch { | |
| 390 case err != nil: | |
| 391 return nil, err | |
| 392 | |
| 393 case st.GSBase.Bucket() == "": | |
| 394 log.Fields{ | |
| 395 log.ErrorKey: err, | |
| 396 "gsBase": st.GSBase, | |
| 397 }.Errorf(c, "Invalid storage base.") | |
| 398 return nil, errors.New("invalid storage base") | |
| 399 | |
| 400 case st.GSStagingBase.Bucket() == "": | |
| 401 log.Fields{ | |
| 402 log.ErrorKey: err, | |
| 403 "gsStagingBase": st.GSStagingBase, | |
| 404 }.Errorf(c, "Invalid storage staging base.") | |
| 405 return nil, errors.New("invalid storage staging base") | |
| 406 | |
| 407 default: | |
| 408 return st, nil | |
| 409 } | |
| 410 } | |
| 411 | |
| 358 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name, | 412 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name, |
| 359 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { | 413 » st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva l, error) { |
| 360 sa := stagedArchival{ | 414 sa := stagedArchival{ |
| 361 Archivist: a, | 415 Archivist: a, |
| 416 Settings: st, | |
| 362 project: project, | 417 project: project, |
| 363 | 418 |
| 364 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), | 419 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
| 365 } | 420 } |
| 366 | 421 |
| 367 // Deserialize and validate the descriptor protobuf. If this fails, it i s a | 422 // Deserialize and validate the descriptor protobuf. If this fails, it i s a |
| 368 // non-transient error. | 423 // non-transient error. |
| 369 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 424 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| 370 log.Fields{ | 425 log.Fields{ |
| 371 log.ErrorKey: err, | 426 log.ErrorKey: err, |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 382 | 437 |
| 383 // Construct our staged archival paths. | 438 // Construct our staged archival paths. |
| 384 sa.stream = sa.makeStagingPaths("logstream.entries", uid) | 439 sa.stream = sa.makeStagingPaths("logstream.entries", uid) |
| 385 sa.index = sa.makeStagingPaths("logstream.index", uid) | 440 sa.index = sa.makeStagingPaths("logstream.index", uid) |
| 386 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) | 441 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) |
| 387 return &sa, nil | 442 return &sa, nil |
| 388 } | 443 } |
| 389 | 444 |
| 390 type stagedArchival struct { | 445 type stagedArchival struct { |
| 391 *Archivist | 446 *Archivist |
| 447 *Settings | |
| 392 | 448 |
| 393 project config.ProjectName | 449 project config.ProjectName |
| 394 path types.StreamPath | 450 path types.StreamPath |
| 395 desc logpb.LogStreamDescriptor | 451 desc logpb.LogStreamDescriptor |
| 396 | 452 |
| 397 stream stagingPaths | 453 stream stagingPaths |
| 398 index stagingPaths | 454 index stagingPaths |
| 399 data stagingPaths | 455 data stagingPaths |
| 400 | 456 |
| 401 finalized bool | 457 finalized bool |
| 402 terminalIndex types.MessageIndex | 458 terminalIndex types.MessageIndex |
| 403 logEntryCount int64 | 459 logEntryCount int64 |
| 404 } | 460 } |
| 405 | 461 |
| 406 // makeStagingPaths returns a stagingPaths instance for the given path and | 462 // makeStagingPaths returns a stagingPaths instance for the given path and |
| 407 // file name. It incorporates a unique ID into the staging name to differentiate | 463 // file name. It incorporates a unique ID into the staging name to differentiate |
| 408 // it from other staging paths for the same path/name. | 464 // it from other staging paths for the same path/name. |
| 409 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { | 465 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { |
| 410 » // TODO(dnj): This won't be necessary when empty project is invalid. | 466 » proj := string(sa.project) |
| 411 » project := string(sa.project) | 467 » // TODO (dnj): When empty projects are disallowed, remove this. |
| 412 » if project == "" { | 468 » if proj == "" { |
| 413 » » project = "_" | 469 » » proj = "_" |
| 414 } | 470 } |
| 415 | 471 |
| 472 // Either of these paths may be shared between projects. To enforce | |
| 473 // an absence of conflicts, we will insert the project name as part of t he | |
| 474 // path. | |
| 416 return stagingPaths{ | 475 return stagingPaths{ |
| 417 » » staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, n ame), | 476 » » staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name ), |
| 418 » » final: sa.GSBase.Concat(project, string(sa.path), name), | 477 » » final: sa.GSBase.Concat(proj, string(sa.path), name), |
| 419 } | 478 } |
| 420 } | 479 } |
| 421 | 480 |
| 422 // checkComplete performs a quick scan of intermediate storage to ensure that | 481 // checkComplete performs a quick scan of intermediate storage to ensure that |
| 423 // all of the log stream's records are available. | 482 // all of the log stream's records are available. |
| 424 func (sa *stagedArchival) checkComplete(c context.Context) error { | 483 func (sa *stagedArchival) checkComplete(c context.Context) error { |
| 425 if sa.terminalIndex < 0 { | 484 if sa.terminalIndex < 0 { |
| 426 log.Warningf(c, "Cannot archive complete stream with no terminal index.") | 485 log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
| 427 return statusErr(errors.New("completeness required, but stream h as no terminal index")) | 486 return statusErr(errors.New("completeness required, but stream h as no terminal index")) |
| 428 } | 487 } |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 539 terminalIndex: sa.terminalIndex, | 598 terminalIndex: sa.terminalIndex, |
| 540 lastIndex: -1, | 599 lastIndex: -1, |
| 541 } | 600 } |
| 542 | 601 |
| 543 m := archive.Manifest{ | 602 m := archive.Manifest{ |
| 544 Desc: &sa.desc, | 603 Desc: &sa.desc, |
| 545 Source: &ss, | 604 Source: &ss, |
| 546 LogWriter: streamWriter, | 605 LogWriter: streamWriter, |
| 547 IndexWriter: indexWriter, | 606 IndexWriter: indexWriter, |
| 548 DataWriter: dataWriter, | 607 DataWriter: dataWriter, |
| 549 » » StreamIndexRange: sa.StreamIndexRange, | 608 » » StreamIndexRange: sa.IndexStreamRange, |
| 550 » » PrefixIndexRange: sa.PrefixIndexRange, | 609 » » PrefixIndexRange: sa.IndexPrefixRange, |
| 551 » » ByteRange: sa.ByteRange, | 610 » » ByteRange: sa.IndexByteRange, |
| 552 | 611 |
| 553 Logger: log.Get(c), | 612 Logger: log.Get(c), |
| 554 } | 613 } |
| 555 if err = archive.Archive(m); err != nil { | 614 if err = archive.Archive(m); err != nil { |
| 556 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 615 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 557 return | 616 return |
| 558 } | 617 } |
| 559 | 618 |
| 560 switch { | 619 switch { |
| 561 case ss.logEntryCount == 0: | 620 case ss.logEntryCount == 0: |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 687 return e.inner | 746 return e.inner |
| 688 } | 747 } |
| 689 | 748 |
| 690 func isFailure(err error) bool { | 749 func isFailure(err error) bool { |
| 691 if err == nil { | 750 if err == nil { |
| 692 return false | 751 return false |
| 693 } | 752 } |
| 694 _, ok := err.(*statusErrorWrapper) | 753 _, ok := err.(*statusErrorWrapper) |
| 695 return !ok | 754 return !ok |
| 696 } | 755 } |
| OLD | NEW |