| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/config" | |
| 19 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
| 20 "github.com/luci/luci-go/common/gcloud/gs" | 19 "github.com/luci/luci-go/common/gcloud/gs" |
| 21 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 21 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/common/tsmon/distribution" | 22 "github.com/luci/luci-go/common/tsmon/distribution" |
| 24 "github.com/luci/luci-go/common/tsmon/field" | 23 "github.com/luci/luci-go/common/tsmon/field" |
| 25 "github.com/luci/luci-go/common/tsmon/metric" | 24 "github.com/luci/luci-go/common/tsmon/metric" |
| 26 tsmon_types "github.com/luci/luci-go/common/tsmon/types" | 25 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
| 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 26 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 28 "github.com/luci/luci-go/logdog/api/logpb" | 27 "github.com/luci/luci-go/logdog/api/logpb" |
| 29 "github.com/luci/luci-go/logdog/common/archive" | 28 "github.com/luci/luci-go/logdog/common/archive" |
| 30 "github.com/luci/luci-go/logdog/common/storage" | 29 "github.com/luci/luci-go/logdog/common/storage" |
| 31 "github.com/luci/luci-go/logdog/common/types" | 30 "github.com/luci/luci-go/logdog/common/types" |
| 31 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 32 ) | 32 ) |
| 33 | 33 |
| 34 const ( | 34 const ( |
| 35 tsEntriesField = "entries" | 35 tsEntriesField = "entries" |
| 36 tsIndexField = "index" | 36 tsIndexField = "index" |
| 37 tsDataField = "data" | 37 tsDataField = "data" |
| 38 | 38 |
| 39 // If the archive dispatch is within this range of the current time, we
will | 39 // If the archive dispatch is within this range of the current time, we
will |
| 40 // avoid archival. | 40 // avoid archival. |
| 41 dispatchThreshold = 5 * time.Minute | 41 dispatchThreshold = 5 * time.Minute |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 IndexStreamRange int | 143 IndexStreamRange int |
| 144 // IndexPrefixRange is the maximum number of prefix indexes in between i
ndex | 144 // IndexPrefixRange is the maximum number of prefix indexes in between i
ndex |
| 145 // entries. See archive.Manifest for more information. | 145 // entries. See archive.Manifest for more information. |
| 146 IndexPrefixRange int | 146 IndexPrefixRange int |
| 147 // IndexByteRange is the maximum number of stream data bytes in between
index | 147 // IndexByteRange is the maximum number of stream data bytes in between
index |
| 148 // entries. See archive.Manifest for more information. | 148 // entries. See archive.Manifest for more information. |
| 149 IndexByteRange int | 149 IndexByteRange int |
| 150 } | 150 } |
| 151 | 151 |
| 152 // SettingsLoader returns archival Settings for a given project. | 152 // SettingsLoader returns archival Settings for a given project. |
| 153 type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) | 153 type SettingsLoader func(context.Context, cfgtypes.ProjectName) (*Settings, erro
r) |
| 154 | 154 |
| 155 // Archivist is a stateless configuration capable of archiving individual log | 155 // Archivist is a stateless configuration capable of archiving individual log |
| 156 // streams. | 156 // streams. |
| 157 type Archivist struct { | 157 type Archivist struct { |
| 158 // Service is the client to use to communicate with Coordinator's Servic
es | 158 // Service is the client to use to communicate with Coordinator's Servic
es |
| 159 // endpoint. | 159 // endpoint. |
| 160 Service logdog.ServicesClient | 160 Service logdog.ServicesClient |
| 161 | 161 |
| 162 // SettingsLoader loads archival settings for a specific project. | 162 // SettingsLoader loads archival settings for a specific project. |
| 163 SettingsLoader SettingsLoader | 163 SettingsLoader SettingsLoader |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 200 | 200 |
| 201 // archiveTaskImpl performs the actual task archival. | 201 // archiveTaskImpl performs the actual task archival. |
| 202 // | 202 // |
| 203 // Its error return value is used to indicate how the archive failed. isFailure | 203 // Its error return value is used to indicate how the archive failed. isFailure |
| 204 // will be called to determine if the returned error value is a failure or a | 204 // will be called to determine if the returned error value is a failure or a |
| 205 // status error. | 205 // status error. |
| 206 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error { | 206 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error { |
| 207 at := task.Task() | 207 at := task.Task() |
| 208 | 208 |
| 209 // Validate the project name. | 209 // Validate the project name. |
| 210 » if err := config.ProjectName(at.Project).Validate(); err != nil { | 210 » if err := cfgtypes.ProjectName(at.Project).Validate(); err != nil { |
| 211 task.Consume() | 211 task.Consume() |
| 212 return fmt.Errorf("invalid project name %q: %s", at.Project, err
) | 212 return fmt.Errorf("invalid project name %q: %s", at.Project, err
) |
| 213 } | 213 } |
| 214 | 214 |
| 215 // Get the local time. If we are within the dispatchThreshold, retry thi
s | 215 // Get the local time. If we are within the dispatchThreshold, retry thi
s |
| 216 // archival later. | 216 // archival later. |
| 217 if ad := at.DispatchedAt.Time(); !ad.IsZero() { | 217 if ad := at.DispatchedAt.Time(); !ad.IsZero() { |
| 218 now := clock.Now(c) | 218 now := clock.Now(c) |
| 219 delta := now.Sub(ad) | 219 delta := now.Sub(ad) |
| 220 if delta < 0 { | 220 if delta < 0 { |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 297 age := ls.Age.Duration() | 297 age := ls.Age.Duration() |
| 298 if age < at.SettleDelay.Duration() { | 298 if age < at.SettleDelay.Duration() { |
| 299 log.Fields{ | 299 log.Fields{ |
| 300 "age": age, | 300 "age": age, |
| 301 "settleDelay": at.SettleDelay.Duration(), | 301 "settleDelay": at.SettleDelay.Duration(), |
| 302 }.Infof(c, "Log stream is younger than the settle delay. Returni
ng task to queue.") | 302 }.Infof(c, "Log stream is younger than the settle delay. Returni
ng task to queue.") |
| 303 return statusErr(errors.New("log stream is within settle delay")
) | 303 return statusErr(errors.New("log stream is within settle delay")
) |
| 304 } | 304 } |
| 305 | 305 |
| 306 // Load archival settings for this project. | 306 // Load archival settings for this project. |
| 307 » settings, err := a.loadSettings(c, config.ProjectName(at.Project)) | 307 » settings, err := a.loadSettings(c, cfgtypes.ProjectName(at.Project)) |
| 308 if err != nil { | 308 if err != nil { |
| 309 log.Fields{ | 309 log.Fields{ |
| 310 log.ErrorKey: err, | 310 log.ErrorKey: err, |
| 311 "project": at.Project, | 311 "project": at.Project, |
| 312 }.Errorf(c, "Failed to load settings for project.") | 312 }.Errorf(c, "Failed to load settings for project.") |
| 313 return err | 313 return err |
| 314 } | 314 } |
| 315 | 315 |
| 316 ar := logdog.ArchiveStreamRequest{ | 316 ar := logdog.ArchiveStreamRequest{ |
| 317 Project: at.Project, | 317 Project: at.Project, |
| 318 Id: at.Id, | 318 Id: at.Id, |
| 319 } | 319 } |
| 320 | 320 |
| 321 // Build our staged archival plan. This doesn't actually do any archivin
g. | 321 // Build our staged archival plan. This doesn't actually do any archivin
g. |
| 322 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), s
ettings, ls, task.UniqueID()) | 322 » staged, err := a.makeStagedArchival(c, cfgtypes.ProjectName(at.Project),
settings, ls, task.UniqueID()) |
| 323 if err != nil { | 323 if err != nil { |
| 324 log.WithError(err).Errorf(c, "Failed to create staged archival p
lan.") | 324 log.WithError(err).Errorf(c, "Failed to create staged archival p
lan.") |
| 325 return err | 325 return err |
| 326 } | 326 } |
| 327 | 327 |
| 328 // Are we required to archive a complete log stream? | 328 // Are we required to archive a complete log stream? |
| 329 if age <= at.CompletePeriod.Duration() { | 329 if age <= at.CompletePeriod.Duration() { |
| 330 // If we're requiring completeness, perform a keys-only scan of
intermediate | 330 // If we're requiring completeness, perform a keys-only scan of
intermediate |
| 331 // storage to ensure that we have all of the records before we b
other | 331 // storage to ensure that we have all of the records before we b
other |
| 332 // streaming to storage only to find that we are missing data. | 332 // streaming to storage only to find that we are missing data. |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 410 return err | 410 return err |
| 411 } | 411 } |
| 412 | 412 |
| 413 // Archival is complete and acknowledged by Coordinator. Consume the arc
hival | 413 // Archival is complete and acknowledged by Coordinator. Consume the arc
hival |
| 414 // task. | 414 // task. |
| 415 task.Consume() | 415 task.Consume() |
| 416 return nil | 416 return nil |
| 417 } | 417 } |
| 418 | 418 |
| 419 // loadSettings loads and validates archival settings. | 419 // loadSettings loads and validates archival settings. |
| 420 func (a *Archivist) loadSettings(c context.Context, project config.ProjectName)
(*Settings, error) { | 420 func (a *Archivist) loadSettings(c context.Context, project cfgtypes.ProjectName
) (*Settings, error) { |
| 421 if a.SettingsLoader == nil { | 421 if a.SettingsLoader == nil { |
| 422 panic("no settings loader configured") | 422 panic("no settings loader configured") |
| 423 } | 423 } |
| 424 | 424 |
| 425 st, err := a.SettingsLoader(c, project) | 425 st, err := a.SettingsLoader(c, project) |
| 426 switch { | 426 switch { |
| 427 case err != nil: | 427 case err != nil: |
| 428 return nil, err | 428 return nil, err |
| 429 | 429 |
| 430 case st.GSBase.Bucket() == "": | 430 case st.GSBase.Bucket() == "": |
| 431 log.Fields{ | 431 log.Fields{ |
| 432 log.ErrorKey: err, | 432 log.ErrorKey: err, |
| 433 "gsBase": st.GSBase, | 433 "gsBase": st.GSBase, |
| 434 }.Errorf(c, "Invalid storage base.") | 434 }.Errorf(c, "Invalid storage base.") |
| 435 return nil, errors.New("invalid storage base") | 435 return nil, errors.New("invalid storage base") |
| 436 | 436 |
| 437 case st.GSStagingBase.Bucket() == "": | 437 case st.GSStagingBase.Bucket() == "": |
| 438 log.Fields{ | 438 log.Fields{ |
| 439 log.ErrorKey: err, | 439 log.ErrorKey: err, |
| 440 "gsStagingBase": st.GSStagingBase, | 440 "gsStagingBase": st.GSStagingBase, |
| 441 }.Errorf(c, "Invalid storage staging base.") | 441 }.Errorf(c, "Invalid storage staging base.") |
| 442 return nil, errors.New("invalid storage staging base") | 442 return nil, errors.New("invalid storage staging base") |
| 443 | 443 |
| 444 default: | 444 default: |
| 445 return st, nil | 445 return st, nil |
| 446 } | 446 } |
| 447 } | 447 } |
| 448 | 448 |
| 449 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
Name, | 449 func (a *Archivist) makeStagedArchival(c context.Context, project cfgtypes.Proje
ctName, |
| 450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva
l, error) { | 450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva
l, error) { |
| 451 | 451 |
| 452 sa := stagedArchival{ | 452 sa := stagedArchival{ |
| 453 Archivist: a, | 453 Archivist: a, |
| 454 Settings: st, | 454 Settings: st, |
| 455 project: project, | 455 project: project, |
| 456 | 456 |
| 457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), | 457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
| 458 } | 458 } |
| 459 | 459 |
| (...skipping 22 matching lines...) Expand all Loading... |
| 482 | 482 |
| 483 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) | 483 sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) |
| 484 } | 484 } |
| 485 return &sa, nil | 485 return &sa, nil |
| 486 } | 486 } |
| 487 | 487 |
| 488 type stagedArchival struct { | 488 type stagedArchival struct { |
| 489 *Archivist | 489 *Archivist |
| 490 *Settings | 490 *Settings |
| 491 | 491 |
| 492 » project config.ProjectName | 492 » project cfgtypes.ProjectName |
| 493 path types.StreamPath | 493 path types.StreamPath |
| 494 desc logpb.LogStreamDescriptor | 494 desc logpb.LogStreamDescriptor |
| 495 | 495 |
| 496 stream stagingPaths | 496 stream stagingPaths |
| 497 index stagingPaths | 497 index stagingPaths |
| 498 data stagingPaths | 498 data stagingPaths |
| 499 | 499 |
| 500 finalized bool | 500 finalized bool |
| 501 terminalIndex types.MessageIndex | 501 terminalIndex types.MessageIndex |
| 502 logEntryCount int64 | 502 logEntryCount int64 |
| (...skipping 295 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 798 return e.inner | 798 return e.inner |
| 799 } | 799 } |
| 800 | 800 |
| 801 func isFailure(err error) bool { | 801 func isFailure(err error) bool { |
| 802 if err == nil { | 802 if err == nil { |
| 803 return false | 803 return false |
| 804 } | 804 } |
| 805 _, ok := err.(*statusErrorWrapper) | 805 _, ok := err.(*statusErrorWrapper) |
| 806 return !ok | 806 return !ok |
| 807 } | 807 } |
| OLD | NEW |