Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: logdog/common/storage/archive/storage.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/common/storage/archive/cache.go ('k') | logdog/common/storage/archive/storage_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expecations. Namely: 9 // conform to the API expecations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
11 // - Storage methods ignore the supplied Path argument, instead opting for 11 // - Storage methods ignore the supplied Path argument, instead opting for
12 // the archive configured in its Options. 12 // the archive configured in its Options.
13 package archive 13 package archive
14 14
15 import ( 15 import (
16 "bytes" 16 "bytes"
17 "fmt" 17 "fmt"
18 "io" 18 "io"
19 "io/ioutil" 19 "io/ioutil"
20 "sort" 20 "sort"
21 » "sync" 21 » "sync/atomic"
22 22
23 "github.com/luci/luci-go/common/config" 23 "github.com/luci/luci-go/common/config"
24 "github.com/luci/luci-go/common/data/recordio" 24 "github.com/luci/luci-go/common/data/recordio"
25 "github.com/luci/luci-go/common/errors" 25 "github.com/luci/luci-go/common/errors"
26 "github.com/luci/luci-go/common/gcloud/gs" 26 "github.com/luci/luci-go/common/gcloud/gs"
27 "github.com/luci/luci-go/common/iotools" 27 "github.com/luci/luci-go/common/iotools"
28 log "github.com/luci/luci-go/common/logging" 28 log "github.com/luci/luci-go/common/logging"
29 "github.com/luci/luci-go/logdog/api/logpb" 29 "github.com/luci/luci-go/logdog/api/logpb"
30 "github.com/luci/luci-go/logdog/common/storage" 30 "github.com/luci/luci-go/logdog/common/storage"
31 "github.com/luci/luci-go/logdog/common/storage/caching"
31 "github.com/luci/luci-go/logdog/common/types" 32 "github.com/luci/luci-go/logdog/common/types"
32 33
33 cloudStorage "cloud.google.com/go/storage" 34 cloudStorage "cloud.google.com/go/storage"
34 "github.com/golang/protobuf/proto" 35 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/context" 36 "golang.org/x/net/context"
36 ) 37 )
37 38
38 const ( 39 const (
39 // maxStreamRecordSize is the maximum record size we're willing to read from 40 // maxStreamRecordSize is the maximum record size we're willing to read from
40 // our archived log stream. This will help prevent out-of-memory errors if the 41 // our archived log stream. This will help prevent out-of-memory errors if the
(...skipping 11 matching lines...) Expand all
52 type Options struct { 53 type Options struct {
53 // IndexURL is the Google Storage URL for the stream's index. 54 // IndexURL is the Google Storage URL for the stream's index.
54 IndexURL string 55 IndexURL string
55 // StreamURL is the Google Storage URL for the stream's entries. 56 // StreamURL is the Google Storage URL for the stream's entries.
56 StreamURL string 57 StreamURL string
57 58
58 // Client is the HTTP client to use for authentication. 59 // Client is the HTTP client to use for authentication.
59 // 60 //
60 // Closing this Storage instance does not close the underlying Client. 61 // Closing this Storage instance does not close the underlying Client.
61 Client gs.Client 62 Client gs.Client
63
64 // Cache, if not nil, will be used to cache data.
65 Cache caching.Cache
62 } 66 }
63 67
64 type storageImpl struct { 68 type storageImpl struct {
65 *Options 69 *Options
66 context.Context 70 context.Context
67 71
68 streamPath gs.Path 72 streamPath gs.Path
69 indexPath gs.Path 73 indexPath gs.Path
70 74
71 » indexMu sync.Mutex 75 » index atomic.Value
72 » index *logpb.LogIndex
73 } 76 }
74 77
75 // New instantiates a new Storage instance, bound to the supplied Options. 78 // New instantiates a new Storage instance, bound to the supplied Options.
76 func New(ctx context.Context, o Options) (storage.Storage, error) { 79 func New(ctx context.Context, o Options) (storage.Storage, error) {
77 s := storageImpl{ 80 s := storageImpl{
78 Options: &o, 81 Options: &o,
79 Context: ctx, 82 Context: ctx,
80 83
81 streamPath: gs.Path(o.StreamURL), 84 streamPath: gs.Path(o.StreamURL),
82 indexPath: gs.Path(o.IndexURL), 85 indexPath: gs.Path(o.IndexURL),
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 case lastEntry == nil: 280 case lastEntry == nil:
278 return nil, storage.ErrDoesNotExist 281 return nil, storage.ErrDoesNotExist
279 282
280 default: 283 default:
281 return lastEntry, nil 284 return lastEntry, nil
282 } 285 }
283 } 286 }
284 287
285 // getIndex returns the cached log stream index, fetching it if necessary. 288 // getIndex returns the cached log stream index, fetching it if necessary.
286 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { 289 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
287 » s.indexMu.Lock() 290 » idx := s.index.Load()
288 » defer s.indexMu.Unlock() 291 » if idx != nil {
292 » » return idx.(*logpb.LogIndex), nil
293 » }
289 294
290 » if s.index == nil { 295 » index, err := loadIndex(s, s.Client, s.indexPath, s.Cache)
291 » » index, err := loadIndex(s, s.Client, s.indexPath) 296 » switch errors.Unwrap(err) {
292 » » switch errors.Unwrap(err) { 297 » case nil:
293 » » case nil: 298 » » break
294 » » » break
295 299
296 » » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotEx ist: 300 » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
297 » » » // Treat a missing index the same as an empty index. 301 » » // Treat a missing index the same as an empty index.
298 » » » log.WithError(err).Warningf(s, "Index is invalid, using empty index.") 302 » » log.WithError(err).Warningf(s, "Index is invalid, using empty in dex.")
299 » » » index = &logpb.LogIndex{} 303 » » index = &logpb.LogIndex{}
300 304
301 » » default: 305 » default:
302 » » » return nil, err 306 » » return nil, err
303 » » } 307 » }
304 308
305 » » s.index = index 309 » s.index.Store(index)
306 » } 310 » return index, nil
307 » return s.index, nil
308 } 311 }
309 312
310 func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd ex, error) { 313 func loadIndex(c context.Context, client gs.Client, path gs.Path, cache caching. Cache) (*logpb.LogIndex, error) {
311 // If there is no path, then return an empty index. 314 // If there is no path, then return an empty index.
312 if path == "" { 315 if path == "" {
313 log.Infof(c, "No index path, using empty index.") 316 log.Infof(c, "No index path, using empty index.")
314 return &logpb.LogIndex{}, nil 317 return &logpb.LogIndex{}, nil
315 } 318 }
316 319
317 » r, err := client.NewReader(path, 0, -1) 320 » // If we have a cache, see if the index is cached.
318 » if err != nil { 321 » var (
319 » » log.WithError(err).Errorf(c, "Failed to create index Reader.") 322 » » indexData []byte
320 » » return nil, errors.Annotate(err).Reason("failed to create index Reader").Err() 323 » » cached bool
324 » )
325 » if cache != nil {
326 » » indexData = getCachedLogIndexData(c, cache, path)
327 » » if indexData != nil {
328 » » » cached = true
329 » » }
321 } 330 }
322 » defer func() { 331
323 » » if err := r.Close(); err != nil { 332 » if indexData == nil {
324 » » » log.WithError(err).Warningf(c, "Error closing index Read er.") 333 » » // No cache, or no cached entry. Load from storage.
334 » » r, err := client.NewReader(path, 0, -1)
335 » » if err != nil {
336 » » » log.WithError(err).Errorf(c, "Failed to create index Rea der.")
337 » » » return nil, errors.Annotate(err).Reason("failed to creat e index Reader").Err()
325 } 338 }
326 » }() 339 » » defer func() {
327 » indexData, err := ioutil.ReadAll(r) 340 » » » if err := r.Close(); err != nil {
328 » if err != nil { 341 » » » » log.WithError(err).Warningf(c, "Error closing in dex Reader.")
329 » » log.WithError(err).Errorf(c, "Failed to read index.") 342 » » » }
330 » » return nil, errors.Annotate(err).Reason("failed to read index"). Err() 343 » » }()
344
345 » » if indexData, err = ioutil.ReadAll(r); err != nil {
346 » » » log.WithError(err).Errorf(c, "Failed to read index.")
347 » » » return nil, errors.Annotate(err).Reason("failed to read index").Err()
348 » » }
331 } 349 }
332 350
333 index := logpb.LogIndex{} 351 index := logpb.LogIndex{}
334 if err := proto.Unmarshal(indexData, &index); err != nil { 352 if err := proto.Unmarshal(indexData, &index); err != nil {
335 log.WithError(err).Errorf(c, "Failed to unmarshal index.") 353 log.WithError(err).Errorf(c, "Failed to unmarshal index.")
336 return nil, errors.Annotate(err).Reason("failed to unmarshal ind ex").Err() 354 return nil, errors.Annotate(err).Reason("failed to unmarshal ind ex").Err()
337 } 355 }
338 356
357 // If the index is valid, but wasn't cached previously, then cache it.
358 if cache != nil && !cached {
359 putCachedLogIndexData(c, cache, path, indexData)
360 }
361
339 return &index, nil 362 return &index, nil
340 } 363 }
341 364
342 type getStrategy struct { 365 type getStrategy struct {
343 // startIndex is desired initial log entry index. 366 // startIndex is desired initial log entry index.
344 startIndex types.MessageIndex 367 startIndex types.MessageIndex
345 368
346 // startOffset is the beginning byte offset of the log entry stream. Thi s may 369 // startOffset is the beginning byte offset of the log entry stream. Thi s may
347 // be lower than the offset of the starting record if the index is spars e. 370 // be lower than the offset of the starting record if the index is spars e.
348 startOffset uint64 371 startOffset uint64
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
451 ui := uint64(i) 474 ui := uint64(i)
452 s := sort.Search(len(entries), func(i int) bool { 475 s := sort.Search(len(entries), func(i int) bool {
453 return entries[i].StreamIndex > ui 476 return entries[i].StreamIndex > ui
454 }) 477 })
455 478
456 // The returned index is the one immediately after the index that we wan t. If 479 // The returned index is the one immediately after the index that we wan t. If
457 // our search returned 0, the first index entry is > our search entry, a nd we 480 // our search returned 0, the first index entry is > our search entry, a nd we
458 // will return nil. 481 // will return nil.
459 return s - 1 482 return s - 1
460 } 483 }
OLDNEW
« no previous file with comments | « logdog/common/storage/archive/cache.go ('k') | logdog/common/storage/archive/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698