Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(560)

Unified Diff: logdog/client/coordinator/stream_params.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/client/coordinator/stream.go ('k') | logdog/client/coordinator/stream_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/client/coordinator/stream_params.go
diff --git a/logdog/client/coordinator/stream_params.go b/logdog/client/coordinator/stream_params.go
index 56851d43c46055051a26f3ff0a5dbe4390389ef8..c68c1823e4a805357ab18edc5aa2e942454c1441 100644
--- a/logdog/client/coordinator/stream_params.go
+++ b/logdog/client/coordinator/stream_params.go
@@ -9,8 +9,9 @@ import (
"github.com/luci/luci-go/logdog/common/types"
)
-// StreamGetParams is an accumulating set of Stream Get request parameters.
-type StreamGetParams struct {
+// getParamsInst is an internal struct that accumulates a Get request and
+// associated instructions from a series of iterative GetParam applications.
+type getParamsInst struct {
// r is the Get request to populate.
r logdog.GetRequest
@@ -19,42 +20,99 @@ type StreamGetParams struct {
stateP *LogStream
}
-// NewGetParams returns a new StreamGetParams instance.
-func NewGetParams() *StreamGetParams {
- return &StreamGetParams{}
+// tailParamsInst is an internal struct that accumulates a Tail request and
+// associated instructions from a series of iterative TailParam applications.
+type tailParamsInst struct {
+ // r is the Tail request to populate.
+ r logdog.TailRequest
+
+ // stateP is the stream state pointer. It is set by State, and, if supplied,
+ // will cause the log request to return stream state.
+ stateP *LogStream
+
+ // complete instructs the tail call to fetch a complete entry, instead of just
+ // the last log record.
+ complete bool
}
-func (p *StreamGetParams) clone() *StreamGetParams {
- clone := *p
- return &clone
+// GetParam is a condition or parameter to apply to a Get request.
+type GetParam interface {
+ applyGet(p *getParamsInst)
+}
+
+// TailParam is a condition or parameter to apply to a Tail request.
+type TailParam interface {
+ applyTail(p *tailParamsInst)
+}
+
+type loadStateParam struct {
+ stateP *LogStream
+}
+
+// WithState returns a Get/Tail parameter that loads the log stream's state into
+// the supplied LogState pointer.
+func WithState(stateP *LogStream) interface {
+ GetParam
+ TailParam
+} {
+ return &loadStateParam{stateP}
+}
+
+func (p *loadStateParam) applyGet(param *getParamsInst) {
+ param.stateP = p.stateP
+ param.r.State = (p.stateP != nil)
+}
+
+func (p *loadStateParam) applyTail(param *tailParamsInst) {
+ param.stateP = p.stateP
+ param.r.State = (p.stateP != nil)
+}
+
+type indexGetParam struct {
+ index types.MessageIndex
}
// Index returns a stream Get parameter that causes the Get request to
// retrieve logs starting at the requested stream index instead of the default,
// zero.
-func (p *StreamGetParams) Index(i types.MessageIndex) *StreamGetParams {
- p = p.clone()
- p.r.Index = int64(i)
- return p
-}
+func Index(i types.MessageIndex) GetParam { return &indexGetParam{i} }
-// Limit limits the returned logs either by count or by byte count. If either
-// limit is <= 0, then no limit will be applied and the server will choose how
-// many logs to return.
-func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams {
- p = p.clone()
+func (p *indexGetParam) applyGet(param *getParamsInst) { param.r.Index = int64(p.index) }
- if bytes < 0 {
- bytes = 0
- }
- if count < 0 {
- count = 0
+type limitBytesGetParam struct {
+ limit int
+}
+
+// LimitBytes applies a byte constraint to the returned logs. If the supplied
+// limit is <= 0, then no byte constraint will be applied and the server will
+// choose how many logs to return.
+func LimitBytes(limit int) GetParam {
+ if limit < 0 {
+ limit = 0
}
+ return &limitBytesGetParam{limit}
+}
+
+func (p *limitBytesGetParam) applyGet(param *getParamsInst) { param.r.ByteCount = int32(p.limit) }
- p.r.ByteCount, p.r.LogCount = int32(bytes), int32(count)
- return p
+type limitCountGetParam struct {
+ limit int
}
+// LimitCount applies a count constraint to the returned logs. If the supplied
+// limit is <= 0, then no count constraint will be applied and the server will
+// choose how many logs to return.
+func LimitCount(limit int) GetParam {
+ if limit < 0 {
+ limit = 0
+ }
+ return &limitCountGetParam{limit}
+}
+
+func (p *limitCountGetParam) applyGet(param *getParamsInst) { param.r.LogCount = int32(p.limit) }
+
+type nonContiguousGetParam struct{}
+
// NonContiguous returns a stream Get parameter that causes the Get request
// to allow non-contiguous records to be returned. By default, only contiguous
// records starting from the specific Index will be returned.
@@ -70,16 +128,27 @@ func (p *StreamGetParams) Limit(bytes, count int) *StreamGetParams {
// Log entries generally should not be missing, but may be if either the logs
// are still streaming (since they can be ingested out of order) or if a data
// loss or corruption occurs.
-func (p *StreamGetParams) NonContiguous() *StreamGetParams {
- p = p.clone()
- p.r.NonContiguous = true
- return p
-}
+func NonContiguous() GetParam { return nonContiguousGetParam{} }
-// State returns a stream Get parameter that causes the Get request to return
-// its stream state and log stream descriptor.
-func (p *StreamGetParams) State(stateP *LogStream) *StreamGetParams {
- p = p.clone()
- p.stateP = stateP
- return p
-}
+func (nonContiguousGetParam) applyGet(param *getParamsInst) { param.r.NonContiguous = true }
+
+type completeTailParam struct{}
+
+// Complete instructs the Tail call to retrieve a complete record.
+//
+// If frgmented, the resulting record will be manufactured from its composite
+// parts, and will not actually represent any single record in the log stream.
+// The time offset, prefix and stream indices, sequence number, and content will
+// be derived from the initial log entry in the composite set.
+//
+// If the log stream is a TEXT or BINARY stream, no behavior change will
+// occur, and the last log record will be returned.
+//
+// If the log stream is a DATAGRAM stream and the Tail record is parked partial,
+// additional log entries will be fetched via Get and the full log stream will
+// be assembled. If the partial datagram entry is the "last" in its sequeence,
+// the full datagram ending with it will be returned. If it's partial in the
+// middle of a sequence, the previous complete datagram will be returned.
+func Complete() TailParam { return completeTailParam{} }
+
+func (completeTailParam) applyTail(param *tailParamsInst) { param.complete = true }
« no previous file with comments | « logdog/client/coordinator/stream.go ('k') | logdog/client/coordinator/stream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698