Chromium Code Reviews

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Created 4 years, 7 months ago
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package archivist

import (
    "bytes"
    "encoding/hex"
    "fmt"
(...skipping 86 matching lines...)
    // consumed. This may be called multiple times with no additional effect.
    Consume()

    // AssertLease asserts that the lease for this Task is still held.
    //
    // On failure, it will return an error. If successful, the Archivist may
    // assume that it holds the lease longer.
    AssertLease(context.Context) error
}

+// Settings retrieve the settings for an archival.
nodir 2016/05/19 15:56:29 s/retrieve/store SettingsLoader retrieves them; Se
dnj (Google) 2016/05/19 16:34:51 Done.
+type Settings struct {
+    // GSBase is the base Google Storage path. This includes the bucket name
+    // and any associated path.
+    GSBase gs.Path
+    // GSStagingBase is the base Google Storage path for archive staging. This
+    // includes the bucket name and any associated path.
+    GSStagingBase gs.Path
nodir 2016/05/19 15:56:29 rebase?
dnj (Google) 2016/05/19 16:34:51 Acknowledged.
+
+    // AlwaysCreateBinary if true, means that a binary should be archived
nodir 2016/05/19 15:56:29 nit: comma before if?
dnj (Google) 2016/05/19 16:34:51 Done.
+    // regardless of whether a specific binary file extension has been supplied
+    // with the log stream.
+    AlwaysCreateBinary bool
+
+    // IndexStreamRange is the maximum number of stream indexes in between index
+    // entries. See archive.Manifest for more information.
+    IndexStreamRange int
+    // IndexPrefixRange is the maximum number of prefix indexes in between index
+    // entries. See archive.Manifest for more information.
+    IndexPrefixRange int
+    // IndexByteRange is the maximum number of stream data bytes in between index
+    // entries. See archive.Manifest for more information.
+    IndexByteRange int
+}
+
+// SettingsLoader returns archival Settings for a given project.
+type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error)
+
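For reference, a SettingsLoader is just a function value, so the hosting service can plug in any policy it likes. A minimal sketch of a loader backed by a fixed in-memory table (the map, function name, and error text are hypothetical; a real loader would consult the per-project configuration service):

// staticSettingsLoader returns a SettingsLoader that serves fixed
// per-project Settings from an in-memory map. Illustrative sketch only.
func staticSettingsLoader(byProject map[config.ProjectName]*Settings) SettingsLoader {
    return func(c context.Context, proj config.ProjectName) (*Settings, error) {
        if st, ok := byProject[proj]; ok {
            return st, nil
        }
        return nil, fmt.Errorf("no archival settings for project %q", proj)
    }
}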
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
    // Service is the client to use to communicate with Coordinator's Services
    // endpoint.
    Service logdog.ServicesClient

+    // SettingsLoader loads archival settings for a specific project.
+    SettingsLoader SettingsLoader
+
    // Storage is the archival source Storage instance.
    Storage storage.Storage
    // GSClient is the Google Storage client to use for archive generation.
    GSClient gs.Client
-
-    // GSBase is the base Google Storage path. This includes the bucket name
-    // and any associated path.
-    GSBase gs.Path
-    // GSStagingBase is the base Google Storage path for archive staging. This
-    // includes the bucket name and any associated path.
-    GSStagingBase gs.Path
-
-    // StreamIndexRange is the maximum number of stream indexes in between index
-    // entries. See archive.Manifest for more information.
-    StreamIndexRange int
-    // PrefixIndexRange is the maximum number of prefix indexes in between index
-    // entries. See archive.Manifest for more information.
-    PrefixIndexRange int
-    // ByteRange is the maximum number of stream data bytes in between index
-    // entries. See archive.Manifest for more information.
-    ByteRange int
}

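With the tuning fields removed, constructing an Archivist now means handing it a loader alongside its clients. A rough wiring sketch (the constructor name and parameter names are hypothetical; the client instances are assumed to be built by the caller):

// newArchivist assembles an Archivist whose per-archival tuning comes from
// the supplied SettingsLoader rather than from fields on the struct itself.
func newArchivist(svc logdog.ServicesClient, st storage.Storage, gsc gs.Client, loader SettingsLoader) *Archivist {
    return &Archivist{
        Service:        svc,
        SettingsLoader: loader,
        Storage:        st,
        GSClient:       gsc,
    }
}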
// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
// during archival. This should be greater than the maximum LogEntry size.
const storageBufferSize = types.MaxLogEntryDataSize * 64

// ArchiveTask processes and executes a single log stream archive task.
//
// During processing, the Task's Consume method may be called to indicate that
// it should be consumed.
(...skipping 100 matching lines...)
    // retry later.
    age := ls.Age.Duration()
    if age < at.SettleDelay.Duration() {
        log.Fields{
            "age":         age,
            "settleDelay": at.SettleDelay.Duration(),
        }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.")
        return statusErr(errors.New("log stream is within settle delay"))
    }

+    // Load archival settings for this project.
+    settings, err := a.loadSettings(c, config.ProjectName(at.Project))
+    if err != nil {
+        log.Fields{
+            log.ErrorKey: err,
+            "project":    at.Project,
+        }.Errorf(c, "Failed to load settings for project.")
+        return err
+    }
+
    ar := logdog.ArchiveStreamRequest{
        Project: at.Project,
        Id:      at.Id,
    }

    // Build our staged archival plan. This doesn't actually do any archiving.
-    staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID())
+    staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID())
    if err != nil {
        log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
        return err
    }

    // Are we required to archive a complete log stream?
    if age <= at.CompletePeriod.Duration() {
        // If we're requiring completeness, perform a keys-only scan of intermediate
        // storage to ensure that we have all of the records before we bother
        // streaming to storage only to find that we are missing data.
(...skipping 76 matching lines...)
        log.WithError(err).Errorf(c, "Failed to report archive state.")
        return err
    }

    // Archival is complete and acknowledged by Coordinator. Consume the archival
    // task.
    task.Consume()
    return nil
}

+// loadSettings loads and validates archival settings.
+func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) {
+    if a.SettingsLoader == nil {
+        panic("no settings loader configured")
nodir 2016/05/19 15:56:29 I am glad to see you didn't ban panics altogether
dnj (Google) 2016/05/19 16:34:51 IMO panics have their place. I think explicit erro
+    }
+
+    st, err := a.SettingsLoader(c, project)
+    switch {
+    case err != nil:
+        return nil, err
+
+    case st.GSBase.Bucket() == "":
+        log.Fields{
+            log.ErrorKey: err,
+            "gsBase":     st.GSBase,
+        }.Errorf(c, "Invalid storage base.")
+        return nil, errors.New("invalid storage base")
+
+    case st.GSStagingBase.Bucket() == "":
+        log.Fields{
+            log.ErrorKey:    err,
+            "gsStagingBase": st.GSStagingBase,
+        }.Errorf(c, "Invalid storage staging base.")
+        return nil, errors.New("invalid storage staging base")
+
+    default:
+        return st, nil
+    }
+}
+
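As a concrete illustration, a Settings value that would pass this validation might look as follows. The bucket names and tuning numbers are invented, and this assumes gs.Path can be built by converting a "gs://bucket/path" string:

// exampleSettings would pass loadSettings' bucket checks because both
// Google Storage paths name a bucket. Values are purely illustrative.
var exampleSettings = &Settings{
    GSBase:        gs.Path("gs://example-archive/logdog"),
    GSStagingBase: gs.Path("gs://example-archive-staging/logdog"),

    AlwaysCreateBinary: false,

    IndexStreamRange: 64, // hypothetical tuning values
    IndexPrefixRange: 64,
    IndexByteRange:   1024 * 1024,
}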
func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName,
-    ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
+    st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
    sa := stagedArchival{
        Archivist: a,
+        Settings:  st,
        project:   project,

        terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
    }

    // Deserialize and validate the descriptor protobuf. If this fails, it is a
    // non-transient error.
    if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
        log.Fields{
            log.ErrorKey: err,
(...skipping 10 matching lines...)

    // Construct our staged archival paths.
    sa.stream = sa.makeStagingPaths("logstream.entries", uid)
    sa.index = sa.makeStagingPaths("logstream.index", uid)
    sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid)
    return &sa, nil
}

type stagedArchival struct {
    *Archivist
+    *Settings

    project config.ProjectName
    path    types.StreamPath
    desc    logpb.LogStreamDescriptor

    stream stagingPaths
    index  stagingPaths
    data   stagingPaths

    finalized     bool
    terminalIndex types.MessageIndex
    logEntryCount int64
}

// makeStagingPaths returns a stagingPaths instance for the given path and
// file name. It incorporates a unique ID into the staging name to differentiate
// it from other staging paths for the same path/name.
func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
-    // TODO(dnj): This won't be necessary when empty project is invalid.
-    project := string(sa.project)
-    if project == "" {
-        project = "_"
+    proj := string(sa.project)
+    // TODO (dnj): When empty projects are disallowed, remove this.
+    if proj == "" {
+        proj = "_"
    }

+    // Either of these paths may be shared between projects. To enforce
+    // an absence of conflicts, we will insert the project name as part of the
+    // path.
    return stagingPaths{
-        staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name),
-        final:  sa.GSBase.Concat(project, string(sa.path), name),
+        staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name),
+        final:  sa.GSBase.Concat(proj, string(sa.path), name),
    }
}

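To make the resulting layout concrete, here is a sketch mirroring the Concat calls above with invented inputs (base paths, project, stream path, and unique ID are all hypothetical):

// examplePaths shows how the staged path folds in the unique ID while the
// final path omits it; with these inputs the results would be roughly:
//   staged: gs://archive-staging/logdog/chromium/prefix/+/stream/deadbeef/logstream.entries
//   final:  gs://archive/logdog/chromium/prefix/+/stream/logstream.entries
func examplePaths() (staged, final gs.Path) {
    stagingBase := gs.Path("gs://archive-staging/logdog")
    finalBase := gs.Path("gs://archive/logdog")
    staged = stagingBase.Concat("chromium", "prefix/+/stream", "deadbeef", "logstream.entries")
    final = finalBase.Concat("chromium", "prefix/+/stream", "logstream.entries")
    return
}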
// checkComplete performs a quick scan of intermediate storage to ensure that
// all of the log stream's records are available.
func (sa *stagedArchival) checkComplete(c context.Context) error {
    if sa.terminalIndex < 0 {
        log.Warningf(c, "Cannot archive complete stream with no terminal index.")
        return statusErr(errors.New("completeness required, but stream has no terminal index"))
    }
(...skipping 110 matching lines...)
        terminalIndex: sa.terminalIndex,
        lastIndex:     -1,
    }

    m := archive.Manifest{
        Desc:        &sa.desc,
        Source:      &ss,
        LogWriter:   streamWriter,
        IndexWriter: indexWriter,
        DataWriter:  dataWriter,
-        StreamIndexRange: sa.StreamIndexRange,
-        PrefixIndexRange: sa.PrefixIndexRange,
-        ByteRange:        sa.ByteRange,
+        StreamIndexRange: sa.IndexStreamRange,
+        PrefixIndexRange: sa.IndexPrefixRange,
+        ByteRange:        sa.IndexByteRange,

        Logger: log.Get(c),
    }
    if err = archive.Archive(m); err != nil {
        log.WithError(err).Errorf(c, "Failed to archive log stream.")
        return
    }

    switch {
    case ss.logEntryCount == 0:
(...skipping 125 matching lines...)
    return e.inner
}

func isFailure(err error) bool {
    if err == nil {
        return false
    }
    _, ok := err.(*statusErrorWrapper)
    return !ok
}