Chromium Code Reviews| Index: server/logdog/storage/archive/storage.go |
| diff --git a/server/logdog/storage/archive/storage.go b/server/logdog/storage/archive/storage.go |
| index be5f556f55f3a85f1d1e9f134feac0d4e489b78f..cfd018bf4c808e55e6c04ac24ba2eb7ec8618c85 100644 |
| --- a/server/logdog/storage/archive/storage.go |
| +++ b/server/logdog/storage/archive/storage.go |
| @@ -54,6 +54,15 @@ type Options struct { |
| // |
| // Closing this Storage instance does not close the underlying Client. |
| Client gs.Client |
| + |
| + // MaxBytes, if >0, is the maximum number of bytes to fetch in any given |
| + // request. This should be set for GAE fetches, as large log streams may |
| + // exceed the urlfetch system's maximum response size otherwise. |
| + // |
| + // This is the number of bytes to request, not the number of bytes of log data |
| + // to return. The difference is that the former includes the RecordIO frame |
| + // headers. |
| + MaxBytes int |
| } |
| type storageImpl struct { |
| @@ -110,10 +119,13 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error |
| return nil |
| } |
| - r, err := s.Client.NewReader(s.streamPath, gs.Options{ |
| - From: st.from, |
| - To: st.to, |
| - }) |
| + offset := int64(st.startOffset) |
|
nodir
2016/04/19 23:41:17
st.offset()?
inline it to be consistent with lengt
dnj
2016/04/20 00:45:07
length is calculated, offset is not. There's no re
nodir
2016/04/20 16:42:57
make a function? It is already made. Otherwise why
dnj
2016/04/20 17:07:25
st.startOffset is not a struct member variable, no
dnj
2016/04/20 17:08:41
This should read, "st.startOffset *is* a struct me
nodir
2016/04/20 17:09:48
st variable is of type *getStrategy
*getStrategy h
dnj
2016/04/20 17:16:14
Ah I see, I didn't realize that was there. This is
|
| + log.Fields{ |
| + "offset": offset, |
| + "length": st.length(), |
| + "path": s.streamPath, |
| + }.Debugf(s, "Creating stream reader for range.") |
| + r, err := s.Client.NewReader(s.streamPath, offset, st.length()) |
| if err != nil { |
| log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| return err |
| @@ -128,9 +140,9 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error |
| buf := bytes.Buffer{} |
| le := logpb.LogEntry{} |
| - max := req.Limit |
| + max := st.count |
| for { |
| - offset := st.from + cr.Count() |
| + offset += cr.Count() |
| sz, r, err := rio.ReadFrame() |
| switch err { |
| @@ -205,9 +217,7 @@ func (s *storageImpl) Tail(path types.StreamPath) ([]byte, types.MessageIndex, e |
| lle := idx.Entries[len(idx.Entries)-1] |
| // Get a Reader for the Tail entry. |
| - r, err := s.Client.NewReader(s.streamPath, gs.Options{ |
| - From: int64(lle.Offset), |
| - }) |
| + r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1) |
| if err != nil { |
| log.Fields{ |
| log.ErrorKey: err, |
| @@ -237,7 +247,7 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| defer s.indexMu.Unlock() |
| if s.index == nil { |
| - r, err := s.Client.NewReader(s.indexPath, gs.Options{}) |
| + r, err := s.Client.NewReader(s.indexPath, 0, -1) |
| if err != nil { |
| log.WithError(err).Errorf(s, "Failed to create index Reader.") |
| return nil, err |
| @@ -265,16 +275,45 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| } |
| type getStrategy struct { |
| - // from is the beginning byte offset of the log entry stream. |
| - from int64 |
| - // to is the ending byte offset of the log entry stream. |
| - to int64 |
| + // startOffset is the beginning byte offset of the log entry stream. |
| + startOffset uint64 |
| + // endOffset is the ending byte offset of the log entry stream. |
| + endOffset uint64 |
| + // count is the number of log entries that will be fetched. |
| + count int |
| // lastIndex is the last log entry index in the stream. This will be -1 if |
| // there are no entries in the stream. |
| lastIndex types.MessageIndex |
| } |
| +func (gs *getStrategy) offset() int64 { |
| + return int64(gs.startOffset) |
| +} |
| + |
| +func (gs *getStrategy) length() int64 { |
| + if gs.startOffset < gs.endOffset { |
| + return int64(gs.endOffset - gs.startOffset) |
| + } |
| + return -1 |
| +} |
| + |
| +// setCount sets the `count` field. If called multiple times, the smallest |
| +// assigned value will be retained. |
| +func (gs *getStrategy) setCount(v int) { |
| + if gs.count <= 0 || gs.count > v { |
| + gs.count = v |
| + } |
| +} |
| + |
| +// setEndOffset sets the `endOffset` field. If called multiple times, the |
| +// smallest assigned value will be retained. |
| +func (gs *getStrategy) setEndOffset(v uint64) { |
| + if gs.endOffset == 0 || gs.endOffset > v { |
| + gs.endOffset = v |
| + } |
| +} |
| + |
| func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy { |
| st := getStrategy{} |
| @@ -289,17 +328,39 @@ func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn |
| startIdx = 0 |
| } |
| le := idx.Entries[startIdx] |
| - st.from = int64(le.Offset) |
| + st.startOffset = le.Offset |
| - // If we have a limit, and we have enough index entries to upper-bound our |
| - // stream based on that limit, use that. |
| + // Determine an upper bound based on our limits. |
| // |
| - // Note that this may overshoot if the index and/or stream is sparse. We know |
| - // for sure that we have one LogEntry per index entry, so that's the best we |
| - // can do. |
| + // If we have a count limit, and we have enough index entries to upper-bound |
| + // our stream based on that limit, use that. Note that this may overshoot if |
| + // the index and/or stream is sparse. We know for sure that we have one |
| + // LogEntry per index entry, so that's the best we can do. |
| if req.Limit > 0 { |
| if ub := startIdx + req.Limit; ub < len(idx.Entries) { |
| - st.to = int64(idx.Entries[ub].Offset) |
| + st.setEndOffset(idx.Entries[ub].Offset) |
| + } |
| + st.setCount(req.Limit) |
| + } |
| + |
| + // If we have a byte limit, count the entry sizes until we reach that limit. |
| + if mb := int64(s.MaxBytes); mb > 0 { |
| + mb := uint64(mb) |
| + |
| + for i, e := range idx.Entries[startIdx:] { |
| + if e.Offset < st.startOffset { |
| + // This shouldn't really happen, but it could happen if there is a |
| + // corrupt index. |
| + continue |
| + } |
| + |
| + // Calculate the request size and truncate if we've exceeded our maximum |
| + // request bytes. |
| + if size := (e.Offset - st.startOffset); size > mb { |
| + st.setEndOffset(e.Offset) |
| + st.setCount(i) |
| + break |
| + } |
| } |
| } |
| return &st |