| Index: server/internal/logdog/archivist/storageSource.go
|
| diff --git a/server/internal/logdog/archivist/storageSource.go b/server/internal/logdog/archivist/storageSource.go
|
| index 490658dd1e35b7204cd702be2a8b5a91e6600aed..ae49d172d74c04d1a367b7a6ec51ccd673066e0d 100644
|
| --- a/server/internal/logdog/archivist/storageSource.go
|
| +++ b/server/internal/logdog/archivist/storageSource.go
|
| @@ -21,13 +21,11 @@ type storageSource struct {
|
|
|
| st storage.Storage // the storage instance to read from
|
| path types.StreamPath // the path of the log stream
|
| - contiguous bool // if true, enforce contiguous entries
|
| terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
|
|
|
| - buf []*logpb.LogEntry
|
| - lastIndex types.MessageIndex
|
| - logEntryCount int64
|
| - hasMissingEntries bool // true if some log entries were missing.
|
| + buf []*logpb.LogEntry
|
| + lastIndex types.MessageIndex
|
| + logEntryCount int64
|
| }
|
|
|
| func (s *storageSource) bufferEntries(start types.MessageIndex) error {
|
| @@ -60,9 +58,6 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
|
| if err := s.bufferEntries(s.lastIndex + 1); err != nil {
|
| if err == storage.ErrDoesNotExist {
|
| log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
|
| - if s.terminalIndex >= 0 {
|
| - s.hasMissingEntries = true
|
| - }
|
| return nil, archive.ErrEndOfStream
|
| }
|
|
|
| @@ -80,7 +75,6 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
|
| "terminalIndex": s.terminalIndex,
|
| "lastIndex": s.lastIndex,
|
| }.Warningf(s, "Log stream stopped before terminal index.")
|
| - s.hasMissingEntries = true
|
| } else {
|
| log.Fields{
|
| "lastIndex": s.lastIndex,
|
| @@ -94,23 +88,9 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
|
| var le *logpb.LogEntry
|
| le, s.buf = s.buf[0], s.buf[1:]
|
|
|
| - // If we're enforcing a contiguous log stream, error if this LogEntry is not
|
| - // contiguous.
|
| - sidx := types.MessageIndex(le.StreamIndex)
|
| - nidx := (s.lastIndex + 1)
|
| - if sidx != nidx {
|
| - s.hasMissingEntries = true
|
| - }
|
| - if s.contiguous && s.hasMissingEntries {
|
| - log.Fields{
|
| - "index": sidx,
|
| - "nextIndex": nidx,
|
| - }.Warningf(s, "Non-contiguous log stream while enforcing.")
|
| - return nil, archive.ErrEndOfStream
|
| - }
|
| -
|
| // If we're enforcing a maximum terminal index, return end of stream if this
|
| // LogEntry exceeds that index.
|
| + sidx := types.MessageIndex(le.StreamIndex)
|
| if s.terminalIndex >= 0 && sidx > s.terminalIndex {
|
| log.Fields{
|
| "index": sidx,
|
|
|