| Index: logdog/client/coordinator/stream.go
|
| diff --git a/logdog/client/coordinator/stream.go b/logdog/client/coordinator/stream.go
|
| index b2f6a02b31055c2f2b5e070c3b8e6c8e7028c67a..7b7e927eca50878adc8353c91a070b147c67121b 100644
|
| --- a/logdog/client/coordinator/stream.go
|
| +++ b/logdog/client/coordinator/stream.go
|
| @@ -59,34 +59,38 @@ type LogStream struct {
|
| Path types.StreamPath
|
|
|
| // Desc is the log stream's descriptor.
|
| - Desc *logpb.LogStreamDescriptor
|
| + Desc logpb.LogStreamDescriptor
|
|
|
| // State is the stream's current state.
|
| - State *StreamState
|
| + State StreamState
|
| }
|
|
|
| -func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) *LogStream {
|
| +func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) (
|
| + *LogStream, error) {
|
| + switch {
|
| + case s == nil:
|
| + return nil, errors.New("missing required log state")
|
| + case d == nil:
|
| + return nil, errors.New("missing required descriptor")
|
| + }
|
| +
|
| ls := LogStream{
|
| Project: config.ProjectName(proj),
|
| Path: path,
|
| - Desc: d,
|
| - }
|
| - if s != nil {
|
| - st := StreamState{
|
| + Desc: *d,
|
| + State: StreamState{
|
| Created: s.Created.Time(),
|
| TerminalIndex: types.MessageIndex(s.TerminalIndex),
|
| Purged: s.Purged,
|
| - }
|
| - if a := s.Archive; a != nil {
|
| - st.Archived = true
|
| - st.ArchiveIndexURL = a.IndexUrl
|
| - st.ArchiveStreamURL = a.StreamUrl
|
| - st.ArchiveDataURL = a.DataUrl
|
| - }
|
| -
|
| - ls.State = &st
|
| + },
|
| + }
|
| + if a := s.Archive; a != nil {
|
| + ls.State.Archived = true
|
| + ls.State.ArchiveIndexURL = a.IndexUrl
|
| + ls.State.ArchiveStreamURL = a.StreamUrl
|
| + ls.State.ArchiveDataURL = a.DataUrl
|
| }
|
| - return &ls
|
| + return &ls, nil
|
| }
|
|
|
| // Stream is an interface to Coordinator stream-level commands. It is bound to
|
| @@ -119,25 +123,33 @@ func (s *Stream) State(ctx context.Context) (*LogStream, error) {
|
| if desc := resp.Desc; desc != nil {
|
| path = desc.Path()
|
| }
|
| - return loadLogStream(resp.Project, path, resp.State, resp.Desc), nil
|
| +
|
| + st, err := loadLogStream(resp.Project, path, resp.State, resp.Desc)
|
| + if err != nil {
|
| + return nil, fmt.Errorf("failed to load stream state: %v", err)
|
| + }
|
| + return st, nil
|
| }
|
|
|
| // Get retrieves log stream entries from the Coordinator. The supplied
|
| // parameters shape which entries are requested and what information is
|
| // returned.
|
| -func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry, error) {
|
| - if p == nil {
|
| - p = &StreamGetParams{}
|
| +func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry, error) {
|
| + p := getParamsInst{
|
| + r: logdog.GetRequest{
|
| + Project: string(s.project),
|
| + Path: string(s.path),
|
| + },
|
| + }
|
| + for _, param := range params {
|
| + param.applyGet(&p)
|
| }
|
|
|
| - req := p.r
|
| - req.Project = string(s.project)
|
| - req.Path = string(s.path)
|
| if p.stateP != nil {
|
| - req.State = true
|
| + p.r.State = true
|
| }
|
|
|
| - resp, err := s.c.C.Get(ctx, &req)
|
| + resp, err := s.c.C.Get(ctx, &p.r)
|
| if err != nil {
|
| return nil, normalizeError(err)
|
| }
|
| @@ -150,20 +162,22 @@ func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry
|
| // Tail performs a tail call, returning the last log entry in the stream. If
|
| // stateP is not nil, the stream's state will be requested and loaded into the
|
| // variable.
|
| -func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry, error) {
|
| - req := logdog.TailRequest{
|
| - Project: string(s.project),
|
| - Path: string(s.path),
|
| +func (s *Stream) Tail(ctx context.Context, params ...TailParam) (*logpb.LogEntry, error) {
|
| + p := tailParamsInst{
|
| + r: logdog.TailRequest{
|
| + Project: string(s.project),
|
| + Path: string(s.path),
|
| + },
|
| }
|
| - if stateP != nil {
|
| - req.State = true
|
| + for _, param := range params {
|
| + param.applyTail(&p)
|
| }
|
|
|
| - resp, err := s.c.C.Tail(ctx, &req)
|
| + resp, err := s.c.C.Tail(ctx, &p.r)
|
| if err != nil {
|
| return nil, normalizeError(err)
|
| }
|
| - if err := loadStatePointer(stateP, resp); err != nil {
|
| + if err := loadStatePointer(p.stateP, resp); err != nil {
|
| return nil, err
|
| }
|
|
|
| @@ -172,25 +186,137 @@ func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry,
|
| return nil, nil
|
|
|
| case 1:
|
| - return resp.Logs[0], nil
|
| + le := resp.Logs[0]
|
| + if p.complete {
|
| + if dg := le.GetDatagram(); dg != nil && dg.Partial != nil {
|
| + // This is a partial; datagram. Fetch and assemble the full datagram.
|
| + return s.fetchFullDatagram(ctx, le, true)
|
| + }
|
| + }
|
| + return le, nil
|
|
|
| default:
|
| return nil, fmt.Errorf("tail call returned %d logs", len(resp.Logs))
|
| }
|
| }
|
|
|
| +func (s *Stream) fetchFullDatagram(ctx context.Context, le *logpb.LogEntry, fetchIfMid bool) (*logpb.LogEntry, error) {
|
| + // Re-evaluate our partial state.
|
| + dg := le.GetDatagram()
|
| + if dg == nil {
|
| + return nil, fmt.Errorf("entry is not a datagram")
|
| + }
|
| +
|
| + p := dg.Partial
|
| + if p == nil {
|
| + // Not partial, return the full message.
|
| + return le, nil
|
| + }
|
| +
|
| + if uint64(p.Index) > le.StreamIndex {
|
| + // Something is wrong. The datagram identifies itself as an index in the
|
| + // stream that exceeds the actual number of entries in the stream.
|
| + return nil, fmt.Errorf("malformed partial datagram; index (%d) > stream index (%d)",
|
| + p.Index, le.StreamIndex)
|
| + }
|
| +
|
| + if !p.Last {
|
| + // This is the last log entry (b/c we Tail'd), but it is part of a larger
|
| + // datagram. We can't fetch the full datagram since presumably the remainder
|
| + // doesn't exist. Therefore, fetch the previous datagram.
|
| + switch {
|
| + case !fetchIfMid:
|
| + return nil, fmt.Errorf("mid-fragment partial datagram not allowed")
|
| +
|
| + case uint64(p.Index) == le.StreamIndex:
|
| + // If we equal the stream index, then we are the first datagram in the
|
| + // stream, so return nil.
|
| + return nil, nil
|
| +
|
| + default:
|
| + // Perform a Get on the previous entry in the stream.
|
| + prevIdx := le.StreamIndex - uint64(p.Index) - 1
|
| + logs, err := s.Get(ctx, Index(types.MessageIndex(prevIdx)), LimitCount(1))
|
| + if err != nil {
|
| + return nil, fmt.Errorf("failed to get previous datagram (%d): %s", prevIdx, err)
|
| + }
|
| +
|
| + if len(logs) != 1 || logs[0].StreamIndex != prevIdx {
|
| + return nil, fmt.Errorf("previous datagram (%d) not returned", prevIdx)
|
| + }
|
| + if le, err = s.fetchFullDatagram(ctx, logs[0], false); err != nil {
|
| + return nil, fmt.Errorf("failed to recurse to previous datagram (%d): %s", prevIdx, err)
|
| + }
|
| + return le, nil
|
| + }
|
| + }
|
| +
|
| + // If this is "Last", but it's also index 0, then it is a partial datagram
|
| + // with one entry. Weird ... but whatever.
|
| + if p.Index == 0 {
|
| + dg.Partial = nil
|
| + return le, nil
|
| + }
|
| +
|
| + // Get the intermediate logs.
|
| + startIdx := types.MessageIndex(le.StreamIndex - uint64(p.Index))
|
| + count := int(p.Index)
|
| + logs, err := s.Get(ctx, Index(startIdx), LimitCount(count))
|
| + if err != nil {
|
| + return nil, fmt.Errorf("failed to get intermediate logs [%d .. %d]: %s",
|
| + startIdx, startIdx+types.MessageIndex(count)-1, err)
|
| + }
|
| +
|
| + if len(logs) < count {
|
| + return nil, fmt.Errorf("incomplete intermediate logs results (%d < %d)", len(logs), count)
|
| + }
|
| + logs = append(logs[:count], le)
|
| +
|
| + // Construct the full datagram.
|
| + aggregate := make([]byte, 0, int(p.Size))
|
| + for i, ple := range logs {
|
| + chunkDg := ple.GetDatagram()
|
| + if chunkDg == nil {
|
| + return nil, fmt.Errorf("intermediate datagram #%d is not a datagram", i)
|
| + }
|
| + chunkP := chunkDg.Partial
|
| + if chunkP == nil {
|
| + return nil, fmt.Errorf("intermediate datagram #%d is not partial", i)
|
| + }
|
| + if int(chunkP.Index) != i {
|
| + return nil, fmt.Errorf("intermediate datagram #%d does not have a contiguous index (%d)", i, chunkP.Index)
|
| + }
|
| + if chunkP.Size != p.Size {
|
| + return nil, fmt.Errorf("inconsistent datagram size (%d != %d)", chunkP.Size, p.Size)
|
| + }
|
| + if uint64(len(aggregate))+uint64(len(chunkDg.Data)) > p.Size {
|
| + return nil, fmt.Errorf("appending chunk data would exceed the declared size (%d > %d)",
|
| + len(aggregate)+len(chunkDg.Data), p.Size)
|
| + }
|
| + aggregate = append(aggregate, chunkDg.Data...)
|
| + }
|
| +
|
| + if uint64(len(aggregate)) != p.Size {
|
| + return nil, fmt.Errorf("reassembled datagram length (%d) differs from declared length (%d)", len(aggregate), p.Size)
|
| + }
|
| +
|
| + le = logs[0]
|
| + dg = le.GetDatagram()
|
| + dg.Data = aggregate
|
| + dg.Partial = nil
|
| + return le, nil
|
| +}
|
| +
|
| func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error {
|
| if stateP == nil {
|
| return nil
|
| }
|
|
|
| - if resp.Desc == nil {
|
| - return errors.New("Requested descriptor was not returned")
|
| - }
|
| - if resp.State == nil {
|
| - return errors.New("Requested state was not returned")
|
| + ls, err := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Desc)
|
| + if err != nil {
|
| + return fmt.Errorf("failde to load stream state: %v", err)
|
| }
|
| - ls := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Desc)
|
| +
|
| *stateP = *ls
|
| return nil
|
| }
|
|
|