Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Unified Diff: logdog/common/storage/archive/storage.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/common/storage/archive/storage.go
diff --git a/logdog/common/storage/archive/storage.go b/logdog/common/storage/archive/storage.go
index 6800adfb8bb50d2ce01a0ecff9ec59268e441fc0..d69d728bc803b5df21a047f044fc6fbc88a2c663 100644
--- a/logdog/common/storage/archive/storage.go
+++ b/logdog/common/storage/archive/storage.go
@@ -20,24 +20,28 @@ import (
"sort"
"sync"
- "golang.org/x/net/context"
-
- "github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/data/recordio"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/iotools"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/common/storage"
"github.com/luci/luci-go/logdog/common/types"
+
+ cloudStorage "cloud.google.com/go/storage"
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
)
const (
// maxStreamRecordSize is the maximum record size we're willing to read from
// our archived log stream. This will help prevent out-of-memory errors if the
// archived log stream is malicious or corrupt.
- maxStreamRecordSize = 16 * 1024 * 1024
+ //
+ // 16MB is larger than the maximum log entry size, so any valid record will fit.
+ maxStreamRecordSize = 2 * types.MaxLogEntryDataSize
)
// Options is the set of configuration options for this Storage instance.
@@ -55,15 +59,6 @@ type Options struct {
//
// Closing this Storage instance does not close the underlying Client.
Client gs.Client
-
- // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
- // request. This should be set for GAE fetches, as large log streams may
- // exceed the urlfetch system's maximum response size otherwise.
- //
- // This is the number of bytes to request, not the number of bytes of log data
- // to return. The difference is that the former includes the RecordIO frame
- // headers.
- MaxBytes int
}
type storageImpl struct {
@@ -73,9 +68,8 @@ type storageImpl struct {
streamPath gs.Path
indexPath gs.Path
- indexMu sync.Mutex
- index *logpb.LogIndex
- closeClient bool
+ indexMu sync.Mutex
+ index *logpb.LogIndex
}
// New instantiates a new Storage instance, bound to the supplied Options.
@@ -91,18 +85,14 @@ func New(ctx context.Context, o Options) (storage.Storage, error) {
if !s.streamPath.IsFullPath() {
return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath)
}
- if !s.indexPath.IsFullPath() {
+ if s.indexPath != "" && !s.indexPath.IsFullPath() {
return nil, fmt.Errorf("invalid index URL: %v", s.indexPath)
}
return &s, nil
}
-func (s *storageImpl) Close() {
- if s.closeClient {
- _ = s.Client.Close()
- }
-}
+func (s *storageImpl) Close() {}
func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
@@ -114,133 +104,182 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
}
// Identify the byte offsets that we want to fetch from the entries stream.
- st := s.buildGetStrategy(&req, idx)
- if st.lastIndex >= 0 && req.Index > st.lastIndex {
- // We know the last index, and the user requested logs past it, so there are
- // no records to read.
+ st := buildGetStrategy(&req, idx)
+ if st == nil {
+ // No more records to read.
return nil
}
- offset := int64(st.startOffset)
+ switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) {
+ case nil, io.EOF:
+ // We hit the end of our log stream.
+ return nil
+
+ case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist:
+ return storage.ErrDoesNotExist
+
+ default:
+ return errors.Annotate(err).Reason("failed to read log stream").Err()
+ }
+}
+
+// getLogEntriesIter retrieves log entries from the archive until complete.
+func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback) error {
+ // Determine the byte offset and length of the archive range to read, then
+ // get an archive reader for that range.
+ var (
+ offset = st.startOffset
+ length = st.length()
+ )
+
log.Fields{
"offset": offset,
- "length": st.length(),
+ "length": length,
"path": s.streamPath,
}.Debugf(s, "Creating stream reader for range.")
- r, err := s.Client.NewReader(s.streamPath, offset, st.length())
+ storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), length)
if err != nil {
log.WithError(err).Errorf(s, "Failed to create stream Reader.")
- return err
+ return errors.Annotate(err).Reason("failed to create stream Reader").Err()
}
defer func() {
- if err := r.Close(); err != nil {
- log.WithError(err).Warningf(s, "Error closing stream Reader.")
+ if tmpErr := storageReader.Close(); tmpErr != nil {
+ // (Non-fatal)
+ log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.")
}
}()
- cr := iotools.CountingReader{Reader: r}
- rio := recordio.NewReader(&cr, maxStreamRecordSize)
- buf := bytes.Buffer{}
- le := logpb.LogEntry{}
- max := st.count
+ // Count how many bytes we've read.
+ cr := iotools.CountingReader{Reader: storageReader}
+
+ // Iteratively update our strategy's start offset each time we read a complete
+ // frame.
+ var (
+ rio = recordio.NewReader(&cr, maxStreamRecordSize)
+ buf bytes.Buffer
+ remaining = st.count
+ )
for {
- offset += cr.Count()
+ // Reset the count so we know how much we read for this frame.
+ cr.Count = 0
sz, r, err := rio.ReadFrame()
- switch err {
- case nil:
- break
-
- case io.EOF:
- return nil
-
- default:
- log.Fields{
- log.ErrorKey: err,
- "index": idx,
- "offset": offset,
- }.Errorf(s, "Failed to read next frame.")
- return err
+ if err != nil {
+ return errors.Annotate(err).Reason("failed to read frame").Err()
}
buf.Reset()
buf.Grow(int(sz))
- if _, err := buf.ReadFrom(r); err != nil {
+
+ switch amt, err := buf.ReadFrom(r); {
+ case err != nil:
log.Fields{
- log.ErrorKey: err,
- "offset": offset,
- "frameSize": sz,
+ log.ErrorKey: err,
+ "frameOffset": offset,
+ "frameSize": sz,
}.Errorf(s, "Failed to read frame data.")
- return err
+ return errors.Annotate(err).Reason("failed to read frame data").Err()
+
+ case amt != sz:
+ // If we didn't buffer the complete frame, we hit a premature EOF.
+ return errors.Annotate(io.EOF).Reason("incomplete frame read").Err()
}
- if err := proto.Unmarshal(buf.Bytes(), &le); err != nil {
- log.Fields{
- log.ErrorKey: err,
- "offset": offset,
- "frameSize": sz,
- }.Errorf(s, "Failed to unmarshal log data.")
- return err
+ // If we read from offset 0, the first frame will be the log stream's
+ // descriptor, which we can discard.
+ discardFrame := (offset == 0)
+ offset += uint64(cr.Count)
+ if discardFrame {
+ continue
}
- idx := types.MessageIndex(le.StreamIndex)
- if idx < req.Index {
+ // Punt this log entry to our callback, if appropriate.
+ entry := storage.MakeEntry(buf.Bytes(), -1)
+ switch idx, err := entry.GetStreamIndex(); {
+ case err != nil:
+ log.Fields{
+ log.ErrorKey: err,
+ "frameOffset": offset,
+ "frameSize": sz,
+ }.Errorf(s, "Failed to get log entry index.")
+ return errors.Annotate(err).Reason("failed to get log entry index").Err()
+
+ case idx < st.startIndex:
// Skip this entry, as it's before the first requested entry.
continue
}
- d := make([]byte, buf.Len())
- copy(d, buf.Bytes())
- if !cb(idx, d) {
- break
+ // We want to punt this entry, but we also want to re-use our Buffer. Clone
+ // its data so it is independent.
+ entry.D = make([]byte, len(entry.D))
+ copy(entry.D, buf.Bytes())
+ if !cb(entry) {
+ return nil
}
// Enforce our limit, if one is supplied.
- if max > 0 {
- max--
- if max == 0 {
- break
+ if remaining > 0 {
+ remaining--
+ if remaining == 0 {
+ return nil
}
}
}
- return nil
}
-func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) {
+func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (*storage.Entry, error) {
idx, err := s.getIndex()
if err != nil {
- return nil, 0, err
+ return nil, err
}
- // Get the offset of the last record.
- if len(idx.Entries) == 0 {
- return nil, 0, nil
+ // Get the offset that is as close to our tail record as possible. If we know
+ // what that index is (from "idx"), we can request it directly. Otherwise, we
+ // will get as close as possible and read forwards from there.
+ req := storage.GetRequest{}
+ switch {
+ case idx.LastStreamIndex > 0:
+ req.Index = types.MessageIndex(idx.LastStreamIndex)
+ req.Limit = 1
+
+ case len(idx.Entries) > 0:
+ req.Index = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex)
}
- lle := idx.Entries[len(idx.Entries)-1]
- // Get a Reader for the Tail entry.
- r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1)
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "offset": lle.Offset,
- }.Errorf(s, "Failed to create reader.")
- return nil, 0, err
+ // Build a Get strategy for our closest-to-Tail index.
+ st := buildGetStrategy(&req, idx)
+ if st == nil {
+ return nil, storage.ErrDoesNotExist
}
- defer func() {
- if err := r.Close(); err != nil {
- log.WithError(err).Warningf(s, "Failed to close Reader.")
+
+ // Read forwards to EOF. Retain the last entry that we read.
+ var lastEntry *storage.Entry
+ err = s.Get(req, func(e *storage.Entry) bool {
+ lastEntry = e
+
+ // We can stop if we have the last stream index and this is that index.
+ if idx.LastStreamIndex > 0 {
+ // Get the index for this entry.
+ //
+ // We can ignore this error, since "Get" will have already resolved the
+ // index successfully.
+ if sidx, _ := e.GetStreamIndex(); sidx == types.MessageIndex(idx.LastStreamIndex) {
+ return false
+ }
}
- }()
+ return true
+ })
+ switch {
+ case err != nil:
+ return nil, err
- rio := recordio.NewReader(r, maxStreamRecordSize)
- d, err := rio.ReadFrameAll()
- if err != nil {
- log.WithError(err).Errorf(s, "Failed to read log frame.")
- return nil, 0, err
- }
+ case lastEntry == nil:
+ return nil, storage.ErrDoesNotExist
- return d, types.MessageIndex(lle.StreamIndex), nil
+ default:
+ return lastEntry, nil
+ }
}
// getIndex returns the cached log stream index, fetching it if necessary.
@@ -249,44 +288,71 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
defer s.indexMu.Unlock()
if s.index == nil {
- r, err := s.Client.NewReader(s.indexPath, 0, -1)
- if err != nil {
- log.WithError(err).Errorf(s, "Failed to create index Reader.")
- return nil, err
- }
- defer func() {
- if err := r.Close(); err != nil {
- log.WithError(err).Warningf(s, "Error closing index Reader.")
- }
- }()
- indexData, err := ioutil.ReadAll(r)
- if err != nil {
- log.WithError(err).Errorf(s, "Failed to read index.")
- return nil, err
- }
+ index, err := loadIndex(s, s.Client, s.indexPath)
+ switch errors.Unwrap(err) {
+ case nil:
+ break
- index := logpb.LogIndex{}
- if err := proto.Unmarshal(indexData, &index); err != nil {
- log.WithError(err).Errorf(s, "Failed to unmarshal index.")
+ case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
+ // Treat a missing index the same as an empty index.
+ log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
+ index = &logpb.LogIndex{}
+
+ default:
return nil, err
}
- s.index = &index
+ s.index = index
}
return s.index, nil
}
+func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogIndex, error) {
+ // If there is no path, then return an empty index.
+ if path == "" {
+ log.Infof(c, "No index path, using empty index.")
+ return &logpb.LogIndex{}, nil
+ }
+
+ r, err := client.NewReader(path, 0, -1)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create index Reader.")
+ return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
+ }
+ defer func() {
+ if err := r.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Error closing index Reader.")
+ }
+ }()
+ indexData, err := ioutil.ReadAll(r)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to read index.")
+ return nil, errors.Annotate(err).Reason("failed to read index").Err()
+ }
+
+ index := logpb.LogIndex{}
+ if err := proto.Unmarshal(indexData, &index); err != nil {
+ log.WithError(err).Errorf(c, "Failed to unmarshal index.")
+ return nil, errors.Annotate(err).Reason("failed to unmarshal index").Err()
+ }
+
+ return &index, nil
+}
+
type getStrategy struct {
- // startOffset is the beginning byte offset of the log entry stream.
+ // startIndex is the desired initial log entry index.
+ startIndex types.MessageIndex
+
+ // startOffset is the beginning byte offset of the log entry stream. This may
+ // be lower than the offset of the starting record if the index is sparse.
startOffset uint64
- // endOffset is the ending byte offset of the log entry stream.
+ // endOffset is the ending byte offset of the log entry stream. This will be
+ // 0 if an end offset is not known.
endOffset uint64
- // count is the number of log entries that will be fetched.
- count int
- // lastIndex is the last log entry index in the stream. This will be -1 if
- // there are no entries in the stream.
- lastIndex types.MessageIndex
+ // count is the number of log entries that will be fetched. If 0, no upper
+ // bound was calculated.
+ count uint64
}
func (gs *getStrategy) length() int64 {
@@ -298,74 +364,78 @@ func (gs *getStrategy) length() int64 {
// setCount sets the `count` field. If called multiple times, the smallest
// assigned value will be retained.
-func (gs *getStrategy) setCount(v int) {
- if gs.count <= 0 || gs.count > v {
+func (gs *getStrategy) setCount(v uint64) {
+ if gs.count == 0 || gs.count > v {
gs.count = v
}
}
-// setEndOffset sets the `length` field. If called multiple times, the smallest
-// assigned value will be retained.
-func (gs *getStrategy) setEndOffset(v uint64) {
- if gs.endOffset == 0 || gs.endOffset > v {
- gs.endOffset = v
- }
-}
-
-func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
+func buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
st := getStrategy{
- lastIndex: -1,
+ startIndex: req.Index,
}
- if len(idx.Entries) == 0 {
- return &st
- }
-
- // If we have a log entry count, mark the last log index.
- if idx.LogEntryCount > 0 {
- st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex)
+ // If the user has requested an index past the end of the stream, return no
+ // entries (count == 0). This only works if the last stream index is known.
+ if idx.LastStreamIndex > 0 && req.Index > types.MessageIndex(idx.LastStreamIndex) {
+ return nil
}
- startIdx := indexEntryFor(idx.Entries, req.Index)
- if startIdx < 0 {
- startIdx = 0
+ // Identify the closest index entry to the requested log.
+ //
+ // If the requested log starts before the first index entry, we must read from
+ // record #0.
+ startIndexEntry := indexEntryFor(idx.Entries, req.Index)
+ if startIndexEntry >= 0 {
+ st.startOffset = idx.Entries[startIndexEntry].Offset
}
- le := idx.Entries[startIdx]
- st.startOffset = le.Offset
// Determine an upper bound based on our limits.
//
- // If we have a count limit, and we have enough index entries to upper-bound
- // our stream based on that limit, use that. Note that this may overshoot if
- // the index and/or stream is sparse. We know for sure that we have one
- // LogEntry per index entry, so that's the best we can do.
+ // If we have a count limit, identify the maximum entry that can be loaded,
+ // find the index entry closest to it, and use that to determine our upper
+ // bound.
if req.Limit > 0 {
- if ub := startIdx + req.Limit; ub < len(idx.Entries) {
- st.setEndOffset(idx.Entries[ub].Offset)
- }
- st.setCount(req.Limit)
- }
+ st.setCount(uint64(req.Limit))
+
+ // Find the index entry for the stream entry AFTER the last one we are going
+ // to return.
+ entryAfterGetBlock := req.Index + types.MessageIndex(req.Limit)
+ endIndexEntry := indexEntryFor(idx.Entries, entryAfterGetBlock)
+ switch {
+ case endIndexEntry < 0:
+ // The last possible requested log entry is before the first index entry.
+ // Read up to the first index entry.
+ endIndexEntry = 0
+
+ case endIndexEntry <= startIndexEntry:
+ // The last possible requested log entry is closest to the start index
+ // entry. Use the index entry immediately after it.
+ endIndexEntry = startIndexEntry + 1
- // If we have a byte limit, count the entry sizes until we reach that limit.
- if mb := int64(s.MaxBytes); mb > 0 {
- mb := uint64(mb)
-
- for i, e := range idx.Entries[startIdx:] {
- if e.Offset < st.startOffset {
- // This shouldn't really happen, but it could happen if there is a
- // corrupt index.
- continue
+ default:
+ // We have the index entry <= the stream entry after the last one that we
+ // will return.
+ //
+ // If we're sparse, this could be the index at or before our last entry.
+ // If this is the case, use the next index entry, which will be after
+ // "entryAfterGetBlock" (EAGB).
+ //
+ // START ------ LIMIT (LIMIT+1)
+ // | [IDX] | [IDX]
+ // index | entryAfterGetBlock |
+ // endIndexEntry (endIndexEntry+1)
+ if types.MessageIndex(idx.Entries[endIndexEntry].StreamIndex) < entryAfterGetBlock {
+ endIndexEntry++
}
+ }
- // Calculate the request offset and truncate if we've exceeded our maximum
- // request bytes.
- if size := (e.Offset - st.startOffset); size > mb {
- st.setEndOffset(e.Offset)
- st.setCount(i)
- break
- }
+ // If we're pointing to a valid index entry, set our upper bound.
+ if endIndexEntry < len(idx.Entries) {
+ st.endOffset = idx.Entries[endIndexEntry].Offset
}
}
+
return &st
}

Powered by Google App Engine
This is Rietveld 408576698