| Index: server/logdog/storage/archive/storage.go
|
| diff --git a/server/logdog/storage/archive/storage.go b/server/logdog/storage/archive/storage.go
|
| index be5f556f55f3a85f1d1e9f134feac0d4e489b78f..a6ed17e06b696e1a980c549ae42c45e2dbc49ac9 100644
|
| --- a/server/logdog/storage/archive/storage.go
|
| +++ b/server/logdog/storage/archive/storage.go
|
| @@ -54,6 +54,15 @@ type Options struct {
|
| //
|
| // Closing this Storage instance does not close the underlying Client.
|
| Client gs.Client
|
| +
|
| + // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
|
| + // request. This should be set for GAE fetches, as large log streams may
|
| + // exceed the urlfetch system's maximum response size otherwise.
|
| + //
|
| + // This is the number of bytes to request, not the number of bytes of log data
|
| + // to return. The difference is that the former includes the RecordIO frame
|
| + // headers.
|
| + MaxBytes int
|
| }
|
|
|
| type storageImpl struct {
|
| @@ -110,10 +119,13 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
|
| return nil
|
| }
|
|
|
| - r, err := s.Client.NewReader(s.streamPath, gs.Options{
|
| - From: st.from,
|
| - To: st.to,
|
| - })
|
| + offset := int64(st.startOffset)
|
| + log.Fields{
|
| + "offset": offset,
|
| + "length": st.length(),
|
| + "path": s.streamPath,
|
| + }.Debugf(s, "Creating stream reader for range.")
|
| + r, err := s.Client.NewReader(s.streamPath, offset, st.length())
|
| if err != nil {
|
| log.WithError(err).Errorf(s, "Failed to create stream Reader.")
|
| return err
|
| @@ -128,9 +140,9 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
|
|
|
| buf := bytes.Buffer{}
|
| le := logpb.LogEntry{}
|
| - max := req.Limit
|
| + max := st.count
|
| for {
|
| - offset := st.from + cr.Count()
|
| + offset += cr.Count()
|
|
|
| sz, r, err := rio.ReadFrame()
|
| switch err {
|
| @@ -205,9 +217,7 @@ func (s *storageImpl) Tail(path types.StreamPath) ([]byte, types.MessageIndex, e
|
| lle := idx.Entries[len(idx.Entries)-1]
|
|
|
| // Get a Reader for the Tail entry.
|
| - r, err := s.Client.NewReader(s.streamPath, gs.Options{
|
| - From: int64(lle.Offset),
|
| - })
|
| + r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1)
|
| if err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| @@ -237,7 +247,7 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
|
| defer s.indexMu.Unlock()
|
|
|
| if s.index == nil {
|
| - r, err := s.Client.NewReader(s.indexPath, gs.Options{})
|
| + r, err := s.Client.NewReader(s.indexPath, 0, -1)
|
| if err != nil {
|
| log.WithError(err).Errorf(s, "Failed to create index Reader.")
|
| return nil, err
|
| @@ -265,16 +275,41 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
|
| }
|
|
|
| type getStrategy struct {
|
| - // from is the beginning byte offset of the log entry stream.
|
| - from int64
|
| - // to is the ending byte offset of the log entry stream.
|
| - to int64
|
| + // startOffset is the beginning byte offset of the log entry stream.
|
| + startOffset uint64
|
| + // endOffset is the ending byte offset of the log entry stream, or 0 if unset.
|
| + endOffset uint64
|
|
|
| + // count is the maximum number of log entries to fetch; <=0 means unset.
|
| + count int
|
| // lastIndex is the last log entry index in the stream. This will be -1 if
|
| // there are no entries in the stream.
|
| lastIndex types.MessageIndex
|
| }
|
|
|
| +func (gs *getStrategy) length() int64 {
|
| + if gs.startOffset < gs.endOffset {
|
| + return int64(gs.endOffset - gs.startOffset)
|
| + }
|
| + return -1
|
| +}
|
| +
|
| +// setCount sets the `count` field. If called multiple times, the smallest
|
| +// assigned value will be retained.
|
| +func (gs *getStrategy) setCount(v int) {
|
| + if gs.count <= 0 || gs.count > v {
|
| + gs.count = v
|
| + }
|
| +}
|
| +
|
| +// setEndOffset sets the `endOffset` field. If called multiple times, the smallest
|
| +// assigned value will be retained.
|
| +func (gs *getStrategy) setEndOffset(v uint64) {
|
| + if gs.endOffset == 0 || gs.endOffset > v {
|
| + gs.endOffset = v
|
| + }
|
| +}
|
| +
|
| func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
|
| st := getStrategy{}
|
|
|
| @@ -289,17 +324,39 @@ func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn
|
| startIdx = 0
|
| }
|
| le := idx.Entries[startIdx]
|
| - st.from = int64(le.Offset)
|
| + st.startOffset = le.Offset
|
|
|
| - // If we have a limit, and we have enough index entries to upper-bound our
|
| - // stream based on that limit, use that.
|
| + // Determine an upper bound based on our limits.
|
| //
|
| - // Note that this may overshoot if the index and/or stream is sparse. We know
|
| - // for sure that we have one LogEntry per index entry, so that's the best we
|
| - // can do.
|
| + // If we have a count limit, and we have enough index entries to upper-bound
|
| + // our stream based on that limit, use that. Note that this may overshoot if
|
| + // the index and/or stream is sparse. We know for sure that we have one
|
| + // LogEntry per index entry, so that's the best we can do.
|
| if req.Limit > 0 {
|
| if ub := startIdx + req.Limit; ub < len(idx.Entries) {
|
| - st.to = int64(idx.Entries[ub].Offset)
|
| + st.setEndOffset(idx.Entries[ub].Offset)
|
| + }
|
| + st.setCount(req.Limit)
|
| + }
|
| +
|
| + // If we have a byte limit, count the entry sizes until we reach that limit.
|
| + if mb := int64(s.MaxBytes); mb > 0 {
|
| + mb := uint64(mb)
|
| +
|
| + for i, e := range idx.Entries[startIdx:] {
|
| + if e.Offset < st.startOffset {
|
| + // This shouldn't really happen, but it could happen if there is a
|
| + // corrupt index.
|
| + continue
|
| + }
|
| +
|
| + // Calculate the request size and truncate if we've exceeded our maximum
|
| + // request bytes.
|
| + if size := (e.Offset - st.startOffset); size > mb {
|
| + st.setEndOffset(e.Offset)
|
| + st.setCount(i)
|
| + break
|
| + }
|
| }
|
| }
|
| return &st
|
|
|