Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 86 matching lines...) | |
| 97 // consumed. This may be called multiple times with no additional effect. | 97 // consumed. This may be called multiple times with no additional effect. |
| 98 Consume() | 98 Consume() |
| 99 | 99 |
| 100 // AssertLease asserts that the lease for this Task is still held. | 100 // AssertLease asserts that the lease for this Task is still held. |
| 101 // | 101 // |
| 102 // On failure, it will return an error. If successful, the Archivist may | 102 // On failure, it will return an error. If successful, the Archivist may |
| 103 // assume that it holds the lease longer. | 103 // assume that it holds the lease longer. |
| 104 AssertLease(context.Context) error | 104 AssertLease(context.Context) error |
| 105 } | 105 } |
| 106 | 106 |
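For reference, a minimal sketch of a Task implementation covering only the two methods visible in this hunk (the full Task interface includes methods in the elided lines above; the type and behavior here are hypothetical, and the standard context package stands in for whatever this file actually imports):

```go
package archivist

import "context"

// memoryTask is a hypothetical in-memory Task used purely for illustration.
type memoryTask struct {
	consumed bool
}

// Consume marks the task consumed. Repeated calls have no additional effect.
func (t *memoryTask) Consume() {
	t.consumed = true
}

// AssertLease pretends the lease is always held. A real implementation would
// renew the task's lease with its task queue here and return an error on
// failure.
func (t *memoryTask) AssertLease(c context.Context) error {
	return nil
}
```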
| 107 // Settings defines the archival parameters for a specific archival operation. | |
| 108 // | |
| 109 // In practice, this will be formed from service and project settings. | |
| 110 type Settings struct { | |
| 111 // GSBase is the base Google Storage path. This includes the bucket name | |
| 112 // and any associated path. | |
| 113 // | |
| 114 // This must be unique to this archival project. In practice, it will be | |
| 115 // composed of the project's archival bucket and project ID. | |
| 116 GSBase gs.Path | |
| 117 // GSStagingBase is the base Google Storage path for archive staging. This | |
| 118 // includes the bucket name and any associated path. | |
| 119 // | |
| 120 // This must be unique to this archival project. In practice, it will be | |
| 121 // composed of the project's staging archival bucket and project ID. | |
| 122 GSStagingBase gs.Path | |
| 123 | |
| 124 // AlwaysRender, if true, means that a binary should be archived | |
| 125 // regardless of whether a specific binary file extension has been supplied | |
| 126 // with the log stream. | |
| 127 AlwaysRender bool | |
| 128 | |
| 129 // IndexStreamRange is the maximum number of stream indexes in between index | |

nodir 2016/05/19 17:27:49: stream entries? below too

dnj (Google) 2016/05/19 22:53:55: Index entries. So basically all three of these det
| 130 // entries. See archive.Manifest for more information. | |
| 131 IndexStreamRange int | |
| 132 // IndexPrefixRange is the maximum number of prefix indexes in between index | |
| 133 // entries. See archive.Manifest for more information. | |
| 134 IndexPrefixRange int | |
| 135 // IndexByteRange is the maximum number of stream data bytes in between index | |
| 136 // entries. See archive.Manifest for more information. | |
| 137 IndexByteRange int | |
| 138 } | |
| 139 | |
| 140 // SettingsLoader returns archival Settings for a given project. | |
| 141 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) | |
| 142 | |
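To make the new hook concrete, here is a minimal sketch of a SettingsLoader, written as if inside this package; the bucket names and index ranges are hypothetical placeholders, not values from this CL:

```go
// staticSettingsLoader is a hypothetical SettingsLoader that derives fixed
// per-project buckets from the project name. All concrete values below are
// placeholders chosen for illustration.
func staticSettingsLoader(c context.Context, proj config.ProjectName) (*Settings, error) {
	if proj == "" {
		return nil, errors.New("a project name is required")
	}
	return &Settings{
		GSBase:        gs.Path("gs://example-archive/" + string(proj)),
		GSStagingBase: gs.Path("gs://example-archive-staging/" + string(proj)),

		IndexStreamRange: 64,
		IndexPrefixRange: 64,
		IndexByteRange:   1 << 20, // 1 MiB between index entries
	}, nil
}
```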
| 107 // Archivist is a stateless configuration capable of archiving individual log | 143 // Archivist is a stateless configuration capable of archiving individual log |
| 108 // streams. | 144 // streams. |
| 109 type Archivist struct { | 145 type Archivist struct { |
| 110 // Service is the client to use to communicate with Coordinator's Services | 146 // Service is the client to use to communicate with Coordinator's Services |
| 111 // endpoint. | 147 // endpoint. |
| 112 Service logdog.ServicesClient | 148 Service logdog.ServicesClient |
| 113 | 149 |
| 150 // SettingsLoader loads archival settings for a specific project. | |
| 151 SettingsLoader SettingsLoader | |
| 152 | |
| 114 // Storage is the archival source Storage instance. | 153 // Storage is the archival source Storage instance. |
| 115 Storage storage.Storage | 154 Storage storage.Storage |
| 116 // GSClient is the Google Storage client to use for archive generation. | 155 // GSClient is the Google Storage client to use for archive generation. |
| 117 GSClient gs.Client | 156 GSClient gs.Client |
| 118 | |
| 119 // GSBase is the base Google Storage path. This includes the bucket name | |
| 120 // and any associated path. | |
| 121 GSBase gs.Path | |
| 122 // GSStagingBase is the base Google Storage path for archive staging. This | |
| 123 // includes the bucket name and any associated path. | |
| 124 GSStagingBase gs.Path | |
| 125 | |
| 126 // PrefixIndexRange is the maximum number of stream indexes in between index | |
| 127 // entries. See archive.Manifest for more information. | |
| 128 StreamIndexRange int | |
| 129 // PrefixIndexRange is the maximum number of prefix indexes in between index | |
| 130 // entries. See archive.Manifest for more information. | |
| 131 PrefixIndexRange int | |
| 132 // ByteRange is the maximum number of stream data bytes in between index | |
| 133 // entries. See archive.Manifest for more information. | |
| 134 ByteRange int | |
| 135 } | 157 } |
| 136 | 158 |
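Putting the struct together, the pieces might be wired up as follows (a sketch only; the function name is hypothetical, client construction is elided, and staticSettingsLoader refers to the loader sketched earlier):

```go
// newArchivist shows how an Archivist could be assembled after this change:
// the per-project Settings now arrive through SettingsLoader rather than
// fixed fields on the struct.
func newArchivist(svc logdog.ServicesClient, st storage.Storage, gsc gs.Client) *Archivist {
	return &Archivist{
		Service:        svc,
		SettingsLoader: staticSettingsLoader,
		Storage:        st,
		GSClient:       gsc,
	}
}
```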
| 137 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 159 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
| 138 // during archival. This should be greater than the maximum LogEntry size. | 160 // during archival. This should be greater than the maximum LogEntry size. |
| 139 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 161 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| 140 | 162 |
| 141 // ArchiveTask processes and executes a single log stream archive task. | 163 // ArchiveTask processes and executes a single log stream archive task. |
| 142 // | 164 // |
| 143 // During processing, the Task's Consume method may be called to indicate that | 165 // During processing, the Task's Consume method may be called to indicate that |
| 144 // it should be consumed. | 166 // it should be consumed. |
| (...skipping 100 matching lines...) | |
| 245 // retry later. | 267 // retry later. |
| 246 age := ls.Age.Duration() | 268 age := ls.Age.Duration() |
| 247 if age < at.SettleDelay.Duration() { | 269 if age < at.SettleDelay.Duration() { |
| 248 log.Fields{ | 270 log.Fields{ |
| 249 "age": age, | 271 "age": age, |
| 250 "settleDelay": at.SettleDelay.Duration(), | 272 "settleDelay": at.SettleDelay.Duration(), |
| 251 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") | 273 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") |
| 252 return statusErr(errors.New("log stream is within settle delay")) | 274 return statusErr(errors.New("log stream is within settle delay")) |
| 253 } | 275 } |
| 254 | 276 |
| 277 // Load archival settings for this project. | |
| 278 settings, err := a.loadSettings(c, config.ProjectName(at.Project)) | |
| 279 if err != nil { | |
| 280 log.Fields{ | |
| 281 log.ErrorKey: err, | |
| 282 "project": at.Project, | |
| 283 }.Errorf(c, "Failed to load settings for project.") | |
| 284 return err | |
| 285 } | |
| 286 | |
| 255 ar := logdog.ArchiveStreamRequest{ | 287 ar := logdog.ArchiveStreamRequest{ |
| 256 Project: at.Project, | 288 Project: at.Project, |
| 257 Id: at.Id, | 289 Id: at.Id, |
| 258 } | 290 } |
| 259 | 291 |
| 260 // Build our staged archival plan. This doesn't actually do any archiving. | 292 // Build our staged archival plan. This doesn't actually do any archiving. |
| 261 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID()) | 293 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID()) |
| 262 if err != nil { | 294 if err != nil { |
| 263 log.WithError(err).Errorf(c, "Failed to create staged archival plan.") | 295 log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
| 264 return err | 296 return err |
| 265 } | 297 } |
| 266 | 298 |
| 267 // Are we required to archive a complete log stream? | 299 // Are we required to archive a complete log stream? |
| 268 if age <= at.CompletePeriod.Duration() { | 300 if age <= at.CompletePeriod.Duration() { |
| 269 // If we're requiring completeness, perform a keys-only scan of intermediate | 301 // If we're requiring completeness, perform a keys-only scan of intermediate |
| 270 // storage to ensure that we have all of the records before we bother | 302 // storage to ensure that we have all of the records before we bother |
| 271 // streaming to storage only to find that we are missing data. | 303 // streaming to storage only to find that we are missing data. |
| (...skipping 76 matching lines...) | |
| 348 log.WithError(err).Errorf(c, "Failed to report archive state.") | 380 log.WithError(err).Errorf(c, "Failed to report archive state.") |
| 349 return err | 381 return err |
| 350 } | 382 } |
| 351 | 383 |
| 352 // Archival is complete and acknowledged by Coordinator. Consume the archival | 384 // Archival is complete and acknowledged by Coordinator. Consume the archival |
| 353 // task. | 385 // task. |
| 354 task.Consume() | 386 task.Consume() |
| 355 return nil | 387 return nil |
| 356 } | 388 } |
| 357 | 389 |
| 390 // loadSettings loads and validates archival settings. | |
| 391 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) { | |
| 392 if a.SettingsLoader == nil { | |
| 393 panic("no settings loader configured") | |
| 394 } | |
| 395 | |
| 396 st, err := a.SettingsLoader(c, project) | |
| 397 switch { | |
| 398 case err != nil: | |
| 399 return nil, err | |
| 400 | |
| 401 case st.GSBase.Bucket() == "": | |
| 402 log.Fields{ | |
| 403 log.ErrorKey: err, | |
| 404 "gsBase": st.GSBase, | |
| 405 }.Errorf(c, "Invalid storage base.") | |
| 406 return nil, errors.New("invalid storage base") | |
| 407 | |
| 408 case st.GSStagingBase.Bucket() == "": | |
| 409 log.Fields{ | |
| 410 log.ErrorKey: err, | |
| 411 "gsStagingBase": st.GSStagingBase, | |
| 412 }.Errorf(c, "Invalid storage staging base.") | |
| 413 return nil, errors.New("invalid storage staging base") | |
| 414 | |
| 415 default: | |
| 416 return st, nil | |
| 417 } | |
| 418 } | |
| 419 | |
| 358 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, | 420 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, |
| 359 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { | 421 » st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
| 360 sa := stagedArchival{ | 422 sa := stagedArchival{ |
| 361 Archivist: a, | 423 Archivist: a, |
| 424 Settings: st, | |
| 362 project: project, | 425 project: project, |
| 363 | 426 |
| 364 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), | 427 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
| 365 } | 428 } |
| 366 | 429 |
| 367 // Deserialize and validate the descriptor protobuf. If this fails, it is a | 430 // Deserialize and validate the descriptor protobuf. If this fails, it is a |
| 368 // non-transient error. | 431 // non-transient error. |
| 369 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 432 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| 370 log.Fields{ | 433 log.Fields{ |
| 371 log.ErrorKey: err, | 434 log.ErrorKey: err, |
| (...skipping 10 matching lines...) | |
| 382 | 445 |
| 383 // Construct our staged archival paths. | 446 // Construct our staged archival paths. |
| 384 sa.stream = sa.makeStagingPaths("logstream.entries", uid) | 447 sa.stream = sa.makeStagingPaths("logstream.entries", uid) |
| 385 sa.index = sa.makeStagingPaths("logstream.index", uid) | 448 sa.index = sa.makeStagingPaths("logstream.index", uid) |
| 386 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) | 449 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) |
| 387 return &sa, nil | 450 return &sa, nil |
| 388 } | 451 } |
| 389 | 452 |
| 390 type stagedArchival struct { | 453 type stagedArchival struct { |
| 391 *Archivist | 454 *Archivist |
| 455 *Settings | |
| 392 | 456 |
| 393 project config.ProjectName | 457 project config.ProjectName |
| 394 path types.StreamPath | 458 path types.StreamPath |
| 395 desc logpb.LogStreamDescriptor | 459 desc logpb.LogStreamDescriptor |
| 396 | 460 |
| 397 stream stagingPaths | 461 stream stagingPaths |
| 398 index stagingPaths | 462 index stagingPaths |
| 399 data stagingPaths | 463 data stagingPaths |
| 400 | 464 |
| 401 finalized bool | 465 finalized bool |
| 402 terminalIndex types.MessageIndex | 466 terminalIndex types.MessageIndex |
| 403 logEntryCount int64 | 467 logEntryCount int64 |
| 404 } | 468 } |
| 405 | 469 |
| 406 // makeStagingPaths returns a stagingPaths instance for the given path and | 470 // makeStagingPaths returns a stagingPaths instance for the given path and |
| 407 // file name. It incorporates a unique ID into the staging name to differentiate | 471 // file name. It incorporates a unique ID into the staging name to differentiate |
| 408 // it from other staging paths for the same path/name. | 472 // it from other staging paths for the same path/name. |
| 409 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { | 473 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { |
| 410 » // TODO(dnj): This won't be necessary when empty project is invalid. | 474 » proj := string(sa.project) |
| 411 » project := string(sa.project) | 475 » // TODO (dnj): When empty projects are disallowed, remove this. |
| 412 » if project == "" { | 476 » if proj == "" { |
| 413 » » project = "_" | 477 » » proj = "_" |
| 414 } | 478 } |
| 415 | 479 |
| 480 // Either of these paths may be shared between projects. To enforce | |
| 481 // an absence of conflicts, we will insert the project name as part of the | |
| 482 // path. | |
| 416 return stagingPaths{ | 483 return stagingPaths{ |
| 417 » » staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name), | 484 » staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name), |
| 418 » » final: sa.GSBase.Concat(project, string(sa.path), name), | 485 » » final: sa.GSBase.Concat(proj, string(sa.path), name), |
| 419 } | 486 } |
| 420 } | 487 } |
| 421 | 488 |
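To illustrate the layout makeStagingPaths produces, here is a standalone sketch that mimics Concat's slash-joining; the buckets, project, stream path, and unique ID are all made up for the example:

```go
package main

import (
	"fmt"
	"strings"
)

// concat mimics gs.Path.Concat for illustration: components joined by "/".
func concat(parts ...string) string { return strings.Join(parts, "/") }

func main() {
	stagingBase := "gs://example-staging/myproj" // hypothetical GSStagingBase
	finalBase := "gs://example-archive/myproj"   // hypothetical GSBase
	proj, streamPath, uid := "chromium", "prefix/+/stream", "deadbeef"

	fmt.Println(concat(stagingBase, proj, streamPath, uid, "logstream.entries"))
	// gs://example-staging/myproj/chromium/prefix/+/stream/deadbeef/logstream.entries

	fmt.Println(concat(finalBase, proj, streamPath, "logstream.entries"))
	// gs://example-archive/myproj/chromium/prefix/+/stream/logstream.entries
}
```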
| 422 // checkComplete performs a quick scan of intermediate storage to ensure that | 489 // checkComplete performs a quick scan of intermediate storage to ensure that |
| 423 // all of the log stream's records are available. | 490 // all of the log stream's records are available. |
| 424 func (sa *stagedArchival) checkComplete(c context.Context) error { | 491 func (sa *stagedArchival) checkComplete(c context.Context) error { |
| 425 if sa.terminalIndex < 0 { | 492 if sa.terminalIndex < 0 { |
| 426 log.Warningf(c, "Cannot archive complete stream with no terminal index.") | 493 log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
| 427 return statusErr(errors.New("completeness required, but stream has no terminal index")) | 494 return statusErr(errors.New("completeness required, but stream has no terminal index")) |
| 428 } | 495 } |
| (...skipping 110 matching lines...) | |
| 539 terminalIndex: sa.terminalIndex, | 606 terminalIndex: sa.terminalIndex, |
| 540 lastIndex: -1, | 607 lastIndex: -1, |
| 541 } | 608 } |
| 542 | 609 |
| 543 m := archive.Manifest{ | 610 m := archive.Manifest{ |
| 544 Desc: &sa.desc, | 611 Desc: &sa.desc, |
| 545 Source: &ss, | 612 Source: &ss, |
| 546 LogWriter: streamWriter, | 613 LogWriter: streamWriter, |
| 547 IndexWriter: indexWriter, | 614 IndexWriter: indexWriter, |
| 548 DataWriter: dataWriter, | 615 DataWriter: dataWriter, |
| 549 » » StreamIndexRange: sa.StreamIndexRange, | 616 » » StreamIndexRange: sa.IndexStreamRange, |
| 550 » » PrefixIndexRange: sa.PrefixIndexRange, | 617 » » PrefixIndexRange: sa.IndexPrefixRange, |
| 551 » » ByteRange: sa.ByteRange, | 618 » » ByteRange: sa.IndexByteRange, |
| 552 | 619 |
| 553 Logger: log.Get(c), | 620 Logger: log.Get(c), |
| 554 } | 621 } |
| 555 if err = archive.Archive(m); err != nil { | 622 if err = archive.Archive(m); err != nil { |
| 556 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 623 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 557 return | 624 return |
| 558 } | 625 } |
| 559 | 626 |
| 560 switch { | 627 switch { |
| 561 case ss.logEntryCount == 0: | 628 case ss.logEntryCount == 0: |
| (...skipping 125 matching lines...) | |
| 687 return e.inner | 754 return e.inner |
| 688 } | 755 } |
| 689 | 756 |
| 690 func isFailure(err error) bool { | 757 func isFailure(err error) bool { |
| 691 if err == nil { | 758 if err == nil { |
| 692 return false | 759 return false |
| 693 } | 760 } |
| 694 _, ok := err.(*statusErrorWrapper) | 761 _, ok := err.(*statusErrorWrapper) |
| 695 return !ok | 762 return !ok |
| 696 } | 763 } |
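The statusErrorWrapper definition lives in an elided part of this file; the sketch below is a plausible shape inferred from the visible uses of statusErr and isFailure, offered as an assumption rather than the CL's actual code:

```go
// statusErrorWrapper marks an error as a "status" result rather than a hard
// failure, so isFailure returns false for it. This shape is inferred from
// the visible call sites; the real definition is in the elided lines.
type statusErrorWrapper struct {
	inner error
}

func (e *statusErrorWrapper) Error() string {
	if e.inner != nil {
		return e.inner.Error()
	}
	return "status error"
}

// InnerError exposes the wrapped error (the visible `return e.inner` above
// suggests an accessor like this).
func (e *statusErrorWrapper) InnerError() error {
	return e.inner
}

// statusErr wraps err so that ArchiveTask can return it as a non-failure
// status, e.g. to return a task to the queue without counting it as failed.
func statusErr(err error) error {
	return &statusErrorWrapper{inner: err}
}
```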