OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "fmt" | 10 "fmt" |
(...skipping 86 matching lines...) | |
97 // consumed. This may be called multiple times with no additional effect. | 97 // consumed. This may be called multiple times with no additional effect. |
98 Consume() | 98 Consume() |
99 | 99 |
100 // AssertLease asserts that the lease for this Task is still held. | 100 // AssertLease asserts that the lease for this Task is still held. |
101 // | 101 // |
102 // On failure, it will return an error. If successful, the Archivist may | 102 // On failure, it will return an error. If successful, the Archivist may |
103 // assume that it holds the lease longer. | 103 // assume that it holds the lease longer. |
104 AssertLease(context.Context) error | 104 AssertLease(context.Context) error |
105 } | 105 } |
106 | 106 |
107 // Settings retrieve the settings for an archival. | |
nodir 2016/05/19 15:56:29: s/retrieve/store. SettingsLoader retrieves them; Se…
dnj (Google) 2016/05/19 16:34:51: Done.
| |
108 type Settings struct { | |
109 // GSBase is the base Google Storage path. This includes the bucket name | |
110 // and any associated path. | |
111 GSBase gs.Path | |
112 // GSStagingBase is the base Google Storage path for archive staging. This | |
113 // includes the bucket name and any associated path. | |
114 GSStagingBase gs.Path | |
nodir 2016/05/19 15:56:29: rebase?
dnj (Google) 2016/05/19 16:34:51: Acknowledged.
| |
115 | |
116 // AlwaysCreateBinary if true, means that a binary should be archived | |
nodir 2016/05/19 15:56:29: nit: comma before if?
dnj (Google) 2016/05/19 16:34:51: Done.
| |
117 // regardless of whether a specific binary file extension has been supplied | |
118 // with the log stream. | |
119 AlwaysCreateBinary bool | |
120 | |
121 // IndexStreamRange is the maximum number of stream indexes in between index | |
122 // entries. See archive.Manifest for more information. | |
123 IndexStreamRange int | |
124 // IndexPrefixRange is the maximum number of prefix indexes in between index | |
125 // entries. See archive.Manifest for more information. | |
126 IndexPrefixRange int | |
127 // IndexByteRange is the maximum number of stream data bytes in between index | |
128 // entries. See archive.Manifest for more information. | |
129 IndexByteRange int | |
130 } | |
131 | |
132 // SettingsLoader returns archival Settings for a given project. | |
133 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) | |
134 | |
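To make the new contract concrete, here is a minimal SettingsLoader sketch. It leans on this package's existing imports (context, config, gs); the loader name, bucket paths, and index ranges are hypothetical placeholders, and a real loader would read per-project configuration rather than hard-coding values.

// fixedSettingsLoader is a hypothetical SettingsLoader that returns the same
// Settings for every project. Every value below is a placeholder.
func fixedSettingsLoader(c context.Context, project config.ProjectName) (*Settings, error) {
	return &Settings{
		GSBase:        gs.Path("gs://example-archive/logdog"),
		GSStagingBase: gs.Path("gs://example-archive-staging/logdog"),

		IndexStreamRange: 1024,
		IndexPrefixRange: 256,
		IndexByteRange:   16 * 1024 * 1024,
	}, nil
}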
107 // Archivist is a stateless configuration capable of archiving individual log | 135 // Archivist is a stateless configuration capable of archiving individual log |
108 // streams. | 136 // streams. |
109 type Archivist struct { | 137 type Archivist struct { |
110 // Service is the client to use to communicate with Coordinator's Services | 138 // Service is the client to use to communicate with Coordinator's Services |
111 // endpoint. | 139 // endpoint. |
112 Service logdog.ServicesClient | 140 Service logdog.ServicesClient |
113 | 141 |
142 // SettingsLoader loads archival settings for a specific project. | |
143 SettingsLoader SettingsLoader | |
144 | |
114 // Storage is the archival source Storage instance. | 145 // Storage is the archival source Storage instance. |
115 Storage storage.Storage | 146 Storage storage.Storage |
116 // GSClient is the Google Storage client to use for archive generation. | 147 // GSClient is the Google Storage client to use for archive generation. |
117 GSClient gs.Client | 148 GSClient gs.Client |
118 | |
119 // GSBase is the base Google Storage path. This includes the bucket name | |
120 // and any associated path. | |
121 GSBase gs.Path | |
122 // GSStagingBase is the base Google Storage path for archive staging. This | |
123 // includes the bucket name and any associated path. | |
124 GSStagingBase gs.Path | |
125 | |
126 // PrefixIndexRange is the maximum number of stream indexes in between index | |
127 // entries. See archive.Manifest for more information. | |
128 StreamIndexRange int | |
129 // PrefixIndexRange is the maximum number of prefix indexes in between index | |
130 // entries. See archive.Manifest for more information. | |
131 PrefixIndexRange int | |
132 // ByteRange is the maximum number of stream data bytes in between index | |
133 // entries. See archive.Manifest for more information. | |
134 ByteRange int | |
135 } | 149 } |
136 | 150 |
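With per-project storage settings moved behind SettingsLoader, the Archivist itself now carries only clients and the loader. A hedged wiring sketch, where servicesClient, intermediateStorage, and gsClient are placeholders for whatever the hosting service actually constructs:

arch := Archivist{
	Service:        servicesClient,
	SettingsLoader: fixedSettingsLoader,
	Storage:        intermediateStorage,
	GSClient:       gsClient,
}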
137 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 151 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
138 // during archival. This should be greater than the maximum LogEntry size. | 152 // during archival. This should be greater than the maximum LogEntry size. |
139 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 153 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
140 | 154 |
141 // ArchiveTask processes and executes a single log stream archive task. | 155 // ArchiveTask processes and executes a single log stream archive task. |
142 // | 156 // |
143 // During processing, the Task's Consume method may be called to indicate that | 157 // During processing, the Task's Consume method may be called to indicate that |
144 // it should be consumed. | 158 // it should be consumed. |
(...skipping 100 matching lines...) | |
245 // retry later. | 259 // retry later. |
246 age := ls.Age.Duration() | 260 age := ls.Age.Duration() |
247 if age < at.SettleDelay.Duration() { | 261 if age < at.SettleDelay.Duration() { |
248 log.Fields{ | 262 log.Fields{ |
249 "age": age, | 263 "age": age, |
250 "settleDelay": at.SettleDelay.Duration(), | 264 "settleDelay": at.SettleDelay.Duration(), |
251 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") | 265 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") |
252 return statusErr(errors.New("log stream is within settle delay")) | 266 return statusErr(errors.New("log stream is within settle delay")) |
253 } | 267 } |
254 | 268 |
269 // Load archival settings for this project. | |
270 settings, err := a.loadSettings(c, config.ProjectName(at.Project)) | |
271 if err != nil { | |
272 log.Fields{ | |
273 log.ErrorKey: err, | |
274 "project": at.Project, | |
275 }.Errorf(c, "Failed to load settings for project.") | |
276 return err | |
277 } | |
278 | |
255 ar := logdog.ArchiveStreamRequest{ | 279 ar := logdog.ArchiveStreamRequest{ |
256 Project: at.Project, | 280 Project: at.Project, |
257 Id: at.Id, | 281 Id: at.Id, |
258 } | 282 } |
259 | 283 |
260 // Build our staged archival plan. This doesn't actually do any archiving. | 284 // Build our staged archival plan. This doesn't actually do any archiving. |
261 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID()) | 285 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID()) |
262 if err != nil { | 286 if err != nil { |
263 log.WithError(err).Errorf(c, "Failed to create staged archival plan.") | 287 log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
264 return err | 288 return err |
265 } | 289 } |
266 | 290 |
267 // Are we required to archive a complete log stream? | 291 // Are we required to archive a complete log stream? |
268 if age <= at.CompletePeriod.Duration() { | 292 if age <= at.CompletePeriod.Duration() { |
269 // If we're requiring completeness, perform a keys-only scan of intermediate | 293 // If we're requiring completeness, perform a keys-only scan of intermediate |
270 // storage to ensure that we have all of the records before we bother | 294 // storage to ensure that we have all of the records before we bother |
271 // streaming to storage only to find that we are missing data. | 295 // streaming to storage only to find that we are missing data. |
(...skipping 76 matching lines...) | |
348 log.WithError(err).Errorf(c, "Failed to report archive state.") | 372 log.WithError(err).Errorf(c, "Failed to report archive state.") |
349 return err | 373 return err |
350 } | 374 } |
351 | 375 |
352 // Archival is complete and acknowledged by Coordinator. Consume the archival | 376 // Archival is complete and acknowledged by Coordinator. Consume the archival |
353 // task. | 377 // task. |
354 task.Consume() | 378 task.Consume() |
355 return nil | 379 return nil |
356 } | 380 } |
357 | 381 |
382 // loadSettings loads and validates archival settings. | |
383 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) { | |
384 if a.SettingsLoader == nil { | |
385 panic("no settings loader configured") | |
nodir 2016/05/19 15:56:29: I am glad to see you didn't ban panics altogether…
dnj (Google) 2016/05/19 16:34:51: IMO panics have their place. I think explicit erro…
| |
386 } | |
387 | |
388 st, err := a.SettingsLoader(c, project) | |
389 switch { | |
390 case err != nil: | |
391 return nil, err | |
392 | |
393 case st.GSBase.Bucket() == "": | |
394 log.Fields{ | |
395 log.ErrorKey: err, | |
396 "gsBase": st.GSBase, | |
397 }.Errorf(c, "Invalid storage base.") | |
398 return nil, errors.New("invalid storage base") | |
399 | |
400 case st.GSStagingBase.Bucket() == "": | |
401 log.Fields{ | |
402 log.ErrorKey: err, | |
403 "gsStagingBase": st.GSStagingBase, | |
404 }.Errorf(c, "Invalid storage staging base.") | |
405 return nil, errors.New("invalid storage staging base") | |
406 | |
407 default: | |
408 return st, nil | |
409 } | |
410 } | |
411 | |
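A quick sketch of the validation contract (hypothetical loader and project name): a loader that leaves GSBase empty surfaces as an error from loadSettings, so downstream archival code can assume both base paths are well-formed.

// Sketch: a loader that forgets GSBase fails fast in loadSettings.
a := Archivist{SettingsLoader: func(c context.Context, p config.ProjectName) (*Settings, error) {
	return &Settings{GSStagingBase: gs.Path("gs://example-staging")}, nil
}}
_, err := a.loadSettings(c, "chromium")
// err is non-nil: "invalid storage base".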
358 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, | 412 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, |
359 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { | 413 » st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
360 sa := stagedArchival{ | 414 sa := stagedArchival{ |
361 Archivist: a, | 415 Archivist: a, |
416 Settings: st, | |
362 project: project, | 417 project: project, |
363 | 418 |
364 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), | 419 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
365 } | 420 } |
366 | 421 |
367 // Deserialize and validate the descriptor protobuf. If this fails, it is a | 422 // Deserialize and validate the descriptor protobuf. If this fails, it is a |
368 // non-transient error. | 423 // non-transient error. |
369 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 424 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
370 log.Fields{ | 425 log.Fields{ |
371 log.ErrorKey: err, | 426 log.ErrorKey: err, |
(...skipping 10 matching lines...) | |
382 | 437 |
383 // Construct our staged archival paths. | 438 // Construct our staged archival paths. |
384 sa.stream = sa.makeStagingPaths("logstream.entries", uid) | 439 sa.stream = sa.makeStagingPaths("logstream.entries", uid) |
385 sa.index = sa.makeStagingPaths("logstream.index", uid) | 440 sa.index = sa.makeStagingPaths("logstream.index", uid) |
386 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) | 441 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) |
387 return &sa, nil | 442 return &sa, nil |
388 } | 443 } |
389 | 444 |
390 type stagedArchival struct { | 445 type stagedArchival struct { |
391 *Archivist | 446 *Archivist |
447 *Settings | |
392 | 448 |
393 project config.ProjectName | 449 project config.ProjectName |
394 path types.StreamPath | 450 path types.StreamPath |
395 desc logpb.LogStreamDescriptor | 451 desc logpb.LogStreamDescriptor |
396 | 452 |
397 stream stagingPaths | 453 stream stagingPaths |
398 index stagingPaths | 454 index stagingPaths |
399 data stagingPaths | 455 data stagingPaths |
400 | 456 |
401 finalized bool | 457 finalized bool |
402 terminalIndex types.MessageIndex | 458 terminalIndex types.MessageIndex |
403 logEntryCount int64 | 459 logEntryCount int64 |
404 } | 460 } |
405 | 461 |
406 // makeStagingPaths returns a stagingPaths instance for the given path and | 462 // makeStagingPaths returns a stagingPaths instance for the given path and |
407 // file name. It incorporates a unique ID into the staging name to differentiate | 463 // file name. It incorporates a unique ID into the staging name to differentiate |
408 // it from other staging paths for the same path/name. | 464 // it from other staging paths for the same path/name. |
409 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { | 465 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { |
410 » // TODO(dnj): This won't be necessary when empty project is invalid. | 466 » proj := string(sa.project) |
411 » project := string(sa.project) | 467 » // TODO (dnj): When empty projects are disallowed, remove this. |
412 » if project == "" { | 468 » if proj == "" { |
413 » » project = "_" | 469 » » proj = "_" |
414 } | 470 } |
415 | 471 |
472 // Either of these paths may be shared between projects. To enforce | |
473 // an absence of conflicts, we will insert the project name as part of the | |
474 // path. | |
416 return stagingPaths{ | 475 return stagingPaths{ |
417 » » staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name), | 476 » » staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name), |
418 » » final: sa.GSBase.Concat(project, string(sa.path), name), | 477 » » final: sa.GSBase.Concat(proj, string(sa.path), name), |
419 } | 478 } |
420 } | 479 } |
421 | 480 |
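As a concrete illustration of the paths makeStagingPaths produces (all values hypothetical): with GSStagingBase gs://example-archive-staging/logdog, GSBase gs://example-archive/logdog, project "chromium", stream path "prefix/+/name", uid "deadbeef", and name "logstream.entries", gs.Path.Concat yields:

	staged: gs://example-archive-staging/logdog/chromium/prefix/+/name/deadbeef/logstream.entries
	final:  gs://example-archive/logdog/chromium/prefix/+/name/logstream.entries

The uid appears only in the staged path, so concurrent archival attempts stage to distinct objects while converging on the same final path.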
422 // checkComplete performs a quick scan of intermediate storage to ensure that | 481 // checkComplete performs a quick scan of intermediate storage to ensure that |
423 // all of the log stream's records are available. | 482 // all of the log stream's records are available. |
424 func (sa *stagedArchival) checkComplete(c context.Context) error { | 483 func (sa *stagedArchival) checkComplete(c context.Context) error { |
425 if sa.terminalIndex < 0 { | 484 if sa.terminalIndex < 0 { |
426 log.Warningf(c, "Cannot archive complete stream with no terminal index.") | 485 log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
427 return statusErr(errors.New("completeness required, but stream has no terminal index")) | 486 return statusErr(errors.New("completeness required, but stream has no terminal index")) |
428 } | 487 } |
(...skipping 110 matching lines...) | |
539 terminalIndex: sa.terminalIndex, | 598 terminalIndex: sa.terminalIndex, |
540 lastIndex: -1, | 599 lastIndex: -1, |
541 } | 600 } |
542 | 601 |
543 m := archive.Manifest{ | 602 m := archive.Manifest{ |
544 Desc: &sa.desc, | 603 Desc: &sa.desc, |
545 Source: &ss, | 604 Source: &ss, |
546 LogWriter: streamWriter, | 605 LogWriter: streamWriter, |
547 IndexWriter: indexWriter, | 606 IndexWriter: indexWriter, |
548 DataWriter: dataWriter, | 607 DataWriter: dataWriter, |
549 » » StreamIndexRange: sa.StreamIndexRange, | 608 » » StreamIndexRange: sa.IndexStreamRange, |
550 » » PrefixIndexRange: sa.PrefixIndexRange, | 609 » » PrefixIndexRange: sa.IndexPrefixRange, |
551 » » ByteRange: sa.ByteRange, | 610 » » ByteRange: sa.IndexByteRange, |
552 | 611 |
553 Logger: log.Get(c), | 612 Logger: log.Get(c), |
554 } | 613 } |
555 if err = archive.Archive(m); err != nil { | 614 if err = archive.Archive(m); err != nil { |
556 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 615 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
557 return | 616 return |
558 } | 617 } |
559 | 618 |
560 switch { | 619 switch { |
561 case ss.logEntryCount == 0: | 620 case ss.logEntryCount == 0: |
(...skipping 125 matching lines...) | |
687 return e.inner | 746 return e.inner |
688 } | 747 } |
689 | 748 |
690 func isFailure(err error) bool { | 749 func isFailure(err error) bool { |
691 if err == nil { | 750 if err == nil { |
692 return false | 751 return false |
693 } | 752 } |
694 _, ok := err.(*statusErrorWrapper) | 753 _, ok := err.(*statusErrorWrapper) |
695 return !ok | 754 return !ok |
696 } | 755 } |
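To close the loop on the error plumbing: statusErr wraps errors that represent expected, non-failure outcomes (a stream inside its settle delay, a missing terminal index), and isFailure treats only unwrapped errors as genuine archival failures. Assuming ArchiveTask keeps its context-and-Task shape, a caller's loop might look like this sketch, where failureCounter is a hypothetical metric:

if err := arch.ArchiveTask(c, task); err != nil {
	if isFailure(err) {
		failureCounter.Add(1) // count only genuine failures
	}
	// Either way the task was not consumed, so it will be redelivered
	// and retried later.
}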