| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "sync" | 9 "sync" |
| 10 "time" | 10 "time" |
| (...skipping 16 matching lines...) Expand all Loading... |
| 27 ) | 27 ) |
| 28 | 28 |
| 29 // coordinatorSource is a fetcher.Source implementation that uses the | 29 // coordinatorSource is a fetcher.Source implementation that uses the |
| 30 // Coordiantor API. | 30 // Coordiantor API. |
| 31 type coordinatorSource struct { | 31 type coordinatorSource struct { |
| 32 sync.Mutex | 32 sync.Mutex |
| 33 | 33 |
| 34 stream *coordinator.Stream | 34 stream *coordinator.Stream |
| 35 tidx types.MessageIndex | 35 tidx types.MessageIndex |
| 36 | 36 |
| 37 » state coordinator.LogStream | 37 » streamState *coordinator.LogStream |
| 38 } | 38 } |
| 39 | 39 |
| 40 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( | 40 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( |
| 41 []*logpb.LogEntry, types.MessageIndex, error) { | 41 []*logpb.LogEntry, types.MessageIndex, error) { |
| 42 s.Lock() | 42 s.Lock() |
| 43 defer s.Unlock() | 43 defer s.Unlock() |
| 44 | 44 |
| 45 » p := coordinator.NewGetParams().Limit(int(req.Bytes), req.Count).Index(r
eq.Index) | 45 » params := append(make([]coordinator.GetParam, 0, 4), |
| 46 » » coordinator.LimitBytes(int(req.Bytes)), |
| 47 » » coordinator.LimitCount(req.Count), |
| 48 » » coordinator.Index(req.Index), |
| 49 » ) |
| 46 | 50 |
| 47 // If we haven't terminated, use this opportunity to fetch/update our st
ream | 51 // If we haven't terminated, use this opportunity to fetch/update our st
ream |
| 48 // state. | 52 // state. |
| 49 » if s.tidx < 0 { | 53 » var streamState coordinator.LogStream |
| 50 » » p = p.State(&s.state) | 54 » reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex
< 0) |
| 55 » if reqStream { |
| 56 » » params = append(params, coordinator.WithState(&streamState)) |
| 51 } | 57 } |
| 52 | 58 |
| 53 for { | 59 for { |
| 54 » » logs, err := s.stream.Get(c, p) | 60 » » logs, err := s.stream.Get(c, params...) |
| 55 switch err { | 61 switch err { |
| 56 case nil: | 62 case nil: |
| 57 » » » if s.state.State != nil && s.tidx < 0 { | 63 » » » if reqStream { |
| 58 » » » » s.tidx = s.state.State.TerminalIndex | 64 » » » » s.streamState = &streamState |
| 65 » » » » s.tidx = streamState.State.TerminalIndex |
| 59 } | 66 } |
| 60 return logs, s.tidx, nil | 67 return logs, s.tidx, nil |
| 61 | 68 |
| 62 case coordinator.ErrNoSuchStream: | 69 case coordinator.ErrNoSuchStream: |
| 63 log.WithError(err).Warningf(c, "Stream does not exist. S
leeping pending registration.") | 70 log.WithError(err).Warningf(c, "Stream does not exist. S
leeping pending registration.") |
| 64 | 71 |
| 65 // Delay, interrupting if our Context is interrupted. | 72 // Delay, interrupting if our Context is interrupted. |
| 66 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { | 73 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { |
| 67 return nil, 0, tr.Err | 74 return nil, 0, tr.Err |
| 68 } | 75 } |
| 69 | 76 |
| 70 default: | 77 default: |
| 71 if err != nil { | 78 if err != nil { |
| 72 return nil, 0, err | 79 return nil, 0, err |
| 73 } | 80 } |
| 74 } | 81 } |
| 75 } | 82 } |
| 76 } | 83 } |
| 77 | 84 |
| 78 func (s *coordinatorSource) getState() coordinator.LogStream { | 85 func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) { |
| 79 » s.Lock() | 86 » if s.streamState != nil { |
| 80 » defer s.Unlock() | 87 » » return &s.streamState.Desc, nil |
| 81 » return s.state | 88 » } |
| 89 » return nil, errors.New("no stream state loaded") |
| 82 } | 90 } |
| 83 | |
| 84 func (s *coordinatorSource) descriptor() (*logpb.LogStreamDescriptor, error) { | |
| 85 if d := s.state.Desc; d != nil { | |
| 86 return d, nil | |
| 87 } | |
| 88 return nil, errors.New("no descriptor loaded") | |
| 89 } | |
| OLD | NEW |