Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Unified Diff: server/logdog/storage/archive/storage.go

Issue 1904503003: LogDog: Fix archived log stream read errors. (Closed) Base URL: https://github.com/luci/luci-go@hierarchy-check-first
Patch Set: Delete "offset()" method. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/recordio/size_test.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/logdog/storage/archive/storage.go
diff --git a/server/logdog/storage/archive/storage.go b/server/logdog/storage/archive/storage.go
index be5f556f55f3a85f1d1e9f134feac0d4e489b78f..a6ed17e06b696e1a980c549ae42c45e2dbc49ac9 100644
--- a/server/logdog/storage/archive/storage.go
+++ b/server/logdog/storage/archive/storage.go
@@ -54,6 +54,15 @@ type Options struct {
//
// Closing this Storage instance does not close the underlying Client.
Client gs.Client
+
+ // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
+ // request. This should be set for GAE fetches, as large log streams may
+ // exceed the urlfetch system's maximum response size otherwise.
+ //
+ // This is the number of bytes to request, not the number of bytes of log data
+ // to return. The difference is that the former includes the RecordIO frame
+ // headers.
+ MaxBytes int
}
type storageImpl struct {
@@ -110,10 +119,13 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
return nil
}
- r, err := s.Client.NewReader(s.streamPath, gs.Options{
- From: st.from,
- To: st.to,
- })
+ offset := int64(st.startOffset)
+ log.Fields{
+ "offset": offset,
+ "length": st.length(),
+ "path": s.streamPath,
+ }.Debugf(s, "Creating stream reader for range.")
+ r, err := s.Client.NewReader(s.streamPath, offset, st.length())
if err != nil {
log.WithError(err).Errorf(s, "Failed to create stream Reader.")
return err
@@ -128,9 +140,9 @@ func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
buf := bytes.Buffer{}
le := logpb.LogEntry{}
- max := req.Limit
+ max := st.count
for {
- offset := st.from + cr.Count()
+ offset += cr.Count()
sz, r, err := rio.ReadFrame()
switch err {
@@ -205,9 +217,7 @@ func (s *storageImpl) Tail(path types.StreamPath) ([]byte, types.MessageIndex, e
lle := idx.Entries[len(idx.Entries)-1]
// Get a Reader for the Tail entry.
- r, err := s.Client.NewReader(s.streamPath, gs.Options{
- From: int64(lle.Offset),
- })
+ r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1)
if err != nil {
log.Fields{
log.ErrorKey: err,
@@ -237,7 +247,7 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
defer s.indexMu.Unlock()
if s.index == nil {
- r, err := s.Client.NewReader(s.indexPath, gs.Options{})
+ r, err := s.Client.NewReader(s.indexPath, 0, -1)
if err != nil {
log.WithError(err).Errorf(s, "Failed to create index Reader.")
return nil, err
@@ -265,16 +275,41 @@ func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
}
type getStrategy struct {
- // from is the beginning byte offset of the log entry stream.
- from int64
- // to is the ending byte offset of the log entry stream.
- to int64
+ // startOffset is the beginning byte offset of the log entry stream.
+ startOffset uint64
+ // endOffset is the ending byte offset of the log entry stream.
+ endOffset uint64
+ // count is the number of log entries that will be fetched.
+ count int
// lastIndex is the last log entry index in the stream. This will be -1 if
// there are no entries in the stream.
lastIndex types.MessageIndex
}
+func (gs *getStrategy) length() int64 {
+ if gs.startOffset < gs.endOffset {
+ return int64(gs.endOffset - gs.startOffset)
+ }
+ return -1
+}
+
+// setCount sets the `count` field. If called multiple times, the smallest
+// assigned value will be retained.
+func (gs *getStrategy) setCount(v int) {
+ if gs.count <= 0 || gs.count > v {
+ gs.count = v
+ }
+}
+
+// setEndOffset sets the `length` field. If called multiple times, the smallest
+// assigned value will be retained.
+func (gs *getStrategy) setEndOffset(v uint64) {
+ if gs.endOffset == 0 || gs.endOffset > v {
+ gs.endOffset = v
+ }
+}
+
func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
st := getStrategy{}
@@ -289,17 +324,39 @@ func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn
startIdx = 0
}
le := idx.Entries[startIdx]
- st.from = int64(le.Offset)
+ st.startOffset = le.Offset
- // If we have a limit, and we have enough index entries to upper-bound our
- // stream based on that limit, use that.
+ // Determine an upper bound based on our limits.
//
- // Note that this may overshoot if the index and/or stream is sparse. We know
- // for sure that we have one LogEntry per index entry, so that's the best we
- // can do.
+ // If we have a count limit, and we have enough index entries to upper-bound
+ // our stream based on that limit, use that. Note that this may overshoot if
+ // the index and/or stream is sparse. We know for sure that we have one
+ // LogEntry per index entry, so that's the best we can do.
if req.Limit > 0 {
if ub := startIdx + req.Limit; ub < len(idx.Entries) {
- st.to = int64(idx.Entries[ub].Offset)
+ st.setEndOffset(idx.Entries[ub].Offset)
+ }
+ st.setCount(req.Limit)
+ }
+
+ // If we have a byte limit, count the entry sizes until we reach that limit.
+ if mb := int64(s.MaxBytes); mb > 0 {
+ mb := uint64(mb)
+
+ for i, e := range idx.Entries[startIdx:] {
+ if e.Offset < st.startOffset {
+ // This shouldn't really happen, but it could happen if there is a
+ // corrupt index.
+ continue
+ }
+
+ // Calculate the request offset and truncate if we've exceeded our maximum
+ // request bytes.
+ if size := (e.Offset - st.startOffset); size > mb {
+ st.setEndOffset(e.Offset)
+ st.setCount(i)
+ break
+ }
}
}
return &st
« no previous file with comments | « common/recordio/size_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698