| Index: logdog/client/cmd/logdog_cat/coordinatorSource.go
|
| diff --git a/logdog/client/cmd/logdog_cat/coordinatorSource.go b/logdog/client/cmd/logdog_cat/coordinatorSource.go
|
| index 99cef8ceda480cc474bf606aab86c17e49b96b23..3578eb4c50b438eadc8ae2855c3d15da7af78a52 100644
|
| --- a/logdog/client/cmd/logdog_cat/coordinatorSource.go
|
| +++ b/logdog/client/cmd/logdog_cat/coordinatorSource.go
|
| @@ -34,7 +34,7 @@ type coordinatorSource struct {
|
| stream *coordinator.Stream
|
| tidx types.MessageIndex
|
|
|
| - state coordinator.LogStream
|
| + streamState *coordinator.LogStream
|
| }
|
|
|
| func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
|
| @@ -42,20 +42,27 @@ func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
|
| s.Lock()
|
| defer s.Unlock()
|
|
|
| - p := coordinator.NewGetParams().Limit(int(req.Bytes), req.Count).Index(req.Index)
|
| + params := append(make([]coordinator.GetParam, 0, 4),
|
| + coordinator.LimitBytes(int(req.Bytes)),
|
| + coordinator.LimitCount(req.Count),
|
| + coordinator.Index(req.Index),
|
| + )
|
|
|
| // If we haven't terminated, use this opportunity to fetch/update our stream
|
| // state.
|
| - if s.tidx < 0 {
|
| - p = p.State(&s.state)
|
| + var streamState coordinator.LogStream
|
| + reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
|
| + if reqStream {
|
| + params = append(params, coordinator.WithState(&streamState))
|
| }
|
|
|
| for {
|
| - logs, err := s.stream.Get(c, p)
|
| + logs, err := s.stream.Get(c, params...)
|
| switch err {
|
| case nil:
|
| - if s.state.State != nil && s.tidx < 0 {
|
| - s.tidx = s.state.State.TerminalIndex
|
| + if reqStream {
|
| + s.streamState = &streamState
|
| + s.tidx = streamState.State.TerminalIndex
|
| }
|
| return logs, s.tidx, nil
|
|
|
| @@ -75,15 +82,9 @@ func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
|
| }
|
| }
|
|
|
| -func (s *coordinatorSource) getState() coordinator.LogStream {
|
| - s.Lock()
|
| - defer s.Unlock()
|
| - return s.state
|
| -}
|
| -
|
| func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) {
|
| - if d := s.state.Desc; d != nil {
|
| - return d, nil
|
| + if s.streamState != nil {
|
| + return &s.streamState.Desc, nil
|
| }
|
| - return nil, errors.New("no descriptor loaded")
|
| + return nil, errors.New("no stream state loaded")
|
| }
|
|
|