| Index: logdog/common/storage/archive/storage.go
|
| diff --git a/logdog/common/storage/archive/storage.go b/logdog/common/storage/archive/storage.go
|
| index d69d728bc803b5df21a047f044fc6fbc88a2c663..1c7b9d6208c0e12cf3bc89b38a9c5bcc096f7dbb 100644
|
| --- a/logdog/common/storage/archive/storage.go
|
| +++ b/logdog/common/storage/archive/storage.go
|
| @@ -18,7 +18,7 @@ import (
|
| "io"
|
| "io/ioutil"
|
| "sort"
|
| - "sync"
|
| + "sync/atomic"
|
|
|
| "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/data/recordio"
|
| @@ -28,6 +28,7 @@ import (
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| + "github.com/luci/luci-go/logdog/common/storage/caching"
|
| "github.com/luci/luci-go/logdog/common/types"
|
|
|
| cloudStorage "cloud.google.com/go/storage"
|
| @@ -59,6 +60,9 @@ type Options struct {
|
| //
|
| // Closing this Storage instance does not close the underlying Client.
|
| Client gs.Client
|
| +
|
| + // Cache, if not nil, will be used to cache data.
|
| + Cache caching.Cache
|
| }
|
|
|
| type storageImpl struct {
|
| @@ -68,8 +72,7 @@ type storageImpl struct {
|
| streamPath gs.Path
|
| indexPath gs.Path
|
|
|
| - indexMu sync.Mutex
|
| - index *logpb.LogIndex
|
| + index atomic.Value
|
| }
|
|
|
| // New instantiates a new Storage instance, bound to the supplied Options.
|
| @@ -284,50 +287,65 @@ func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (*
|
|
|
| // getIndex returns the cached log stream index, fetching it if necessary.
|
| func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
|
| - s.indexMu.Lock()
|
| - defer s.indexMu.Unlock()
|
| -
|
| - if s.index == nil {
|
| - index, err := loadIndex(s, s.Client, s.indexPath)
|
| - switch errors.Unwrap(err) {
|
| - case nil:
|
| - break
|
| + idx := s.index.Load()
|
| + if idx != nil {
|
| + return idx.(*logpb.LogIndex), nil
|
| + }
|
|
|
| - case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
|
| - // Treat a missing index the same as an empty index.
|
| - log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
|
| - index = &logpb.LogIndex{}
|
| + index, err := loadIndex(s, s.Client, s.indexPath, s.Cache)
|
| + switch errors.Unwrap(err) {
|
| + case nil:
|
| + break
|
|
|
| - default:
|
| - return nil, err
|
| - }
|
| + case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
|
| + // Treat a missing index the same as an empty index.
|
| + log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
|
| + index = &logpb.LogIndex{}
|
|
|
| - s.index = index
|
| + default:
|
| + return nil, err
|
| }
|
| - return s.index, nil
|
| +
|
| + s.index.Store(index)
|
| + return index, nil
|
| }
|
|
|
| -func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogIndex, error) {
|
| +func loadIndex(c context.Context, client gs.Client, path gs.Path, cache caching.Cache) (*logpb.LogIndex, error) {
|
| // If there is no path, then return an empty index.
|
| if path == "" {
|
| log.Infof(c, "No index path, using empty index.")
|
| return &logpb.LogIndex{}, nil
|
| }
|
|
|
| - r, err := client.NewReader(path, 0, -1)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create index Reader.")
|
| - return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
|
| + // If we have a cache, see if the index is cached.
|
| + var (
|
| + indexData []byte
|
| + cached bool
|
| + )
|
| + if cache != nil {
|
| + indexData = getCachedLogIndexData(c, cache, path)
|
| + if indexData != nil {
|
| + cached = true
|
| + }
|
| }
|
| - defer func() {
|
| - if err := r.Close(); err != nil {
|
| - log.WithError(err).Warningf(c, "Error closing index Reader.")
|
| +
|
| + if indexData == nil {
|
| + // No cache, or no cached entry. Load from storage.
|
| + r, err := client.NewReader(path, 0, -1)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create index Reader.")
|
| + return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
|
| + }
|
| + defer func() {
|
| + if err := r.Close(); err != nil {
|
| + log.WithError(err).Warningf(c, "Error closing index Reader.")
|
| + }
|
| + }()
|
| +
|
| + if indexData, err = ioutil.ReadAll(r); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to read index.")
|
| + return nil, errors.Annotate(err).Reason("failed to read index").Err()
|
| }
|
| - }()
|
| - indexData, err := ioutil.ReadAll(r)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to read index.")
|
| - return nil, errors.Annotate(err).Reason("failed to read index").Err()
|
| }
|
|
|
| index := logpb.LogIndex{}
|
| @@ -336,6 +354,11 @@ func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd
|
| return nil, errors.Annotate(err).Reason("failed to unmarshal index").Err()
|
| }
|
|
|
| + // If the index is valid, but wasn't cached previously, then cache it.
|
| + if cache != nil && !cached {
|
| + putCachedLogIndexData(c, cache, path, indexData)
|
| + }
|
| +
|
| return &index, nil
|
| }
|
|
|
|
|