Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(341)

Side by Side Diff: logdog/server/archivist/archivist.go

Issue 2626433004: Move "common/config" common types into cfgtypes. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/common/viewer/url_test.go ('k') | logdog/server/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/hex" 9 "encoding/hex"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 "time" 12 "time"
13 13
14 "github.com/golang/protobuf/proto" 14 "github.com/golang/protobuf/proto"
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 16
17 "github.com/luci/luci-go/common/clock" 17 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/config"
19 "github.com/luci/luci-go/common/errors" 18 "github.com/luci/luci-go/common/errors"
20 "github.com/luci/luci-go/common/gcloud/gs" 19 "github.com/luci/luci-go/common/gcloud/gs"
21 log "github.com/luci/luci-go/common/logging" 20 log "github.com/luci/luci-go/common/logging"
22 "github.com/luci/luci-go/common/sync/parallel" 21 "github.com/luci/luci-go/common/sync/parallel"
23 "github.com/luci/luci-go/common/tsmon/distribution" 22 "github.com/luci/luci-go/common/tsmon/distribution"
24 "github.com/luci/luci-go/common/tsmon/field" 23 "github.com/luci/luci-go/common/tsmon/field"
25 "github.com/luci/luci-go/common/tsmon/metric" 24 "github.com/luci/luci-go/common/tsmon/metric"
26 tsmon_types "github.com/luci/luci-go/common/tsmon/types" 25 tsmon_types "github.com/luci/luci-go/common/tsmon/types"
27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 26 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
28 "github.com/luci/luci-go/logdog/api/logpb" 27 "github.com/luci/luci-go/logdog/api/logpb"
29 "github.com/luci/luci-go/logdog/common/archive" 28 "github.com/luci/luci-go/logdog/common/archive"
30 "github.com/luci/luci-go/logdog/common/storage" 29 "github.com/luci/luci-go/logdog/common/storage"
31 "github.com/luci/luci-go/logdog/common/types" 30 "github.com/luci/luci-go/logdog/common/types"
31 "github.com/luci/luci-go/luci_config/common/cfgtypes"
32 ) 32 )
33 33
34 const ( 34 const (
35 tsEntriesField = "entries" 35 tsEntriesField = "entries"
36 tsIndexField = "index" 36 tsIndexField = "index"
37 tsDataField = "data" 37 tsDataField = "data"
38 38
39 // If the archive dispatch is within this range of the current time, we will 39 // If the archive dispatch is within this range of the current time, we will
40 // avoid archival. 40 // avoid archival.
41 dispatchThreshold = 5 * time.Minute 41 dispatchThreshold = 5 * time.Minute
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 IndexStreamRange int 143 IndexStreamRange int
144 // IndexPrefixRange is the maximum number of prefix indexes in between i ndex 144 // IndexPrefixRange is the maximum number of prefix indexes in between i ndex
145 // entries. See archive.Manifest for more information. 145 // entries. See archive.Manifest for more information.
146 IndexPrefixRange int 146 IndexPrefixRange int
147 // IndexByteRange is the maximum number of stream data bytes in between index 147 // IndexByteRange is the maximum number of stream data bytes in between index
148 // entries. See archive.Manifest for more information. 148 // entries. See archive.Manifest for more information.
149 IndexByteRange int 149 IndexByteRange int
150 } 150 }
151 151
152 // SettingsLoader returns archival Settings for a given project. 152 // SettingsLoader returns archival Settings for a given project.
153 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) 153 type SettingsLoader func(context.Context, cfgtypes.ProjectName) (*Settings, error)
154 154
155 // Archivist is a stateless configuration capable of archiving individual log 155 // Archivist is a stateless configuration capable of archiving individual log
156 // streams. 156 // streams.
157 type Archivist struct { 157 type Archivist struct {
158 // Service is the client to use to communicate with Coordinator's Servic es 158 // Service is the client to use to communicate with Coordinator's Servic es
159 // endpoint. 159 // endpoint.
160 Service logdog.ServicesClient 160 Service logdog.ServicesClient
161 161
162 // SettingsLoader loads archival settings for a specific project. 162 // SettingsLoader loads archival settings for a specific project.
163 SettingsLoader SettingsLoader 163 SettingsLoader SettingsLoader
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 200
201 // archiveTaskImpl performs the actual task archival. 201 // archiveTaskImpl performs the actual task archival.
202 // 202 //
203 // Its error return value is used to indicate how the archive failed. isFailure 203 // Its error return value is used to indicate how the archive failed. isFailure
204 // will be called to determine if the returned error value is a failure or a 204 // will be called to determine if the returned error value is a failure or a
205 // status error. 205 // status error.
206 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error { 206 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
207 at := task.Task() 207 at := task.Task()
208 208
209 // Validate the project name. 209 // Validate the project name.
210 » if err := config.ProjectName(at.Project).Validate(); err != nil { 210 » if err := cfgtypes.ProjectName(at.Project).Validate(); err != nil {
211 task.Consume() 211 task.Consume()
212 		return fmt.Errorf("invalid project name %q: %s", at.Project, err) 212 		return fmt.Errorf("invalid project name %q: %s", at.Project, err)
213 } 213 }
214 214
215 // Get the local time. If we are within the dispatchThreshold, retry thi s 215 // Get the local time. If we are within the dispatchThreshold, retry thi s
216 // archival later. 216 // archival later.
217 if ad := at.DispatchedAt.Time(); !ad.IsZero() { 217 if ad := at.DispatchedAt.Time(); !ad.IsZero() {
218 now := clock.Now(c) 218 now := clock.Now(c)
219 delta := now.Sub(ad) 219 delta := now.Sub(ad)
220 if delta < 0 { 220 if delta < 0 {
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
297 age := ls.Age.Duration() 297 age := ls.Age.Duration()
298 if age < at.SettleDelay.Duration() { 298 if age < at.SettleDelay.Duration() {
299 log.Fields{ 299 log.Fields{
300 "age": age, 300 "age": age,
301 "settleDelay": at.SettleDelay.Duration(), 301 "settleDelay": at.SettleDelay.Duration(),
302 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.") 302 }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.")
303 return statusErr(errors.New("log stream is within settle delay") ) 303 return statusErr(errors.New("log stream is within settle delay") )
304 } 304 }
305 305
306 // Load archival settings for this project. 306 // Load archival settings for this project.
307 » settings, err := a.loadSettings(c, config.ProjectName(at.Project)) 307 » settings, err := a.loadSettings(c, cfgtypes.ProjectName(at.Project))
308 if err != nil { 308 if err != nil {
309 log.Fields{ 309 log.Fields{
310 log.ErrorKey: err, 310 log.ErrorKey: err,
311 "project": at.Project, 311 "project": at.Project,
312 }.Errorf(c, "Failed to load settings for project.") 312 }.Errorf(c, "Failed to load settings for project.")
313 return err 313 return err
314 } 314 }
315 315
316 ar := logdog.ArchiveStreamRequest{ 316 ar := logdog.ArchiveStreamRequest{
317 Project: at.Project, 317 Project: at.Project,
318 Id: at.Id, 318 Id: at.Id,
319 } 319 }
320 320
321 // Build our staged archival plan. This doesn't actually do any archivin g. 321 // Build our staged archival plan. This doesn't actually do any archivin g.
322 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID()) 322 » staged, err := a.makeStagedArchival(c, cfgtypes.ProjectName(at.Project), settings, ls, task.UniqueID())
323 if err != nil { 323 if err != nil {
324 		log.WithError(err).Errorf(c, "Failed to create staged archival plan.") 324 		log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
325 return err 325 return err
326 } 326 }
327 327
328 // Are we required to archive a complete log stream? 328 // Are we required to archive a complete log stream?
329 if age <= at.CompletePeriod.Duration() { 329 if age <= at.CompletePeriod.Duration() {
330 // If we're requiring completeness, perform a keys-only scan of intermediate 330 // If we're requiring completeness, perform a keys-only scan of intermediate
331 // storage to ensure that we have all of the records before we b other 331 // storage to ensure that we have all of the records before we b other
332 // streaming to storage only to find that we are missing data. 332 // streaming to storage only to find that we are missing data.
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
410 return err 410 return err
411 } 411 }
412 412
413 	// Archival is complete and acknowledged by Coordinator. Consume the archival 413 	// Archival is complete and acknowledged by Coordinator. Consume the archival
414 // task. 414 // task.
415 task.Consume() 415 task.Consume()
416 return nil 416 return nil
417 } 417 }
418 418
419 // loadSettings loads and validates archival settings. 419 // loadSettings loads and validates archival settings.
420 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) { 420 func (a *Archivist) loadSettings(c context.Context, project cfgtypes.ProjectName) (*Settings, error) {
421 if a.SettingsLoader == nil { 421 if a.SettingsLoader == nil {
422 panic("no settings loader configured") 422 panic("no settings loader configured")
423 } 423 }
424 424
425 st, err := a.SettingsLoader(c, project) 425 st, err := a.SettingsLoader(c, project)
426 switch { 426 switch {
427 case err != nil: 427 case err != nil:
428 return nil, err 428 return nil, err
429 429
430 case st.GSBase.Bucket() == "": 430 case st.GSBase.Bucket() == "":
431 log.Fields{ 431 log.Fields{
432 log.ErrorKey: err, 432 log.ErrorKey: err,
433 "gsBase": st.GSBase, 433 "gsBase": st.GSBase,
434 }.Errorf(c, "Invalid storage base.") 434 }.Errorf(c, "Invalid storage base.")
435 return nil, errors.New("invalid storage base") 435 return nil, errors.New("invalid storage base")
436 436
437 case st.GSStagingBase.Bucket() == "": 437 case st.GSStagingBase.Bucket() == "":
438 log.Fields{ 438 log.Fields{
439 log.ErrorKey: err, 439 log.ErrorKey: err,
440 "gsStagingBase": st.GSStagingBase, 440 "gsStagingBase": st.GSStagingBase,
441 }.Errorf(c, "Invalid storage staging base.") 441 }.Errorf(c, "Invalid storage staging base.")
442 return nil, errors.New("invalid storage staging base") 442 return nil, errors.New("invalid storage staging base")
443 443
444 default: 444 default:
445 return st, nil 445 return st, nil
446 } 446 }
447 } 447 }
448 448
449 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, 449 func (a *Archivist) makeStagedArchival(c context.Context, project cfgtypes.ProjectName,
450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva l, error) { 450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva l, error) {
451 451
452 sa := stagedArchival{ 452 sa := stagedArchival{
453 Archivist: a, 453 Archivist: a,
454 Settings: st, 454 Settings: st,
455 project: project, 455 project: project,
456 456
457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), 457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
458 } 458 }
459 459
(...skipping 22 matching lines...) Expand all
482 482
483 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) 483 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid)
484 } 484 }
485 return &sa, nil 485 return &sa, nil
486 } 486 }
487 487
488 type stagedArchival struct { 488 type stagedArchival struct {
489 *Archivist 489 *Archivist
490 *Settings 490 *Settings
491 491
492 » project config.ProjectName 492 » project cfgtypes.ProjectName
493 path types.StreamPath 493 path types.StreamPath
494 desc logpb.LogStreamDescriptor 494 desc logpb.LogStreamDescriptor
495 495
496 stream stagingPaths 496 stream stagingPaths
497 index stagingPaths 497 index stagingPaths
498 data stagingPaths 498 data stagingPaths
499 499
500 finalized bool 500 finalized bool
501 terminalIndex types.MessageIndex 501 terminalIndex types.MessageIndex
502 logEntryCount int64 502 logEntryCount int64
(...skipping 295 matching lines...) Expand 10 before | Expand all | Expand 10 after
798 return e.inner 798 return e.inner
799 } 799 }
800 800
801 func isFailure(err error) bool { 801 func isFailure(err error) bool {
802 if err == nil { 802 if err == nil {
803 return false 803 return false
804 } 804 }
805 _, ok := err.(*statusErrorWrapper) 805 _, ok := err.(*statusErrorWrapper)
806 return !ok 806 return !ok
807 } 807 }
OLDNEW
« no previous file with comments | « logdog/common/viewer/url_test.go ('k') | logdog/server/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698