Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(710)

Unified Diff: logdog/client/coordinator/stream.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/client/coordinator/query_test.go ('k') | logdog/client/coordinator/stream_params.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/client/coordinator/stream.go
diff --git a/logdog/client/coordinator/stream.go b/logdog/client/coordinator/stream.go
index b2f6a02b31055c2f2b5e070c3b8e6c8e7028c67a..7b7e927eca50878adc8353c91a070b147c67121b 100644
--- a/logdog/client/coordinator/stream.go
+++ b/logdog/client/coordinator/stream.go
@@ -59,34 +59,38 @@ type LogStream struct {
Path types.StreamPath
// Desc is the log stream's descriptor.
- Desc *logpb.LogStreamDescriptor
+ Desc logpb.LogStreamDescriptor
// State is the stream's current state.
- State *StreamState
+ State StreamState
}
-func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) *LogStream {
+func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) (
+ *LogStream, error) {
+ switch {
+ case s == nil:
+ return nil, errors.New("missing required log state")
+ case d == nil:
+ return nil, errors.New("missing required descriptor")
+ }
+
ls := LogStream{
Project: config.ProjectName(proj),
Path: path,
- Desc: d,
- }
- if s != nil {
- st := StreamState{
+ Desc: *d,
+ State: StreamState{
Created: s.Created.Time(),
TerminalIndex: types.MessageIndex(s.TerminalIndex),
Purged: s.Purged,
- }
- if a := s.Archive; a != nil {
- st.Archived = true
- st.ArchiveIndexURL = a.IndexUrl
- st.ArchiveStreamURL = a.StreamUrl
- st.ArchiveDataURL = a.DataUrl
- }
-
- ls.State = &st
+ },
+ }
+ if a := s.Archive; a != nil {
+ ls.State.Archived = true
+ ls.State.ArchiveIndexURL = a.IndexUrl
+ ls.State.ArchiveStreamURL = a.StreamUrl
+ ls.State.ArchiveDataURL = a.DataUrl
}
- return &ls
+ return &ls, nil
}
// Stream is an interface to Coordinator stream-level commands. It is bound to
@@ -119,25 +123,33 @@ func (s *Stream) State(ctx context.Context) (*LogStream, error) {
if desc := resp.Desc; desc != nil {
path = desc.Path()
}
- return loadLogStream(resp.Project, path, resp.State, resp.Desc), nil
+
+ st, err := loadLogStream(resp.Project, path, resp.State, resp.Desc)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load stream state: %v", err)
+ }
+ return st, nil
}
// Get retrieves log stream entries from the Coordinator. The supplied
// parameters shape which entries are requested and what information is
// returned.
-func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry, error) {
- if p == nil {
- p = &StreamGetParams{}
+func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry, error) {
+ p := getParamsInst{
+ r: logdog.GetRequest{
+ Project: string(s.project),
+ Path: string(s.path),
+ },
+ }
+ for _, param := range params {
+ param.applyGet(&p)
}
- req := p.r
- req.Project = string(s.project)
- req.Path = string(s.path)
if p.stateP != nil {
- req.State = true
+ p.r.State = true
}
- resp, err := s.c.C.Get(ctx, &req)
+ resp, err := s.c.C.Get(ctx, &p.r)
if err != nil {
return nil, normalizeError(err)
}
@@ -150,20 +162,22 @@ func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry
// Tail performs a tail call, returning the last log entry in the stream. If
// stateP is not nil, the stream's state will be requested and loaded into the
// variable.
-func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry, error) {
- req := logdog.TailRequest{
- Project: string(s.project),
- Path: string(s.path),
+func (s *Stream) Tail(ctx context.Context, params ...TailParam) (*logpb.LogEntry, error) {
+ p := tailParamsInst{
+ r: logdog.TailRequest{
+ Project: string(s.project),
+ Path: string(s.path),
+ },
}
- if stateP != nil {
- req.State = true
+ for _, param := range params {
+ param.applyTail(&p)
}
- resp, err := s.c.C.Tail(ctx, &req)
+ resp, err := s.c.C.Tail(ctx, &p.r)
if err != nil {
return nil, normalizeError(err)
}
- if err := loadStatePointer(stateP, resp); err != nil {
+ if err := loadStatePointer(p.stateP, resp); err != nil {
return nil, err
}
@@ -172,25 +186,137 @@ func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry,
return nil, nil
case 1:
- return resp.Logs[0], nil
+ le := resp.Logs[0]
+ if p.complete {
+ if dg := le.GetDatagram(); dg != nil && dg.Partial != nil {
+ // This is a partial; datagram. Fetch and assemble the full datagram.
+ return s.fetchFullDatagram(ctx, le, true)
+ }
+ }
+ return le, nil
default:
return nil, fmt.Errorf("tail call returned %d logs", len(resp.Logs))
}
}
+func (s *Stream) fetchFullDatagram(ctx context.Context, le *logpb.LogEntry, fetchIfMid bool) (*logpb.LogEntry, error) {
+ // Re-evaluate our partial state.
+ dg := le.GetDatagram()
+ if dg == nil {
+ return nil, fmt.Errorf("entry is not a datagram")
+ }
+
+ p := dg.Partial
+ if p == nil {
+ // Not partial, return the full message.
+ return le, nil
+ }
+
+ if uint64(p.Index) > le.StreamIndex {
+ // Something is wrong. The datagram identifies itself as an index in the
+ // stream that exceeds the actual number of entries in the stream.
+ return nil, fmt.Errorf("malformed partial datagram; index (%d) > stream index (%d)",
+ p.Index, le.StreamIndex)
+ }
+
+ if !p.Last {
+ // This is the last log entry (b/c we Tail'd), but it is part of a larger
+ // datagram. We can't fetch the full datagram since presumably the remainder
+ // doesn't exist. Therefore, fetch the previous datagram.
+ switch {
+ case !fetchIfMid:
+ return nil, fmt.Errorf("mid-fragment partial datagram not allowed")
+
+ case uint64(p.Index) == le.StreamIndex:
+ // If we equal the stream index, then we are the first datagram in the
+ // stream, so return nil.
+ return nil, nil
+
+ default:
+ // Perform a Get on the previous entry in the stream.
+ prevIdx := le.StreamIndex - uint64(p.Index) - 1
+ logs, err := s.Get(ctx, Index(types.MessageIndex(prevIdx)), LimitCount(1))
+ if err != nil {
+ return nil, fmt.Errorf("failed to get previous datagram (%d): %s", prevIdx, err)
+ }
+
+ if len(logs) != 1 || logs[0].StreamIndex != prevIdx {
+ return nil, fmt.Errorf("previous datagram (%d) not returned", prevIdx)
+ }
+ if le, err = s.fetchFullDatagram(ctx, logs[0], false); err != nil {
+ return nil, fmt.Errorf("failed to recurse to previous datagram (%d): %s", prevIdx, err)
+ }
+ return le, nil
+ }
+ }
+
+ // If this is "Last", but it's also index 0, then it is a partial datagram
+ // with one entry. Weird ... but whatever.
+ if p.Index == 0 {
+ dg.Partial = nil
+ return le, nil
+ }
+
+ // Get the intermediate logs.
+ startIdx := types.MessageIndex(le.StreamIndex - uint64(p.Index))
+ count := int(p.Index)
+ logs, err := s.Get(ctx, Index(startIdx), LimitCount(count))
+ if err != nil {
+ return nil, fmt.Errorf("failed to get intermediate logs [%d .. %d]: %s",
+ startIdx, startIdx+types.MessageIndex(count)-1, err)
+ }
+
+ if len(logs) < count {
+ return nil, fmt.Errorf("incomplete intermediate logs results (%d < %d)", len(logs), count)
+ }
+ logs = append(logs[:count], le)
+
+ // Construct the full datagram.
+ aggregate := make([]byte, 0, int(p.Size))
+ for i, ple := range logs {
+ chunkDg := ple.GetDatagram()
+ if chunkDg == nil {
+ return nil, fmt.Errorf("intermediate datagram #%d is not a datagram", i)
+ }
+ chunkP := chunkDg.Partial
+ if chunkP == nil {
+ return nil, fmt.Errorf("intermediate datagram #%d is not partial", i)
+ }
+ if int(chunkP.Index) != i {
+ return nil, fmt.Errorf("intermediate datagram #%d does not have a contiguous index (%d)", i, chunkP.Index)
+ }
+ if chunkP.Size != p.Size {
+ return nil, fmt.Errorf("inconsistent datagram size (%d != %d)", chunkP.Size, p.Size)
+ }
+ if uint64(len(aggregate))+uint64(len(chunkDg.Data)) > p.Size {
+ return nil, fmt.Errorf("appending chunk data would exceed the declared size (%d > %d)",
+ len(aggregate)+len(chunkDg.Data), p.Size)
+ }
+ aggregate = append(aggregate, chunkDg.Data...)
+ }
+
+ if uint64(len(aggregate)) != p.Size {
+ return nil, fmt.Errorf("reassembled datagram length (%d) differs from declared length (%d)", len(aggregate), p.Size)
+ }
+
+ le = logs[0]
+ dg = le.GetDatagram()
+ dg.Data = aggregate
+ dg.Partial = nil
+ return le, nil
+}
+
func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error {
if stateP == nil {
return nil
}
- if resp.Desc == nil {
- return errors.New("Requested descriptor was not returned")
- }
- if resp.State == nil {
- return errors.New("Requested state was not returned")
+ ls, err := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Desc)
+ if err != nil {
+ return fmt.Errorf("failde to load stream state: %v", err)
}
- ls := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Desc)
+
*stateP = *ls
return nil
}
« no previous file with comments | « logdog/client/coordinator/query_test.go ('k') | logdog/client/coordinator/stream_params.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698