| Index: logdog/client/coordinator/stream_params.go
|
| diff --git a/logdog/client/coordinator/stream_params.go b/logdog/client/coordinator/stream_params.go
|
| index 56851d43c46055051a26f3ff0a5dbe4390389ef8..c68c1823e4a805357ab18edc5aa2e942454c1441 100644
|
| --- a/logdog/client/coordinator/stream_params.go
|
| +++ b/logdog/client/coordinator/stream_params.go
|
| @@ -9,8 +9,9 @@ import (
|
| "github.com/luci/luci-go/logdog/common/types"
|
| )
|
|
|
| -// StreamGetParams is an accumulating set of Stream Get request parameters.
|
| -type StreamGetParams struct {
|
| +// getParamsInst is an internal struct that accumulates a Get request and
|
| +// associated instructions from a series of iterative GetParam applications.
|
| +type getParamsInst struct {
|
| // r is the Get request to populate.
|
| r logdog.GetRequest
|
|
|
| @@ -19,42 +20,99 @@ type StreamGetParams struct {
|
| stateP *LogStream
|
| }
|
|
|
| -// NewGetParams returns a new StreamGetParams instance.
|
| -func NewGetParams() *StreamGetParams {
|
| - return &StreamGetParams{}
|
| +// tailParamsInst is an internal struct that accumulates a Tail request and
|
| +// associated instructions from a series of iterative TailParam applications.
|
| +type tailParamsInst struct {
|
| + // r is the Tail request to populate.
|
| + r logdog.TailRequest
|
| +
|
| + // stateP is the stream state pointer. It is set by State, and, if supplied,
|
| + // will cause the log request to return stream state.
|
| + stateP *LogStream
|
| +
|
| + // complete instructs the tail call to fetch a complete entry, instead of just
|
| + // the last log record.
|
| + complete bool
|
| }
|
|
|
| -func (p *StreamGetParams) clone() *StreamGetParams {
|
| - clone := *p
|
| - return &clone
|
| +// GetParam is a condition or parameter to apply to a Get request.
|
| +type GetParam interface {
|
| + applyGet(p *getParamsInst)
|
| +}
|
| +
|
| +// TailParam is a condition or parameter to apply to a Tail request.
|
| +type TailParam interface {
|
| + applyTail(p *tailParamsInst)
|
| +}
|
| +
|
| +type loadStateParam struct {
|
| + stateP *LogStream
|
| +}
|
| +
|
| +// WithState returns a Get/Tail parameter that loads the log stream's state into
|
| +// the supplied LogState pointer.
|
| +func WithState(stateP *LogStream) interface {
|
| + GetParam
|
| + TailParam
|
| +} {
|
| + return &loadStateParam{stateP}
|
| +}
|
| +
|
| +func (p *loadStateParam) applyGet(param *getParamsInst) {
|
| + param.stateP = p.stateP
|
| + param.r.State = (p.stateP != nil)
|
| +}
|
| +
|
| +func (p *loadStateParam) applyTail(param *tailParamsInst) {
|
| + param.stateP = p.stateP
|
| + param.r.State = (p.stateP != nil)
|
| +}
|
| +
|
| +type indexGetParam struct {
|
| + index types.MessageIndex
|
| }
|
|
|
| // Index returns a stream Get parameter that causes the Get request to
|
| // retrieve logs starting at the requested stream index instead of the default,
|
| // zero.
|
| -func (p *StreamGetParams) Index(i types.MessageIndex) *StreamGetParams {
|
| - p = p.clone()
|
| - p.r.Index = int64(i)
|
| - return p
|
| -}
|
| +func Index(i types.MessageIndex) GetParam { return &indexGetParam{i} }
|
|
|
| -// Limit limits the returned logs either by count or by byte count. If either
|
| -// limit is <= 0, then no limit will be applied and the server will choose how
|
| -// many logs to return.
|
| -func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams {
|
| - p = p.clone()
|
| +func (p *indexGetParam) applyGet(param *getParamsInst) { param.r.Index = int64(p.index) }
|
|
|
| - if bytes < 0 {
|
| - bytes = 0
|
| - }
|
| - if count < 0 {
|
| - count = 0
|
| +type limitBytesGetParam struct {
|
| + limit int
|
| +}
|
| +
|
| +// LimitBytes applies a byte constraint to the returned logs. If the supplied
|
| +// limit is <= 0, then no byte constraint will be applied and the server will
|
| +// choose how many logs to return.
|
| +func LimitBytes(limit int) GetParam {
|
| + if limit < 0 {
|
| + limit = 0
|
| }
|
| + return &limitBytesGetParam{limit}
|
| +}
|
| +
|
| +func (p *limitBytesGetParam) applyGet(param *getParamsInst) { param.r.ByteCount = int32(p.limit) }
|
|
|
| - p.r.ByteCount, p.r.LogCount = int32(bytes), int32(count)
|
| - return p
|
| +type limitCountGetParam struct {
|
| + limit int
|
| }
|
|
|
| +// LimitCount applies a count constraint to the returned logs. If the supplied
|
| +// limit is <= 0, then no count constraint will be applied and the server will
|
| +// choose how many logs to return.
|
| +func LimitCount(limit int) GetParam {
|
| + if limit < 0 {
|
| + limit = 0
|
| + }
|
| + return &limitCountGetParam{limit}
|
| +}
|
| +
|
| +func (p *limitCountGetParam) applyGet(param *getParamsInst) { param.r.LogCount = int32(p.limit) }
|
| +
|
| +type nonContiguousGetParam struct{}
|
| +
|
| // NonContiguous returns a stream Get parameter that causes the Get request
|
| // to allow non-contiguous records to be returned. By default, only contiguous
|
| // records starting from the specific Index will be returned.
|
| @@ -70,16 +128,27 @@ func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams {
|
| // Log entries generally should not be missing, but may be if either the logs
|
| // are still streaming (since they can be ingested out of order) or if a data
|
| // loss or corruption occurs.
|
| -func (p *StreamGetParams) NonContiguous() *StreamGetParams {
|
| - p = p.clone()
|
| - p.r.NonContiguous = true
|
| - return p
|
| -}
|
| +func NonContiguous() GetParam { return nonContiguousGetParam{} }
|
|
|
| -// State returns a stream Get parameter that causes the Get request to return
|
| -// its stream state and log stream descriptor.
|
| -func (p *StreamGetParams) State(stateP *LogStream) *StreamGetParams {
|
| - p = p.clone()
|
| - p.stateP = stateP
|
| - return p
|
| -}
|
| +func (nonContiguousGetParam) applyGet(param *getParamsInst) { param.r.NonContiguous = true }
|
| +
|
| +type completeTailParam struct{}
|
| +
|
| +// Complete instructs the Tail call to retrieve a complete record.
|
| +//
|
| +// If frgmented, the resulting record will be manufactured from its composite
|
| +// parts, and will not actually represent any single record in the log stream.
|
| +// The time offset, prefix and stream indices, sequence number, and content will
|
| +// be derived from the initial log entry in the composite set.
|
| +//
|
| +// If the log stream is a TEXT or BINARY stream, no behavior change will
|
| +// occur, and the last log record will be returned.
|
| +//
|
| +// If the log stream is a DATAGRAM stream and the Tail record is parked partial,
|
| +// additional log entries will be fetched via Get and the full log stream will
|
| +// be assembled. If the partial datagram entry is the "last" in its sequeence,
|
| +// the full datagram ending with it will be returned. If it's partial in the
|
| +// middle of a sequence, the previous complete datagram will be returned.
|
| +func Complete() TailParam { return completeTailParam{} }
|
| +
|
| +func (completeTailParam) applyTail(param *tailParamsInst) { param.complete = true }
|
|
|