Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(747)

Side by Side Diff: logdog/client/cmd/logdog_cat/coordinatorSource.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/client/cmd/logdog_cat/subcommandList.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "sync" 9 "sync"
10 "time" 10 "time"
(...skipping 16 matching lines...) Expand all
27 ) 27 )
28 28
29 // coordinatorSource is a fetcher.Source implementation that uses the 29 // coordinatorSource is a fetcher.Source implementation that uses the
30 // Coordiantor API. 30 // Coordiantor API.
31 type coordinatorSource struct { 31 type coordinatorSource struct {
32 sync.Mutex 32 sync.Mutex
33 33
34 stream *coordinator.Stream 34 stream *coordinator.Stream
35 tidx types.MessageIndex 35 tidx types.MessageIndex
36 36
37 » state coordinator.LogStream 37 » streamState *coordinator.LogStream
38 } 38 }
39 39
40 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) ( 40 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) (
41 []*logpb.LogEntry, types.MessageIndex, error) { 41 []*logpb.LogEntry, types.MessageIndex, error) {
42 s.Lock() 42 s.Lock()
43 defer s.Unlock() 43 defer s.Unlock()
44 44
45 » p := coordinator.NewGetParams().Limit(int(req.Bytes), req.Count).Index(r eq.Index) 45 » params := append(make([]coordinator.GetParam, 0, 4),
46 » » coordinator.LimitBytes(int(req.Bytes)),
47 » » coordinator.LimitCount(req.Count),
48 » » coordinator.Index(req.Index),
49 » )
46 50
47 // If we haven't terminated, use this opportunity to fetch/update our st ream 51 // If we haven't terminated, use this opportunity to fetch/update our st ream
48 // state. 52 // state.
49 » if s.tidx < 0 { 53 » var streamState coordinator.LogStream
50 » » p = p.State(&s.state) 54 » reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
55 » if reqStream {
56 » » params = append(params, coordinator.WithState(&streamState))
51 } 57 }
52 58
53 for { 59 for {
54 » » logs, err := s.stream.Get(c, p) 60 » » logs, err := s.stream.Get(c, params...)
55 switch err { 61 switch err {
56 case nil: 62 case nil:
57 » » » if s.state.State != nil && s.tidx < 0 { 63 » » » if reqStream {
58 » » » » s.tidx = s.state.State.TerminalIndex 64 » » » » s.streamState = &streamState
65 » » » » s.tidx = streamState.State.TerminalIndex
59 } 66 }
60 return logs, s.tidx, nil 67 return logs, s.tidx, nil
61 68
62 case coordinator.ErrNoSuchStream: 69 case coordinator.ErrNoSuchStream:
63 log.WithError(err).Warningf(c, "Stream does not exist. S leeping pending registration.") 70 log.WithError(err).Warningf(c, "Stream does not exist. S leeping pending registration.")
64 71
65 // Delay, interrupting if our Context is interrupted. 72 // Delay, interrupting if our Context is interrupted.
66 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) { 73 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) {
67 return nil, 0, tr.Err 74 return nil, 0, tr.Err
68 } 75 }
69 76
70 default: 77 default:
71 if err != nil { 78 if err != nil {
72 return nil, 0, err 79 return nil, 0, err
73 } 80 }
74 } 81 }
75 } 82 }
76 } 83 }
77 84
78 func (s *coordinatorSource) getState() coordinator.LogStream { 85 func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) {
79 » s.Lock() 86 » if s.streamState != nil {
80 » defer s.Unlock() 87 » » return &s.streamState.Desc, nil
81 » return s.state 88 » }
89 » return nil, errors.New("no stream state loaded")
82 } 90 }
83
84 func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) {
85 if d := s.state.Desc; d != nil {
86 return d, nil
87 }
88 return nil, errors.New("no descriptor loaded")
89 }
OLDNEW
« no previous file with comments | « no previous file | logdog/client/cmd/logdog_cat/subcommandList.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698