Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(406)

Side by Side Diff: logdog/client/coordinator/stream_params.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/coordinator/stream.go ('k') | logdog/client/coordinator/stream_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 8 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
9 "github.com/luci/luci-go/logdog/common/types" 9 "github.com/luci/luci-go/logdog/common/types"
10 ) 10 )
11 11
12 // StreamGetParams is an accumulating set of Stream Get request parameters. 12 // getParamsInst is an internal struct that accumulates a Get request and
13 type StreamGetParams struct { 13 // associated instructions from a series of iterative GetParam applications.
14 type getParamsInst struct {
14 // r is the Get request to populate. 15 // r is the Get request to populate.
15 r logdog.GetRequest 16 r logdog.GetRequest
16 17
17 // stateP is the stream state pointer. It is set by State, and, if suppl ied, 18 // stateP is the stream state pointer. It is set by State, and, if suppl ied,
18 // will cause the log request to return stream state. 19 // will cause the log request to return stream state.
19 stateP *LogStream 20 stateP *LogStream
20 } 21 }
21 22
22 // NewGetParams returns a new StreamGetParams instance. 23 // tailParamsInst is an internal struct that accumulates a Tail request and
23 func NewGetParams() *StreamGetParams { 24 // associated instructions from a series of iterative TailParam applications.
24 » return &StreamGetParams{} 25 type tailParamsInst struct {
26 » // r is the Tail request to populate.
27 » r logdog.TailRequest
28
29 » // stateP is the stream state pointer. It is set by State, and, if suppl ied,
30 » // will cause the log request to return stream state.
31 » stateP *LogStream
32
33 » // complete instructs the tail call to fetch a complete entry, instead o f just
34 » // the last log record.
35 » complete bool
25 } 36 }
26 37
27 func (p *StreamGetParams) clone() *StreamGetParams { 38 // GetParam is a condition or parameter to apply to a Get request.
28 » clone := *p 39 type GetParam interface {
29 » return &clone 40 » applyGet(p *getParamsInst)
41 }
42
43 // TailParam is a condition or parameter to apply to a Tail request.
44 type TailParam interface {
45 » applyTail(p *tailParamsInst)
46 }
47
48 type loadStateParam struct {
49 » stateP *LogStream
50 }
51
52 // WithState returns a Get/Tail parameter that loads the log stream's state into
53 // the supplied LogState pointer.
54 func WithState(stateP *LogStream) interface {
55 » GetParam
56 » TailParam
57 } {
58 » return &loadStateParam{stateP}
59 }
60
61 func (p *loadStateParam) applyGet(param *getParamsInst) {
62 » param.stateP = p.stateP
63 » param.r.State = (p.stateP != nil)
64 }
65
66 func (p *loadStateParam) applyTail(param *tailParamsInst) {
67 » param.stateP = p.stateP
68 » param.r.State = (p.stateP != nil)
69 }
70
71 type indexGetParam struct {
72 » index types.MessageIndex
30 } 73 }
31 74
32 // Index returns a stream Get parameter that causes the Get request to 75 // Index returns a stream Get parameter that causes the Get request to
33 // retrieve logs starting at the requested stream index instead of the default, 76 // retrieve logs starting at the requested stream index instead of the default,
34 // zero. 77 // zero.
35 func (p *StreamGetParams) Index(i types.MessageIndex) *StreamGetParams { 78 func Index(i types.MessageIndex) GetParam { return &indexGetParam{i} }
36 » p = p.clone() 79
37 » p.r.Index = int64(i) 80 func (p *indexGetParam) applyGet(param *getParamsInst) { param.r.Index = int64(p .index) }
38 » return p 81
82 type limitBytesGetParam struct {
83 » limit int
39 } 84 }
40 85
41 // Limit limits the returned logs either by count or by byte count. If either 86 // LimitBytes applies a byte constraint to the returned logs. If the supplied
42 // limit is <= 0, then no limit will be applied and the server will choose how 87 // limit is <= 0, then no byte constraint will be applied and the server will
43 // many logs to return. 88 // choose how many logs to return.
44 func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams { 89 func LimitBytes(limit int) GetParam {
45 » p = p.clone() 90 » if limit < 0 {
91 » » limit = 0
92 » }
93 » return &limitBytesGetParam{limit}
94 }
46 95
47 » if bytes < 0 { 96 func (p *limitBytesGetParam) applyGet(param *getParamsInst) { param.r.ByteCount = int32(p.limit) }
48 » » bytes = 0 97
98 type limitCountGetParam struct {
99 » limit int
100 }
101
102 // LimitCount applies a count constraint to the returned logs. If the supplied
103 // limit is <= 0, then no count constraint will be applied and the server will
104 // choose how many logs to return.
105 func LimitCount(limit int) GetParam {
106 » if limit < 0 {
107 » » limit = 0
49 } 108 }
50 » if count < 0 { 109 » return &limitCountGetParam{limit}
51 » » count = 0 110 }
52 » }
53 111
54 » p.r.ByteCount, p.r.LogCount = int32(bytes), int32(count) 112 func (p *limitCountGetParam) applyGet(param *getParamsInst) { param.r.LogCount = int32(p.limit) }
55 » return p 113
56 } 114 type nonContiguousGetParam struct{}
57 115
58 // NonContiguous returns a stream Get parameter that causes the Get request 116 // NonContiguous returns a stream Get parameter that causes the Get request
59 // to allow non-contiguous records to be returned. By default, only contiguous 117 // to allow non-contiguous records to be returned. By default, only contiguous
60 // records starting from the specific Index will be returned. 118 // records starting from the specific Index will be returned.
61 // 119 //
62 // By default, a log stream will return only contiguous records starting at the 120 // By default, a log stream will return only contiguous records starting at the
63 // requested index. For example, if a stream had: {0, 1, 2, 4, 5} and a request 121 // requested index. For example, if a stream had: {0, 1, 2, 4, 5} and a request
64 // was made for index 0, Get will return {0, 1, 2}, for index 3 {}, and for 122 // was made for index 0, Get will return {0, 1, 2}, for index 3 {}, and for
65 // index 4 {4, 5}. 123 // index 4 {4, 5}.
66 // 124 //
67 // If NonContiguous is true, a request for 0 will return {0, 1, 2, 4, 5} and so 125 // If NonContiguous is true, a request for 0 will return {0, 1, 2, 4, 5} and so
68 // on. 126 // on.
69 // 127 //
70 // Log entries generally should not be missing, but may be if either the logs 128 // Log entries generally should not be missing, but may be if either the logs
71 // are still streaming (since they can be ingested out of order) or if a data 129 // are still streaming (since they can be ingested out of order) or if a data
72 // loss or corruption occurs. 130 // loss or corruption occurs.
73 func (p *StreamGetParams) NonContiguous() *StreamGetParams { 131 func NonContiguous() GetParam { return nonContiguousGetParam{} }
74 » p = p.clone()
75 » p.r.NonContiguous = true
76 » return p
77 }
78 132
79 // State returns a stream Get parameter that causes the Get request to return 133 func (nonContiguousGetParam) applyGet(param *getParamsInst) { param.r.NonContigu ous = true }
80 // its stream state and log stream descriptor. 134
81 func (p *StreamGetParams) State(stateP *LogStream) *StreamGetParams { 135 type completeTailParam struct{}
82 » p = p.clone() 136
83 » p.stateP = stateP 137 // Complete instructs the Tail call to retrieve a complete record.
84 » return p 138 //
85 } 139 // If frgmented, the resulting record will be manufactured from its composite
140 // parts, and will not actually represent any single record in the log stream.
141 // The time offset, prefix and stream indices, sequence number, and content will
142 // be derived from the initial log entry in the composite set.
143 //
144 // If the log stream is a TEXT or BINARY stream, no behavior change will
145 // occur, and the last log record will be returned.
146 //
147 // If the log stream is a DATAGRAM stream and the Tail record is parked partial,
148 // additional log entries will be fetched via Get and the full log stream will
149 // be assembled. If the partial datagram entry is the "last" in its sequeence,
150 // the full datagram ending with it will be returned. If it's partial in the
151 // middle of a sequence, the previous complete datagram will be returned.
152 func Complete() TailParam { return completeTailParam{} }
153
154 func (completeTailParam) applyTail(param *tailParamsInst) { param.complete = tru e }
OLDNEW
« no previous file with comments | « logdog/client/coordinator/stream.go ('k') | logdog/client/coordinator/stream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698