| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 8 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 9 "github.com/luci/luci-go/logdog/common/types" | 9 "github.com/luci/luci-go/logdog/common/types" |
| 10 ) | 10 ) |
| 11 | 11 |
| 12 // StreamGetParams is an accumulating set of Stream Get request parameters. | 12 // getParamsInst is an internal struct that accumulates a Get request and |
| 13 type StreamGetParams struct { | 13 // associated instructions from a series of iterative GetParam applications. |
| 14 type getParamsInst struct { |
| 14 // r is the Get request to populate. | 15 // r is the Get request to populate. |
| 15 r logdog.GetRequest | 16 r logdog.GetRequest |
| 16 | 17 |
| 17 // stateP is the stream state pointer. It is set by State, and, if suppl
ied, | 18 // stateP is the stream state pointer. It is set by State, and, if suppl
ied, |
| 18 // will cause the log request to return stream state. | 19 // will cause the log request to return stream state. |
| 19 stateP *LogStream | 20 stateP *LogStream |
| 20 } | 21 } |
| 21 | 22 |
| 22 // NewGetParams returns a new StreamGetParams instance. | 23 // tailParamsInst is an internal struct that accumulates a Tail request and |
| 23 func NewGetParams() *StreamGetParams { | 24 // associated instructions from a series of iterative TailParam applications. |
| 24 » return &StreamGetParams{} | 25 type tailParamsInst struct { |
| 26 » // r is the Tail request to populate. |
| 27 » r logdog.TailRequest |
| 28 |
| 29 » // stateP is the stream state pointer. It is set by State, and, if suppl
ied, |
| 30 » // will cause the log request to return stream state. |
| 31 » stateP *LogStream |
| 32 |
| 33 » // complete instructs the tail call to fetch a complete entry, instead o
f just |
| 34 » // the last log record. |
| 35 » complete bool |
| 25 } | 36 } |
| 26 | 37 |
| 27 func (p *StreamGetParams) clone() *StreamGetParams { | 38 // GetParam is a condition or parameter to apply to a Get request. |
| 28 » clone := *p | 39 type GetParam interface { |
| 29 » return &clone | 40 » applyGet(p *getParamsInst) |
| 41 } |
| 42 |
| 43 // TailParam is a condition or parameter to apply to a Tail request. |
| 44 type TailParam interface { |
| 45 » applyTail(p *tailParamsInst) |
| 46 } |
| 47 |
| 48 type loadStateParam struct { |
| 49 » stateP *LogStream |
| 50 } |
| 51 |
| 52 // WithState returns a Get/Tail parameter that loads the log stream's state into |
| 53 // the supplied LogState pointer. |
| 54 func WithState(stateP *LogStream) interface { |
| 55 » GetParam |
| 56 » TailParam |
| 57 } { |
| 58 » return &loadStateParam{stateP} |
| 59 } |
| 60 |
| 61 func (p *loadStateParam) applyGet(param *getParamsInst) { |
| 62 » param.stateP = p.stateP |
| 63 » param.r.State = (p.stateP != nil) |
| 64 } |
| 65 |
| 66 func (p *loadStateParam) applyTail(param *tailParamsInst) { |
| 67 » param.stateP = p.stateP |
| 68 » param.r.State = (p.stateP != nil) |
| 69 } |
| 70 |
| 71 type indexGetParam struct { |
| 72 » index types.MessageIndex |
| 30 } | 73 } |
| 31 | 74 |
| 32 // Index returns a stream Get parameter that causes the Get request to | 75 // Index returns a stream Get parameter that causes the Get request to |
| 33 // retrieve logs starting at the requested stream index instead of the default, | 76 // retrieve logs starting at the requested stream index instead of the default, |
| 34 // zero. | 77 // zero. |
| 35 func (p *StreamGetParams) Index(i types.MessageIndex) *StreamGetParams { | 78 func Index(i types.MessageIndex) GetParam { return &indexGetParam{i} } |
| 36 » p = p.clone() | 79 |
| 37 » p.r.Index = int64(i) | 80 func (p *indexGetParam) applyGet(param *getParamsInst) { param.r.Index = int64(p
.index) } |
| 38 » return p | 81 |
| 82 type limitBytesGetParam struct { |
| 83 » limit int |
| 39 } | 84 } |
| 40 | 85 |
| 41 // Limit limits the returned logs either by count or by byte count. If either | 86 // LimitBytes applies a byte constraint to the returned logs. If the supplied |
| 42 // limit is <= 0, then no limit will be applied and the server will choose how | 87 // limit is <= 0, then no byte constraint will be applied and the server will |
| 43 // many logs to return. | 88 // choose how many logs to return. |
| 44 func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams { | 89 func LimitBytes(limit int) GetParam { |
| 45 » p = p.clone() | 90 » if limit < 0 { |
| 91 » » limit = 0 |
| 92 » } |
| 93 » return &limitBytesGetParam{limit} |
| 94 } |
| 46 | 95 |
| 47 » if bytes < 0 { | 96 func (p *limitBytesGetParam) applyGet(param *getParamsInst) { param.r.ByteCount
= int32(p.limit) } |
| 48 » » bytes = 0 | 97 |
| 98 type limitCountGetParam struct { |
| 99 » limit int |
| 100 } |
| 101 |
| 102 // LimitCount applies a count constraint to the returned logs. If the supplied |
| 103 // limit is <= 0, then no count constraint will be applied and the server will |
| 104 // choose how many logs to return. |
| 105 func LimitCount(limit int) GetParam { |
| 106 » if limit < 0 { |
| 107 » » limit = 0 |
| 49 } | 108 } |
| 50 » if count < 0 { | 109 » return &limitCountGetParam{limit} |
| 51 » » count = 0 | 110 } |
| 52 » } | |
| 53 | 111 |
| 54 » p.r.ByteCount, p.r.LogCount = int32(bytes), int32(count) | 112 func (p *limitCountGetParam) applyGet(param *getParamsInst) { param.r.LogCount =
int32(p.limit) } |
| 55 » return p | 113 |
| 56 } | 114 type nonContiguousGetParam struct{} |
| 57 | 115 |
| 58 // NonContiguous returns a stream Get parameter that causes the Get request | 116 // NonContiguous returns a stream Get parameter that causes the Get request |
| 59 // to allow non-contiguous records to be returned. By default, only contiguous | 117 // to allow non-contiguous records to be returned. By default, only contiguous |
| 60 // records starting from the specific Index will be returned. | 118 // records starting from the specific Index will be returned. |
| 61 // | 119 // |
| 62 // By default, a log stream will return only contiguous records starting at the | 120 // By default, a log stream will return only contiguous records starting at the |
| 63 // requested index. For example, if a stream had: {0, 1, 2, 4, 5} and a request | 121 // requested index. For example, if a stream had: {0, 1, 2, 4, 5} and a request |
| 64 // was made for index 0, Get will return {0, 1, 2}, for index 3 {}, and for | 122 // was made for index 0, Get will return {0, 1, 2}, for index 3 {}, and for |
| 65 // index 4 {4, 5}. | 123 // index 4 {4, 5}. |
| 66 // | 124 // |
| 67 // If NonContiguous is true, a request for 0 will return {0, 1, 2, 4, 5} and so | 125 // If NonContiguous is true, a request for 0 will return {0, 1, 2, 4, 5} and so |
| 68 // on. | 126 // on. |
| 69 // | 127 // |
| 70 // Log entries generally should not be missing, but may be if either the logs | 128 // Log entries generally should not be missing, but may be if either the logs |
| 71 // are still streaming (since they can be ingested out of order) or if a data | 129 // are still streaming (since they can be ingested out of order) or if a data |
| 72 // loss or corruption occurs. | 130 // loss or corruption occurs. |
| 73 func (p *StreamGetParams) NonContiguous() *StreamGetParams { | 131 func NonContiguous() GetParam { return nonContiguousGetParam{} } |
| 74 » p = p.clone() | |
| 75 » p.r.NonContiguous = true | |
| 76 » return p | |
| 77 } | |
| 78 | 132 |
| 79 // State returns a stream Get parameter that causes the Get request to return | 133 func (nonContiguousGetParam) applyGet(param *getParamsInst) { param.r.NonContigu
ous = true } |
| 80 // its stream state and log stream descriptor. | 134 |
| 81 func (p *StreamGetParams) State(stateP *LogStream) *StreamGetParams { | 135 type completeTailParam struct{} |
| 82 » p = p.clone() | 136 |
| 83 » p.stateP = stateP | 137 // Complete instructs the Tail call to retrieve a complete record. |
| 84 » return p | 138 // |
| 85 } | 139 // If frgmented, the resulting record will be manufactured from its composite |
| 140 // parts, and will not actually represent any single record in the log stream. |
| 141 // The time offset, prefix and stream indices, sequence number, and content will |
| 142 // be derived from the initial log entry in the composite set. |
| 143 // |
| 144 // If the log stream is a TEXT or BINARY stream, no behavior change will |
| 145 // occur, and the last log record will be returned. |
| 146 // |
| 147 // If the log stream is a DATAGRAM stream and the Tail record is parked partial, |
| 148 // additional log entries will be fetched via Get and the full log stream will |
| 149 // be assembled. If the partial datagram entry is the "last" in its sequeence, |
| 150 // the full datagram ending with it will be returned. If it's partial in the |
| 151 // middle of a sequence, the previous complete datagram will be returned. |
| 152 func Complete() TailParam { return completeTailParam{} } |
| 153 |
| 154 func (completeTailParam) applyTail(param *tailParamsInst) { param.complete = tru
e } |
| OLD | NEW |