Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Unified Diff: logdog/client/cmd/logdog_cat/coordinatorSource.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | logdog/client/cmd/logdog_cat/subcommandList.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/client/cmd/logdog_cat/coordinatorSource.go
diff --git a/logdog/client/cmd/logdog_cat/coordinatorSource.go b/logdog/client/cmd/logdog_cat/coordinatorSource.go
index 99cef8ceda480cc474bf606aab86c17e49b96b23..3578eb4c50b438eadc8ae2855c3d15da7af78a52 100644
--- a/logdog/client/cmd/logdog_cat/coordinatorSource.go
+++ b/logdog/client/cmd/logdog_cat/coordinatorSource.go
@@ -34,7 +34,7 @@ type coordinatorSource struct {
stream *coordinator.Stream
tidx types.MessageIndex
- state coordinator.LogStream
+ streamState *coordinator.LogStream
}
func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
@@ -42,20 +42,27 @@ func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
s.Lock()
defer s.Unlock()
- p := coordinator.NewGetParams().Limit(int(req.Bytes), req.Count).Index(req.Index)
+ params := append(make([]coordinator.GetParam, 0, 4),
+ coordinator.LimitBytes(int(req.Bytes)),
+ coordinator.LimitCount(req.Count),
+ coordinator.Index(req.Index),
+ )
// If we haven't terminated, use this opportunity to fetch/update our stream
// state.
- if s.tidx < 0 {
- p = p.State(&s.state)
+ var streamState coordinator.LogStream
+ reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
+ if reqStream {
+ params = append(params, coordinator.WithState(&streamState))
}
for {
- logs, err := s.stream.Get(c, p)
+ logs, err := s.stream.Get(c, params...)
switch err {
case nil:
- if s.state.State != nil && s.tidx < 0 {
- s.tidx = s.state.State.TerminalIndex
+ if reqStream {
+ s.streamState = &streamState
+ s.tidx = streamState.State.TerminalIndex
}
return logs, s.tidx, nil
@@ -75,15 +82,9 @@ func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
}
}
-func (s *coordinatorSource) getState() coordinator.LogStream {
- s.Lock()
- defer s.Unlock()
- return s.state
-}
-
func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) {
- if d := s.state.Desc; d != nil {
- return d, nil
+ if s.streamState != nil {
+ return &s.streamState.Desc, nil
}
- return nil, errors.New("no descriptor loaded")
+ return nil, errors.New("no stream state loaded")
}
« no previous file with comments | « no previous file | logdog/client/cmd/logdog_cat/subcommandList.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698