| Index: logdog/common/storage/archive/storage.go
|
| diff --git a/logdog/common/storage/archive/storage.go b/logdog/common/storage/archive/storage.go
|
| index 6800adfb8bb50d2ce01a0ecff9ec59268e441fc0..d69d728bc803b5df21a047f044fc6fbc88a2c663 100644
|
| --- a/logdog/common/storage/archive/storage.go
|
| +++ b/logdog/common/storage/archive/storage.go
|
| @@ -20,24 +20,28 @@ import (
|
| "sort"
|
| "sync"
|
|
|
| - "golang.org/x/net/context"
|
| -
|
| - "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/data/recordio"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/iotools"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| +
|
| + cloudStorage "cloud.google.com/go/storage"
|
| + "github.com/golang/protobuf/proto"
|
| + "golang.org/x/net/context"
|
| )
|
|
|
| const (
|
| // maxStreamRecordSize is the maximum record size we're willing to read from
|
| // our archived log stream. This will help prevent out-of-memory errors if the
|
| // arhived log stream is malicious or corrupt.
|
| - maxStreamRecordSize = 16 * 1024 * 1024
|
| + //
|
| + // Twice the maximum log entry size leaves headroom for encoding overhead.
|
| + maxStreamRecordSize = 2 * types.MaxLogEntryDataSize
|
| )
|
|
|
| // Options is the set of configuration options for this Storage instance.
|
| @@ -55,15 +59,6 @@ type Options struct {
|
| //
|
| // Closing this Storage instance does not close the underlying Client.
|
| Client gs.Client
|
| -
|
| - // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
|
| - // request. This should be set for GAE fetches, as large log streams may
|
| - // exceed the urlfetch system's maximum response size otherwise.
|
| - //
|
| - // This is the number of bytes to request, not the number of bytes of log data
|
| - // to return. The difference is that the former includes the RecordIO frame
|
| - // headers.
|
| - MaxBytes int
|
| }
|
|
|
| type storageImpl struct {
|
| @@ -73,9 +68,8 @@ type storageImpl struct {
|
| streamPath gs.Path
|
| indexPath gs.Path
|
|
|
| - indexMu sync.Mutex
|
| - index *logpb.LogIndex
|
| - closeClient bool
|
| + indexMu sync.Mutex
|
| + index *logpb.LogIndex
|
| }
|
|
|
| // New instantiates a new Storage instance, bound to the supplied Options.
|
| @@ -91,18 +85,14 @@ func New(ctx context.Context, o Options) (storage.Storage, error) {
|
| if !s.streamPath.IsFullPath() {
|
| return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath)
|
| }
|
| - if !s.indexPath.IsFullPath() {
|
| + if s.indexPath != "" && !s.indexPath.IsFullPath() {
|
| return nil, fmt.Errorf("invalid index URL: %v", s.indexPath)
|
| }
|
|
|
| return &s, nil
|
| }
|
|
|
| -func (s *storageImpl) Close() {
|
| - if s.closeClient {
|
| - _ = s.Client.Close()
|
| - }
|
| -}
|
| +func (s *storageImpl) Close() {}
|
|
|
| func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
|
| func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
|
| @@ -114,133 +104,182 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
|
| }
|
|
|
| // Identify the byte offsets that we want to fetch from the entries stream.
|
| - st := s.buildGetStrategy(&req, idx)
|
| - if st.lastIndex >= 0 && req.Index > st.lastIndex {
|
| - // We know the last index, and the user requested logs past it, so there are
|
| - // no records to read.
|
| + st := buildGetStrategy(&req, idx)
|
| + if st == nil {
|
| + // No more records to read.
|
| return nil
|
| }
|
|
|
| - offset := int64(st.startOffset)
|
| + switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) {
|
| + case nil, io.EOF:
|
| + // We hit the end of our log stream.
|
| + return nil
|
| +
|
| + case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist:
|
| + return storage.ErrDoesNotExist
|
| +
|
| + default:
|
| + return errors.Annotate(err).Reason("failed to read log stream").Err()
|
| + }
|
| +}
|
| +
|
| + // getLogEntriesIter retrieves log entries from archive until complete.
|
| +func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback) error {
|
| + // Compute the byte range of archived log data to fetch, based on our
|
| + // get strategy.
|
| + // Get an archive reader.
|
| + var (
|
| + offset = st.startOffset
|
| + length = st.length()
|
| + )
|
| +
|
| log.Fields{
|
| "offset": offset,
|
| - "length": st.length(),
|
| + "length": length,
|
| "path": s.streamPath,
|
| }.Debugf(s, "Creating stream reader for range.")
|
| - r, err := s.Client.NewReader(s.streamPath, offset, st.length())
|
| + storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), length)
|
| if err != nil {
|
| log.WithError(err).Errorf(s, "Failed to create stream Reader.")
|
| - return err
|
| + return errors.Annotate(err).Reason("failed to create stream Reader").Err()
|
| }
|
| defer func() {
|
| - if err := r.Close(); err != nil {
|
| - log.WithError(err).Warningf(s, "Error closing stream Reader.")
|
| + if tmpErr := storageReader.Close(); tmpErr != nil {
|
| + // (Non-fatal)
|
| + log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.")
|
| }
|
| }()
|
| - cr := iotools.CountingReader{Reader: r}
|
| - rio := recordio.NewReader(&cr, maxStreamRecordSize)
|
|
|
| - buf := bytes.Buffer{}
|
| - le := logpb.LogEntry{}
|
| - max := st.count
|
| + // Count how many bytes we've read.
|
| + cr := iotools.CountingReader{Reader: storageReader}
|
| +
|
| + // Iteratively update our strategy's start offset each time we read a complete
|
| + // frame.
|
| + var (
|
| + rio = recordio.NewReader(&cr, maxStreamRecordSize)
|
| + buf bytes.Buffer
|
| + remaining = st.count
|
| + )
|
| for {
|
| - offset += cr.Count()
|
| + // Reset the count so we know how much we read for this frame.
|
| + cr.Count = 0
|
|
|
| sz, r, err := rio.ReadFrame()
|
| - switch err {
|
| - case nil:
|
| - break
|
| -
|
| - case io.EOF:
|
| - return nil
|
| -
|
| - default:
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "index": idx,
|
| - "offset": offset,
|
| - }.Errorf(s, "Failed to read next frame.")
|
| - return err
|
| + if err != nil {
|
| + return errors.Annotate(err).Reason("failed to read frame").Err()
|
| }
|
|
|
| buf.Reset()
|
| buf.Grow(int(sz))
|
| - if _, err := buf.ReadFrom(r); err != nil {
|
| +
|
| + switch amt, err := buf.ReadFrom(r); {
|
| + case err != nil:
|
| log.Fields{
|
| - log.ErrorKey: err,
|
| - "offset": offset,
|
| - "frameSize": sz,
|
| + log.ErrorKey: err,
|
| + "frameOffset": offset,
|
| + "frameSize": sz,
|
| }.Errorf(s, "Failed to read frame data.")
|
| - return err
|
| + return errors.Annotate(err).Reason("failed to read frame data").Err()
|
| +
|
| + case amt != sz:
|
| + // If we didn't buffer the complete frame, we hit a premature EOF.
|
| + return errors.Annotate(io.EOF).Reason("incomplete frame read").Err()
|
| }
|
|
|
| - if err := proto.Unmarshal(buf.Bytes(), &le); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "offset": offset,
|
| - "frameSize": sz,
|
| - }.Errorf(s, "Failed to unmarshal log data.")
|
| - return err
|
| + // If we read from offset 0, the first frame will be the log stream's
|
| + // descriptor, which we can discard.
|
| + discardFrame := (offset == 0)
|
| + offset += uint64(cr.Count)
|
| + if discardFrame {
|
| + continue
|
| }
|
|
|
| - idx := types.MessageIndex(le.StreamIndex)
|
| - if idx < req.Index {
|
| + // Punt this log entry to our callback, if appropriate.
|
| + entry := storage.MakeEntry(buf.Bytes(), -1)
|
| + switch idx, err := entry.GetStreamIndex(); {
|
| + case err != nil:
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "frameOffset": offset,
|
| + "frameSize": sz,
|
| + }.Errorf(s, "Failed to get log entry index.")
|
| + return errors.Annotate(err).Reason("failed to get log entry index").Err()
|
| +
|
| + case idx < st.startIndex:
|
| // Skip this entry, as it's before the first requested entry.
|
| continue
|
| }
|
|
|
| - d := make([]byte, buf.Len())
|
| - copy(d, buf.Bytes())
|
| - if !cb(idx, d) {
|
| - break
|
| + // We want to punt this entry, but we also want to re-use our Buffer. Clone
|
| + // its data so it is independent.
|
| + entry.D = make([]byte, len(entry.D))
|
| + copy(entry.D, buf.Bytes())
|
| + if !cb(entry) {
|
| + return nil
|
| }
|
|
|
| // Enforce our limit, if one is supplied.
|
| - if max > 0 {
|
| - max--
|
| - if max == 0 {
|
| - break
|
| + if remaining > 0 {
|
| + remaining--
|
| + if remaining == 0 {
|
| + return nil
|
| }
|
| }
|
| }
|
| - return nil
|
| }
|
|
|
| -func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| +func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (*storage.Entry, error) {
|
| idx, err := s.getIndex()
|
| if err != nil {
|
| - return nil, 0, err
|
| + return nil, err
|
| }
|
|
|
| - // Get the offset of the last record.
|
| - if len(idx.Entries) == 0 {
|
| - return nil, 0, nil
|
| + // Get the offset that is as close to our tail record as possible. If we know
|
| + // what that index is (from "idx"), we can request it directly. Otherwise, we
|
| + // will get as close as possible and read forwards from there.
|
| + req := storage.GetRequest{}
|
| + switch {
|
| + case idx.LastStreamIndex > 0:
|
| + req.Index = types.MessageIndex(idx.LastStreamIndex)
|
| + req.Limit = 1
|
| +
|
| + case len(idx.Entries) > 0:
|
| + req.Index = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex)
|
| }
|
| - lle := idx.Entries[len(idx.Entries)-1]
|
|
|
| - // Get a Reader for the Tail entry.
|
| - r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1)
|
| - if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "offset": lle.Offset,
|
| - }.Errorf(s, "Failed to create reader.")
|
| - return nil, 0, err
|
| + // Build a Get strategy for our closest-to-Tail index.
|
| + st := buildGetStrategy(&req, idx)
|
| + if st == nil {
|
| + return nil, storage.ErrDoesNotExist
|
| }
|
| - defer func() {
|
| - if err := r.Close(); err != nil {
|
| - log.WithError(err).Warningf(s, "Failed to close Reader.")
|
| +
|
| + // Read forwards to EOF. Retain the last entry that we read.
|
| + var lastEntry *storage.Entry
|
| + err = s.Get(req, func(e *storage.Entry) bool {
|
| + lastEntry = e
|
| +
|
| + // We can stop if we have the last stream index and this is that index.
|
| + if idx.LastStreamIndex > 0 {
|
| + // Get the index for this entry.
|
| + //
|
| + // We can ignore this error, since "Get" will have already resolved the
|
| + // index successfully.
|
| + if sidx, _ := e.GetStreamIndex(); sidx == types.MessageIndex(idx.LastStreamIndex) {
|
| + return false
|
| + }
|
| }
|
| - }()
|
| + return true
|
| + })
|
| + switch {
|
| + case err != nil:
|
| + return nil, err
|
|
|
| - rio := recordio.NewReader(r, maxStreamRecordSize)
|
| - d, err := rio.ReadFrameAll()
|
| - if err != nil {
|
| - log.WithError(err).Errorf(s, "Failed to read log frame.")
|
| - return nil, 0, err
|
| - }
|
| + case lastEntry == nil:
|
| + return nil, storage.ErrDoesNotExist
|
|
|
| - return d, types.MessageIndex(lle.StreamIndex), nil
|
| + default:
|
| + return lastEntry, nil
|
| + }
|
| }
|
|
|
| // getIndex returns the cached log stream index, fetching it if necessary.
|
| @@ -249,44 +288,71 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
|
| defer s.indexMu.Unlock()
|
|
|
| if s.index == nil {
|
| - r, err := s.Client.NewReader(s.indexPath, 0, -1)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(s, "Failed to create index Reader.")
|
| - return nil, err
|
| - }
|
| - defer func() {
|
| - if err := r.Close(); err != nil {
|
| - log.WithError(err).Warningf(s, "Error closing index Reader.")
|
| - }
|
| - }()
|
| - indexData, err := ioutil.ReadAll(r)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(s, "Failed to read index.")
|
| - return nil, err
|
| - }
|
| + index, err := loadIndex(s, s.Client, s.indexPath)
|
| + switch errors.Unwrap(err) {
|
| + case nil:
|
| + break
|
|
|
| - index := logpb.LogIndex{}
|
| - if err := proto.Unmarshal(indexData, &index); err != nil {
|
| - log.WithError(err).Errorf(s, "Failed to unmarshal index.")
|
| + case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
|
| + // Treat a missing index the same as an empty index.
|
| + log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
|
| + index = &logpb.LogIndex{}
|
| +
|
| + default:
|
| return nil, err
|
| }
|
|
|
| - s.index = &index
|
| + s.index = index
|
| }
|
| return s.index, nil
|
| }
|
|
|
| +func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogIndex, error) {
|
| + // If there is no path, then return an empty index.
|
| + if path == "" {
|
| + log.Infof(c, "No index path, using empty index.")
|
| + return &logpb.LogIndex{}, nil
|
| + }
|
| +
|
| + r, err := client.NewReader(path, 0, -1)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create index Reader.")
|
| + return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
|
| + }
|
| + defer func() {
|
| + if err := r.Close(); err != nil {
|
| + log.WithError(err).Warningf(c, "Error closing index Reader.")
|
| + }
|
| + }()
|
| + indexData, err := ioutil.ReadAll(r)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to read index.")
|
| + return nil, errors.Annotate(err).Reason("failed to read index").Err()
|
| + }
|
| +
|
| + index := logpb.LogIndex{}
|
| + if err := proto.Unmarshal(indexData, &index); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to unmarshal index.")
|
| + return nil, errors.Annotate(err).Reason("failed to unmarshal index").Err()
|
| + }
|
| +
|
| + return &index, nil
|
| +}
|
| +
|
| type getStrategy struct {
|
| - // startOffset is the beginning byte offset of the log entry stream.
|
| + // startIndex is the desired initial log entry index.
|
| + startIndex types.MessageIndex
|
| +
|
| + // startOffset is the beginning byte offset of the log entry stream. This may
|
| + // be lower than the offset of the starting record if the index is sparse.
|
| startOffset uint64
|
| - // endOffset is the ending byte offset of the log entry stream.
|
| + // endOffset is the ending byte offset of the log entry stream. This will be
|
| + // 0 if an end offset is not known.
|
| endOffset uint64
|
|
|
| - // count is the number of log entries that will be fetched.
|
| - count int
|
| - // lastIndex is the last log entry index in the stream. This will be -1 if
|
| - // there are no entries in the stream.
|
| - lastIndex types.MessageIndex
|
| + // count is the number of log entries that will be fetched. If 0, no upper
|
| + // bound was calculated.
|
| + count uint64
|
| }
|
|
|
| func (gs *getStrategy) length() int64 {
|
| @@ -298,74 +364,78 @@ func (gs *getStrategy) length() int64 {
|
|
|
| // setCount sets the `count` field. If called multiple times, the smallest
|
| // assigned value will be retained.
|
| -func (gs *getStrategy) setCount(v int) {
|
| - if gs.count <= 0 || gs.count > v {
|
| +func (gs *getStrategy) setCount(v uint64) {
|
| + if gs.count == 0 || gs.count > v {
|
| gs.count = v
|
| }
|
| }
|
|
|
| -// setEndOffset sets the `length` field. If called multiple times, the smallest
|
| -// assigned value will be retained.
|
| -func (gs *getStrategy) setEndOffset(v uint64) {
|
| - if gs.endOffset == 0 || gs.endOffset > v {
|
| - gs.endOffset = v
|
| - }
|
| -}
|
| -
|
| -func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
|
| +func buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
|
| st := getStrategy{
|
| - lastIndex: -1,
|
| + startIndex: req.Index,
|
| }
|
|
|
| - if len(idx.Entries) == 0 {
|
| - return &st
|
| - }
|
| -
|
| - // If we have a log entry count, mark the last log index.
|
| - if idx.LogEntryCount > 0 {
|
| - st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex)
|
| + // If the user has requested an index past the end of the stream, return no
|
| + // entries (nil strategy). This only works if the last stream index is known.
|
| + if idx.LastStreamIndex > 0 && req.Index > types.MessageIndex(idx.LastStreamIndex) {
|
| + return nil
|
| }
|
|
|
| - startIdx := indexEntryFor(idx.Entries, req.Index)
|
| - if startIdx < 0 {
|
| - startIdx = 0
|
| + // Identify the closest index entry to the requested log.
|
| + //
|
| + // If the requested log starts before the first index entry, we must read from
|
| + // record #0.
|
| + startIndexEntry := indexEntryFor(idx.Entries, req.Index)
|
| + if startIndexEntry >= 0 {
|
| + st.startOffset = idx.Entries[startIndexEntry].Offset
|
| }
|
| - le := idx.Entries[startIdx]
|
| - st.startOffset = le.Offset
|
|
|
| // Determine an upper bound based on our limits.
|
| //
|
| - // If we have a count limit, and we have enough index entries to upper-bound
|
| - // our stream based on that limit, use that. Note that this may overshoot if
|
| - // the index and/or stream is sparse. We know for sure that we have one
|
| - // LogEntry per index entry, so that's the best we can do.
|
| + // If we have a count limit, identify the maximum entry that can be loaded,
|
| + // find the index entry closest to it, and use that to determine our upper
|
| + // bound.
|
| if req.Limit > 0 {
|
| - if ub := startIdx + req.Limit; ub < len(idx.Entries) {
|
| - st.setEndOffset(idx.Entries[ub].Offset)
|
| - }
|
| - st.setCount(req.Limit)
|
| - }
|
| + st.setCount(uint64(req.Limit))
|
| +
|
| + // Find the index entry for the stream entry AFTER the last one we are going
|
| + // to return.
|
| + entryAfterGetBlock := req.Index + types.MessageIndex(req.Limit)
|
| + endIndexEntry := indexEntryFor(idx.Entries, entryAfterGetBlock)
|
| + switch {
|
| + case endIndexEntry < 0:
|
| + // The last possible request log entry is before the first index entry.
|
| + // Read up to the first index entry.
|
| + endIndexEntry = 0
|
| +
|
| + case endIndexEntry <= startIndexEntry:
|
| + // The last possible request log entry is closest to the start index
|
| + // entry. Use the index entry immediately after it.
|
| + endIndexEntry = startIndexEntry + 1
|
|
|
| - // If we have a byte limit, count the entry sizes until we reach that limit.
|
| - if mb := int64(s.MaxBytes); mb > 0 {
|
| - mb := uint64(mb)
|
| -
|
| - for i, e := range idx.Entries[startIdx:] {
|
| - if e.Offset < st.startOffset {
|
| - // This shouldn't really happen, but it could happen if there is a
|
| - // corrupt index.
|
| - continue
|
| + default:
|
| + // We have the index entry <= the stream entry after the last one that we
|
| + // will return.
|
| + //
|
| + // If we're sparse, this could be the index at or before our last entry.
|
| + // If this is the case, use the next index entry, which will be after
|
| + // "entryAfterGetBlock" (EAGB).
|
| + //
|
| + // START ------ LIMIT (LIMIT+1)
|
| + // | [IDX] | [IDX]
|
| + // index | entryAfterGetBlock |
|
| + // endIndexEntry (endIndexEntry+1)
|
| + if types.MessageIndex(idx.Entries[endIndexEntry].StreamIndex) < entryAfterGetBlock {
|
| + endIndexEntry++
|
| }
|
| + }
|
|
|
| - // Calculate the request offset and truncate if we've exceeded our maximum
|
| - // request bytes.
|
| - if size := (e.Offset - st.startOffset); size > mb {
|
| - st.setEndOffset(e.Offset)
|
| - st.setCount(i)
|
| - break
|
| - }
|
| + // If we're pointing to a valid index entry, set our upper bound.
|
| + if endIndexEntry < len(idx.Entries) {
|
| + st.endOffset = idx.Entries[endIndexEntry].Offset
|
| }
|
| }
|
| +
|
| return &st
|
| }
|
|
|
|
|