Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Updated from code review comments. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/cmd/logdog_archivist/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/hex" 9 "encoding/hex"
10 "fmt" 10 "fmt"
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 // consumed. This may be called multiple times with no additional effect. 97 // consumed. This may be called multiple times with no additional effect.
98 Consume() 98 Consume()
99 99
100 // AssertLease asserts that the lease for this Task is still held. 100 // AssertLease asserts that the lease for this Task is still held.
101 // 101 //
102 // On failure, it will return an error. If successful, the Archivist may 102 // On failure, it will return an error. If successful, the Archivist may
103 // assume that it holds the lease longer. 103 // assume that it holds the lease longer.
104 AssertLease(context.Context) error 104 AssertLease(context.Context) error
105 } 105 }
106 106
107 // Settings defines the archival parameters for a specific archival operation.
108 //
109 // In practice, this will be formed from service and project settings.
110 type Settings struct {
111 // GSBase is the base Google Storage path. This includes the bucket name
112 // and any associated path.
113 //
114 // This must be unique to this archival project. In practice, it will be
115 // composed of the project's archival bucket and project ID.
116 GSBase gs.Path
117 // GSStagingBase is the base Google Storage path for archive staging. This
118 // includes the bucket name and any associated path.
119 //
120 // This must be unique to this archival project. In practice, it will be
121 // composed of the project's staging archival bucket and project ID.
122 GSStagingBase gs.Path
123
124 // AlwaysRender, if true, means that a binary should be archived
125 // regardless of whether a specific binary file extension has been supplied
126 // with the log stream.
127 AlwaysRender bool
128
129 // IndexStreamRange is the maximum number of stream indexes in between index
nodir 2016/05/19 17:27:49 stream entries? below too
dnj (Google) 2016/05/19 22:53:55 Index entries. So basically all three of these det
130 // entries. See archive.Manifest for more information.
131 IndexStreamRange int
132 // IndexPrefixRange is the maximum number of prefix indexes in between index
133 // entries. See archive.Manifest for more information.
134 IndexPrefixRange int
135 // IndexByteRange is the maximum number of stream data bytes in between index
136 // entries. See archive.Manifest for more information.
137 IndexByteRange int
138 }
139
140 // SettingsLoader returns archival Settings for a given project.
141 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error)
142
107 // Archivist is a stateless configuration capable of archiving individual log 143 // Archivist is a stateless configuration capable of archiving individual log
108 // streams. 144 // streams.
109 type Archivist struct { 145 type Archivist struct {
110 // Service is the client to use to communicate with Coordinator's Services 146 // Service is the client to use to communicate with Coordinator's Services
111 // endpoint. 147 // endpoint.
112 Service logdog.ServicesClient 148 Service logdog.ServicesClient
113 149
150 // SettingsLoader loads archival settings for a specific project.
151 SettingsLoader SettingsLoader
152
114 // Storage is the archival source Storage instance. 153 // Storage is the archival source Storage instance.
115 Storage storage.Storage 154 Storage storage.Storage
116 // GSClient is the Google Storage client to use for archive generation. 155 // GSClient is the Google Storage client to use for archive generation.
117 GSClient gs.Client 156 GSClient gs.Client
118
119 // GSBase is the base Google Storage path. This includes the bucket name
120 // and any associated path.
121 GSBase gs.Path
122 // GSStagingBase is the base Google Storage path for archive staging. This
123 // includes the bucket name and any associated path.
124 GSStagingBase gs.Path
125
126 // PrefixIndexRange is the maximum number of stream indexes in between i ndex
127 // entries. See archive.Manifest for more information.
128 StreamIndexRange int
129 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex
130 // entries. See archive.Manifest for more information.
131 PrefixIndexRange int
132 // ByteRange is the maximum number of stream data bytes in between index
133 // entries. See archive.Manifest for more information.
134 ByteRange int
135 } 157 }
136 158
137 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used 159 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
138 // during archival. This should be greater than the maximum LogEntry size. 160 // during archival. This should be greater than the maximum LogEntry size.
139 const storageBufferSize = types.MaxLogEntryDataSize * 64 161 const storageBufferSize = types.MaxLogEntryDataSize * 64
140 162
141 // ArchiveTask processes and executes a single log stream archive task. 163 // ArchiveTask processes and executes a single log stream archive task.
142 // 164 //
143 // During processing, the Task's Consume method may be called to indicate that 165 // During processing, the Task's Consume method may be called to indicate that
144 // it should be consumed. 166 // it should be consumed.
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
245 // retry later. 267 // retry later.
246 age := ls.Age.Duration() 268 age := ls.Age.Duration()
247 if age < at.SettleDelay.Duration() { 269 if age < at.SettleDelay.Duration() {
248 log.Fields{ 270 log.Fields{
249 "age": age, 271 "age": age,
250 "settleDelay": at.SettleDelay.Duration(), 272 "settleDelay": at.SettleDelay.Duration(),
251 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") 273 }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.")
252 return statusErr(errors.New("log stream is within settle delay") ) 274 return statusErr(errors.New("log stream is within settle delay") )
253 } 275 }
254 276
277 // Load archival settings for this project.
278 settings, err := a.loadSettings(c, config.ProjectName(at.Project))
279 if err != nil {
280 log.Fields{
281 log.ErrorKey: err,
282 "project": at.Project,
283 }.Errorf(c, "Failed to load settings for project.")
284 return err
285 }
286
255 ar := logdog.ArchiveStreamRequest{ 287 ar := logdog.ArchiveStreamRequest{
256 Project: at.Project, 288 Project: at.Project,
257 Id: at.Id, 289 Id: at.Id,
258 } 290 }
259 291
260 // Build our staged archival plan. This doesn't actually do any archiving. 292 // Build our staged archival plan. This doesn't actually do any archiving.
261 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), l s, task.UniqueID()) 293 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), s ettings, ls, task.UniqueID())
262 if err != nil { 294 if err != nil {
263 log.WithError(err).Errorf(c, "Failed to create staged archival plan.") 295 log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
264 return err 296 return err
265 } 297 }
266 298
267 // Are we required to archive a complete log stream? 299 // Are we required to archive a complete log stream?
268 if age <= at.CompletePeriod.Duration() { 300 if age <= at.CompletePeriod.Duration() {
269 // If we're requiring completeness, perform a keys-only scan of intermediate 301 // If we're requiring completeness, perform a keys-only scan of intermediate
270 // storage to ensure that we have all of the records before we bother 302 // storage to ensure that we have all of the records before we bother
271 // streaming to storage only to find that we are missing data. 303 // streaming to storage only to find that we are missing data.
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
348 log.WithError(err).Errorf(c, "Failed to report archive state.") 380 log.WithError(err).Errorf(c, "Failed to report archive state.")
349 return err 381 return err
350 } 382 }
351 383
352 // Archival is complete and acknowledged by Coordinator. Consume the archival 384 // Archival is complete and acknowledged by Coordinator. Consume the archival
353 // task. 385 // task.
354 task.Consume() 386 task.Consume()
355 return nil 387 return nil
356 } 388 }
357 389
390 // loadSettings loads and validates archival settings.
391 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) {
392 if a.SettingsLoader == nil {
393 panic("no settings loader configured")
394 }
395
396 st, err := a.SettingsLoader(c, project)
397 switch {
398 case err != nil:
399 return nil, err
400
401 case st.GSBase.Bucket() == "":
402 log.Fields{
403 log.ErrorKey: err,
404 "gsBase": st.GSBase,
405 }.Errorf(c, "Invalid storage base.")
406 return nil, errors.New("invalid storage base")
407
408 case st.GSStagingBase.Bucket() == "":
409 log.Fields{
410 log.ErrorKey: err,
411 "gsStagingBase": st.GSStagingBase,
412 }.Errorf(c, "Invalid storage staging base.")
413 return nil, errors.New("invalid storage staging base")
414
415 default:
416 return st, nil
417 }
418 }
419
358 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name, 420 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name,
359 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { 421 » st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva l, error) {
360 sa := stagedArchival{ 422 sa := stagedArchival{
361 Archivist: a, 423 Archivist: a,
424 Settings: st,
362 project: project, 425 project: project,
363 426
364 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), 427 terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
365 } 428 }
366 429
367 // Deserialize and validate the descriptor protobuf. If this fails, it is a 430 // Deserialize and validate the descriptor protobuf. If this fails, it is a
368 // non-transient error. 431 // non-transient error.
369 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { 432 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
370 log.Fields{ 433 log.Fields{
371 log.ErrorKey: err, 434 log.ErrorKey: err,
(...skipping 10 matching lines...) Expand all
382 445
383 // Construct our staged archival paths. 446 // Construct our staged archival paths.
384 sa.stream = sa.makeStagingPaths("logstream.entries", uid) 447 sa.stream = sa.makeStagingPaths("logstream.entries", uid)
385 sa.index = sa.makeStagingPaths("logstream.index", uid) 448 sa.index = sa.makeStagingPaths("logstream.index", uid)
386 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) 449 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid)
387 return &sa, nil 450 return &sa, nil
388 } 451 }
389 452
390 type stagedArchival struct { 453 type stagedArchival struct {
391 *Archivist 454 *Archivist
455 *Settings
392 456
393 project config.ProjectName 457 project config.ProjectName
394 path types.StreamPath 458 path types.StreamPath
395 desc logpb.LogStreamDescriptor 459 desc logpb.LogStreamDescriptor
396 460
397 stream stagingPaths 461 stream stagingPaths
398 index stagingPaths 462 index stagingPaths
399 data stagingPaths 463 data stagingPaths
400 464
401 finalized bool 465 finalized bool
402 terminalIndex types.MessageIndex 466 terminalIndex types.MessageIndex
403 logEntryCount int64 467 logEntryCount int64
404 } 468 }
405 469
406 // makeStagingPaths returns a stagingPaths instance for the given path and 470 // makeStagingPaths returns a stagingPaths instance for the given path and
407 // file name. It incorporates a unique ID into the staging name to differentiate 471 // file name. It incorporates a unique ID into the staging name to differentiate
408 // it from other staging paths for the same path/name. 472 // it from other staging paths for the same path/name.
409 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { 473 func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
410 » // TODO(dnj): This won't be necessary when empty project is invalid. 474 » proj := string(sa.project)
411 » project := string(sa.project) 475 » // TODO (dnj): When empty projects are disallowed, remove this.
412 » if project == "" { 476 » if proj == "" {
413 » » project = "_" 477 » » proj = "_"
414 } 478 }
415 479
480 // Either of these paths may be shared between projects. To enforce
481 // an absence of conflicts, we will insert the project name as part of the
482 // path.
416 return stagingPaths{ 483 return stagingPaths{
417 » » staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, n ame), 484 » » staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name ),
418 » » final: sa.GSBase.Concat(project, string(sa.path), name), 485 » » final: sa.GSBase.Concat(proj, string(sa.path), name),
419 } 486 }
420 } 487 }
421 488
422 // checkComplete performs a quick scan of intermediate storage to ensure that 489 // checkComplete performs a quick scan of intermediate storage to ensure that
423 // all of the log stream's records are available. 490 // all of the log stream's records are available.
424 func (sa *stagedArchival) checkComplete(c context.Context) error { 491 func (sa *stagedArchival) checkComplete(c context.Context) error {
425 if sa.terminalIndex < 0 { 492 if sa.terminalIndex < 0 {
426 log.Warningf(c, "Cannot archive complete stream with no terminal index.") 493 log.Warningf(c, "Cannot archive complete stream with no terminal index.")
427 return statusErr(errors.New("completeness required, but stream h as no terminal index")) 494 return statusErr(errors.New("completeness required, but stream h as no terminal index"))
428 } 495 }
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
539 terminalIndex: sa.terminalIndex, 606 terminalIndex: sa.terminalIndex,
540 lastIndex: -1, 607 lastIndex: -1,
541 } 608 }
542 609
543 m := archive.Manifest{ 610 m := archive.Manifest{
544 Desc: &sa.desc, 611 Desc: &sa.desc,
545 Source: &ss, 612 Source: &ss,
546 LogWriter: streamWriter, 613 LogWriter: streamWriter,
547 IndexWriter: indexWriter, 614 IndexWriter: indexWriter,
548 DataWriter: dataWriter, 615 DataWriter: dataWriter,
549 » » StreamIndexRange: sa.StreamIndexRange, 616 » » StreamIndexRange: sa.IndexStreamRange,
550 » » PrefixIndexRange: sa.PrefixIndexRange, 617 » » PrefixIndexRange: sa.IndexPrefixRange,
551 » » ByteRange: sa.ByteRange, 618 » » ByteRange: sa.IndexByteRange,
552 619
553 Logger: log.Get(c), 620 Logger: log.Get(c),
554 } 621 }
555 if err = archive.Archive(m); err != nil { 622 if err = archive.Archive(m); err != nil {
556 log.WithError(err).Errorf(c, "Failed to archive log stream.") 623 log.WithError(err).Errorf(c, "Failed to archive log stream.")
557 return 624 return
558 } 625 }
559 626
560 switch { 627 switch {
561 case ss.logEntryCount == 0: 628 case ss.logEntryCount == 0:
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
687 return e.inner 754 return e.inner
688 } 755 }
689 756
690 func isFailure(err error) bool { 757 func isFailure(err error) bool {
691 if err == nil { 758 if err == nil {
692 return false 759 return false
693 } 760 }
694 _, ok := err.(*statusErrorWrapper) 761 _, ok := err.(*statusErrorWrapper)
695 return !ok 762 return !ok
696 } 763 }
OLDNEW
« no previous file with comments | « server/cmd/logdog_archivist/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698