Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Unified Diff: logdog/common/storage/archive/storage.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/common/storage/archive/cache.go ('k') | logdog/common/storage/archive/storage_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/common/storage/archive/storage.go
diff --git a/logdog/common/storage/archive/storage.go b/logdog/common/storage/archive/storage.go
index d69d728bc803b5df21a047f044fc6fbc88a2c663..1c7b9d6208c0e12cf3bc89b38a9c5bcc096f7dbb 100644
--- a/logdog/common/storage/archive/storage.go
+++ b/logdog/common/storage/archive/storage.go
@@ -18,7 +18,7 @@ import (
"io"
"io/ioutil"
"sort"
- "sync"
+ "sync/atomic"
"github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/data/recordio"
@@ -28,6 +28,7 @@ import (
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/common/storage"
+ "github.com/luci/luci-go/logdog/common/storage/caching"
"github.com/luci/luci-go/logdog/common/types"
cloudStorage "cloud.google.com/go/storage"
@@ -59,6 +60,9 @@ type Options struct {
//
// Closing this Storage instance does not close the underlying Client.
Client gs.Client
+
+ // Cache, if not nil, will be used to cache data.
+ Cache caching.Cache
}
type storageImpl struct {
@@ -68,8 +72,7 @@ type storageImpl struct {
streamPath gs.Path
indexPath gs.Path
- indexMu sync.Mutex
- index *logpb.LogIndex
+ index atomic.Value
}
// New instantiates a new Storage instance, bound to the supplied Options.
@@ -284,50 +287,65 @@ func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (*
// getIndex returns the cached log stream index, fetching it if necessary.
func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
- s.indexMu.Lock()
- defer s.indexMu.Unlock()
-
- if s.index == nil {
- index, err := loadIndex(s, s.Client, s.indexPath)
- switch errors.Unwrap(err) {
- case nil:
- break
+ idx := s.index.Load()
+ if idx != nil {
+ return idx.(*logpb.LogIndex), nil
+ }
- case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
- // Treat a missing index the same as an empty index.
- log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
- index = &logpb.LogIndex{}
+ index, err := loadIndex(s, s.Client, s.indexPath, s.Cache)
+ switch errors.Unwrap(err) {
+ case nil:
+ break
- default:
- return nil, err
- }
+ case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
+ // Treat a missing index the same as an empty index.
+ log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
+ index = &logpb.LogIndex{}
- s.index = index
+ default:
+ return nil, err
}
- return s.index, nil
+
+ s.index.Store(index)
+ return index, nil
}
-func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogIndex, error) {
+func loadIndex(c context.Context, client gs.Client, path gs.Path, cache caching.Cache) (*logpb.LogIndex, error) {
// If there is no path, then return an empty index.
if path == "" {
log.Infof(c, "No index path, using empty index.")
return &logpb.LogIndex{}, nil
}
- r, err := client.NewReader(path, 0, -1)
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create index Reader.")
- return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
+ // If we have a cache, see if the index is cached.
+ var (
+ indexData []byte
+ cached bool
+ )
+ if cache != nil {
+ indexData = getCachedLogIndexData(c, cache, path)
+ if indexData != nil {
+ cached = true
+ }
}
- defer func() {
- if err := r.Close(); err != nil {
- log.WithError(err).Warningf(c, "Error closing index Reader.")
+
+ if indexData == nil {
+ // No cache, or no cached entry. Load from storage.
+ r, err := client.NewReader(path, 0, -1)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create index Reader.")
+ return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
+ }
+ defer func() {
+ if err := r.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Error closing index Reader.")
+ }
+ }()
+
+ if indexData, err = ioutil.ReadAll(r); err != nil {
+ log.WithError(err).Errorf(c, "Failed to read index.")
+ return nil, errors.Annotate(err).Reason("failed to read index").Err()
}
- }()
- indexData, err := ioutil.ReadAll(r)
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to read index.")
- return nil, errors.Annotate(err).Reason("failed to read index").Err()
}
index := logpb.LogIndex{}
@@ -336,6 +354,11 @@ func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd
return nil, errors.Annotate(err).Reason("failed to unmarshal index").Err()
}
+ // If the index is valid, but wasn't cached previously, then cache it.
+ if cache != nil && !cached {
+ putCachedLogIndexData(c, cache, path, indexData)
+ }
+
return &index, nil
}
« no previous file with comments | « logdog/common/storage/archive/cache.go ('k') | logdog/common/storage/archive/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698