| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| 11 // - Storage methods ignore the supplied Path argument, instead opting for | 11 // - Storage methods ignore the supplied Path argument, instead opting for |
| 12 // the archive configured in its Options. | 12 // the archive configured in its Options. |
| 13 package archive | 13 package archive |
| 14 | 14 |
| 15 import ( | 15 import ( |
| 16 "bytes" | 16 "bytes" |
| 17 "fmt" | 17 "fmt" |
| 18 "io" | 18 "io" |
| 19 "io/ioutil" | 19 "io/ioutil" |
| 20 "sort" | 20 "sort" |
| 21 » "sync" | 21 » "sync/atomic" |
| 22 | 22 |
| 23 "github.com/luci/luci-go/common/config" | 23 "github.com/luci/luci-go/common/config" |
| 24 "github.com/luci/luci-go/common/data/recordio" | 24 "github.com/luci/luci-go/common/data/recordio" |
| 25 "github.com/luci/luci-go/common/errors" | 25 "github.com/luci/luci-go/common/errors" |
| 26 "github.com/luci/luci-go/common/gcloud/gs" | 26 "github.com/luci/luci-go/common/gcloud/gs" |
| 27 "github.com/luci/luci-go/common/iotools" | 27 "github.com/luci/luci-go/common/iotools" |
| 28 log "github.com/luci/luci-go/common/logging" | 28 log "github.com/luci/luci-go/common/logging" |
| 29 "github.com/luci/luci-go/logdog/api/logpb" | 29 "github.com/luci/luci-go/logdog/api/logpb" |
| 30 "github.com/luci/luci-go/logdog/common/storage" | 30 "github.com/luci/luci-go/logdog/common/storage" |
| 31 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 31 "github.com/luci/luci-go/logdog/common/types" | 32 "github.com/luci/luci-go/logdog/common/types" |
| 32 | 33 |
| 33 cloudStorage "cloud.google.com/go/storage" | 34 cloudStorage "cloud.google.com/go/storage" |
| 34 "github.com/golang/protobuf/proto" | 35 "github.com/golang/protobuf/proto" |
| 35 "golang.org/x/net/context" | 36 "golang.org/x/net/context" |
| 36 ) | 37 ) |
| 37 | 38 |
| 38 const ( | 39 const ( |
| 39 // maxStreamRecordSize is the maximum record size we're willing to read
from | 40 // maxStreamRecordSize is the maximum record size we're willing to read
from |
| 40 // our archived log stream. This will help prevent out-of-memory errors
if the | 41 // our archived log stream. This will help prevent out-of-memory errors
if the |
| (...skipping 11 matching lines...) Expand all Loading... |
| 52 type Options struct { | 53 type Options struct { |
| 53 // IndexURL is the Google Storage URL for the stream's index. | 54 // IndexURL is the Google Storage URL for the stream's index. |
| 54 IndexURL string | 55 IndexURL string |
| 55 // StreamURL is the Google Storage URL for the stream's entries. | 56 // StreamURL is the Google Storage URL for the stream's entries. |
| 56 StreamURL string | 57 StreamURL string |
| 57 | 58 |
| 58 // Client is the HTTP client to use for authentication. | 59 // Client is the HTTP client to use for authentication. |
| 59 // | 60 // |
| 60 // Closing this Storage instance does not close the underlying Client. | 61 // Closing this Storage instance does not close the underlying Client. |
| 61 Client gs.Client | 62 Client gs.Client |
| 63 |
| 64 // Cache, if not nil, will be used to cache data. |
| 65 Cache caching.Cache |
| 62 } | 66 } |
| 63 | 67 |
| 64 type storageImpl struct { | 68 type storageImpl struct { |
| 65 *Options | 69 *Options |
| 66 context.Context | 70 context.Context |
| 67 | 71 |
| 68 streamPath gs.Path | 72 streamPath gs.Path |
| 69 indexPath gs.Path | 73 indexPath gs.Path |
| 70 | 74 |
| 71 » indexMu sync.Mutex | 75 » index atomic.Value |
| 72 » index *logpb.LogIndex | |
| 73 } | 76 } |
| 74 | 77 |
| 75 // New instantiates a new Storage instance, bound to the supplied Options. | 78 // New instantiates a new Storage instance, bound to the supplied Options. |
| 76 func New(ctx context.Context, o Options) (storage.Storage, error) { | 79 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 77 s := storageImpl{ | 80 s := storageImpl{ |
| 78 Options: &o, | 81 Options: &o, |
| 79 Context: ctx, | 82 Context: ctx, |
| 80 | 83 |
| 81 streamPath: gs.Path(o.StreamURL), | 84 streamPath: gs.Path(o.StreamURL), |
| 82 indexPath: gs.Path(o.IndexURL), | 85 indexPath: gs.Path(o.IndexURL), |
| (...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 277 case lastEntry == nil: | 280 case lastEntry == nil: |
| 278 return nil, storage.ErrDoesNotExist | 281 return nil, storage.ErrDoesNotExist |
| 279 | 282 |
| 280 default: | 283 default: |
| 281 return lastEntry, nil | 284 return lastEntry, nil |
| 282 } | 285 } |
| 283 } | 286 } |
| 284 | 287 |
| 285 // getIndex returns the cached log stream index, fetching it if necessary. | 288 // getIndex returns the cached log stream index, fetching it if necessary. |
| 286 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 289 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 287 » s.indexMu.Lock() | 290 » idx := s.index.Load() |
| 288 » defer s.indexMu.Unlock() | 291 » if idx != nil { |
| 292 » » return idx.(*logpb.LogIndex), nil |
| 293 » } |
| 289 | 294 |
| 290 » if s.index == nil { | 295 » index, err := loadIndex(s, s.Client, s.indexPath, s.Cache) |
| 291 » » index, err := loadIndex(s, s.Client, s.indexPath) | 296 » switch errors.Unwrap(err) { |
| 292 » » switch errors.Unwrap(err) { | 297 » case nil: |
| 293 » » case nil: | 298 » » break |
| 294 » » » break | |
| 295 | 299 |
| 296 » » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotEx
ist: | 300 » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist: |
| 297 » » » // Treat a missing index the same as an empty index. | 301 » » // Treat a missing index the same as an empty index. |
| 298 » » » log.WithError(err).Warningf(s, "Index is invalid, using
empty index.") | 302 » » log.WithError(err).Warningf(s, "Index is invalid, using empty in
dex.") |
| 299 » » » index = &logpb.LogIndex{} | 303 » » index = &logpb.LogIndex{} |
| 300 | 304 |
| 301 » » default: | 305 » default: |
| 302 » » » return nil, err | 306 » » return nil, err |
| 303 » » } | 307 » } |
| 304 | 308 |
| 305 » » s.index = index | 309 » s.index.Store(index) |
| 306 » } | 310 » return index, nil |
| 307 » return s.index, nil | |
| 308 } | 311 } |
| 309 | 312 |
| 310 func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd
ex, error) { | 313 func loadIndex(c context.Context, client gs.Client, path gs.Path, cache caching.
Cache) (*logpb.LogIndex, error) { |
| 311 // If there is no path, then return an empty index. | 314 // If there is no path, then return an empty index. |
| 312 if path == "" { | 315 if path == "" { |
| 313 log.Infof(c, "No index path, using empty index.") | 316 log.Infof(c, "No index path, using empty index.") |
| 314 return &logpb.LogIndex{}, nil | 317 return &logpb.LogIndex{}, nil |
| 315 } | 318 } |
| 316 | 319 |
| 317 » r, err := client.NewReader(path, 0, -1) | 320 » // If we have a cache, see if the index is cached. |
| 318 » if err != nil { | 321 » var ( |
| 319 » » log.WithError(err).Errorf(c, "Failed to create index Reader.") | 322 » » indexData []byte |
| 320 » » return nil, errors.Annotate(err).Reason("failed to create index
Reader").Err() | 323 » » cached bool |
| 324 » ) |
| 325 » if cache != nil { |
| 326 » » indexData = getCachedLogIndexData(c, cache, path) |
| 327 » » if indexData != nil { |
| 328 » » » cached = true |
| 329 » » } |
| 321 } | 330 } |
| 322 » defer func() { | 331 |
| 323 » » if err := r.Close(); err != nil { | 332 » if indexData == nil { |
| 324 » » » log.WithError(err).Warningf(c, "Error closing index Read
er.") | 333 » » // No cache, or no cached entry. Load from storage. |
| 334 » » r, err := client.NewReader(path, 0, -1) |
| 335 » » if err != nil { |
| 336 » » » log.WithError(err).Errorf(c, "Failed to create index Rea
der.") |
| 337 » » » return nil, errors.Annotate(err).Reason("failed to creat
e index Reader").Err() |
| 325 } | 338 } |
| 326 » }() | 339 » » defer func() { |
| 327 » indexData, err := ioutil.ReadAll(r) | 340 » » » if err := r.Close(); err != nil { |
| 328 » if err != nil { | 341 » » » » log.WithError(err).Warningf(c, "Error closing in
dex Reader.") |
| 329 » » log.WithError(err).Errorf(c, "Failed to read index.") | 342 » » » } |
| 330 » » return nil, errors.Annotate(err).Reason("failed to read index").
Err() | 343 » » }() |
| 344 |
| 345 » » if indexData, err = ioutil.ReadAll(r); err != nil { |
| 346 » » » log.WithError(err).Errorf(c, "Failed to read index.") |
| 347 » » » return nil, errors.Annotate(err).Reason("failed to read
index").Err() |
| 348 » » } |
| 331 } | 349 } |
| 332 | 350 |
| 333 index := logpb.LogIndex{} | 351 index := logpb.LogIndex{} |
| 334 if err := proto.Unmarshal(indexData, &index); err != nil { | 352 if err := proto.Unmarshal(indexData, &index); err != nil { |
| 335 log.WithError(err).Errorf(c, "Failed to unmarshal index.") | 353 log.WithError(err).Errorf(c, "Failed to unmarshal index.") |
| 336 return nil, errors.Annotate(err).Reason("failed to unmarshal ind
ex").Err() | 354 return nil, errors.Annotate(err).Reason("failed to unmarshal ind
ex").Err() |
| 337 } | 355 } |
| 338 | 356 |
| 357 // If the index is valid, but wasn't cached previously, then cache it. |
| 358 if cache != nil && !cached { |
| 359 putCachedLogIndexData(c, cache, path, indexData) |
| 360 } |
| 361 |
| 339 return &index, nil | 362 return &index, nil |
| 340 } | 363 } |
| 341 | 364 |
| 342 type getStrategy struct { | 365 type getStrategy struct { |
| 343 // startIndex is desired initial log entry index. | 366 // startIndex is desired initial log entry index. |
| 344 startIndex types.MessageIndex | 367 startIndex types.MessageIndex |
| 345 | 368 |
| 346 // startOffset is the beginning byte offset of the log entry stream. Thi
s may | 369 // startOffset is the beginning byte offset of the log entry stream. Thi
s may |
| 347 // be lower than the offset of the starting record if the index is spars
e. | 370 // be lower than the offset of the starting record if the index is spars
e. |
| 348 startOffset uint64 | 371 startOffset uint64 |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 451 ui := uint64(i) | 474 ui := uint64(i) |
| 452 s := sort.Search(len(entries), func(i int) bool { | 475 s := sort.Search(len(entries), func(i int) bool { |
| 453 return entries[i].StreamIndex > ui | 476 return entries[i].StreamIndex > ui |
| 454 }) | 477 }) |
| 455 | 478 |
| 456 // The returned index is the one immediately after the index that we wan
t. If | 479 // The returned index is the one immediately after the index that we wan
t. If |
| 457 // our search returned 0, the first index entry is > our search entry, a
nd we | 480 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 458 // will return nil. | 481 // will return nil. |
| 459 return s - 1 | 482 return s - 1 |
| 460 } | 483 } |
| OLD | NEW |