| Index: milo/appengine/logdog/build.go
|
| diff --git a/milo/appengine/logdog/build.go b/milo/appengine/logdog/build.go
|
| index a08e20d95ec7777102a0b9fe7a93025679ceb8bf..9a420cae11135331f381e7327e6537dc30167e0b 100644
|
| --- a/milo/appengine/logdog/build.go
|
| +++ b/milo/appengine/logdog/build.go
|
| @@ -17,9 +17,8 @@ import (
|
| "github.com/luci/luci-go/common/proto/google"
|
| miloProto "github.com/luci/luci-go/common/proto/milo"
|
| "github.com/luci/luci-go/grpc/grpcutil"
|
| - "github.com/luci/luci-go/grpc/prpc"
|
| - "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| + "github.com/luci/luci-go/logdog/client/coordinator"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| "github.com/luci/luci-go/milo/api/resp"
|
| "github.com/luci/luci-go/milo/appengine/logdog/internal"
|
| @@ -52,7 +51,7 @@ type annotationStreamRequest struct {
|
| path types.StreamPath
|
|
|
| // logDogClient is the HTTP client to use for LogDog communication.
|
| - logDogClient http.Client
|
| + logDogClient *coordinator.Client
|
|
|
| // cs is the unmarshalled annotation stream Step and associated data.
|
| cs internal.CachedStep
|
| @@ -122,21 +121,17 @@ func (as *annotationStreamRequest) load(c context.Context) error {
|
| }
|
|
|
| // Load from LogDog directly.
|
| - client := logdog.NewLogsPRPCClient(&prpc.Client{
|
| - C: &as.logDogClient,
|
| - Host: as.host,
|
| - })
|
| -
|
| log.Fields{
|
| "project": as.project,
|
| "path": as.path,
|
| "host": as.host,
|
| }.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
|
| - resp, err := client.Tail(c, &logdog.TailRequest{
|
| - Project: string(as.project),
|
| - Path: string(as.path),
|
| - State: true,
|
| - })
|
| +
|
| + var (
|
| + state coordinator.LogStream
|
| + stream = as.logDogClient.Stream(as.project, as.path)
|
| + )
|
| + le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Complete())
|
| switch code := grpcutil.Code(err); code {
|
| case codes.OK:
|
| break
|
| @@ -162,59 +157,34 @@ func (as *annotationStreamRequest) load(c context.Context) error {
|
|
|
| // Make sure that this is an annotation stream.
|
| switch {
|
| - case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
|
| + case state.Desc.ContentType != miloProto.ContentTypeAnnotations:
|
| return &miloerror.Error{
|
| Message: "Requested stream is not a Milo annotation protobuf",
|
| Code: http.StatusBadRequest,
|
| }
|
|
|
| - case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
|
| + case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
|
| return &miloerror.Error{
|
| Message: "Requested stream is not a datagram stream",
|
| Code: http.StatusBadRequest,
|
| }
|
|
|
| - case len(resp.Logs) == 0:
|
| + case le == nil:
|
| // No annotation stream data, so render a minimal page.
|
| return nil
|
| }
|
|
|
| // Get the last log entry in the stream. In reality, this will be index 0,
|
| // since the "Tail" call should only return one log entry.
|
| - latestStream := resp.Logs[len(resp.Logs)-1]
|
| - dg := latestStream.GetDatagram()
|
| - switch {
|
| - case dg == nil:
|
| + //
|
| + // Because we supplied the "Complete" flag to Tail and suceeded, this datagram
|
| + // will be complete even if its source datagram(s) are fragments.
|
| + dg := le.GetDatagram()
|
| + if dg == nil {
|
| return &miloerror.Error{
|
| Message: "Datagram stream does not have datagram data",
|
| Code: http.StatusInternalServerError,
|
| }
|
| -
|
| - case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
|
| - // LogDog splits large datagrams into consecutive fragments. If the
|
| - // annotation state is fragmented, a reconstruction algorithm will have to
|
| - // be employed here to build the full datagram before processing.
|
| - //
|
| - // At the moment, no annotation streams are expected to be anywhere close to
|
| - // this large, so we're going to handle this case by erroring. A
|
| - // reconstruction algorithm would look like:
|
| - // 1) "Tail" to get the latest datagram, identify it as partial.
|
| - // 1a) Perform a bounds check on the total datagram size to ensure that it
|
| - // can be safely reconstructed.
|
| - // 2) Determine if it's the last partial index. If not, then the latest
|
| - // datagram is incomplete. Determine our initial datagram's stream index
|
| - // the by subtracting the partial index from this message's stream index.
|
| - // 2a) If this datagram index is "0", the first datagram in the stream is
|
| - // partial and all of the data isn't here, so treat this as "no data".
|
| - // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
|
| - // one to get the previous datagram.
|
| - // 3) Issue a "Get" request for our initial datagram index through the index
|
| - // preceding ours.
|
| - // 4) Reassemble the binary data from the full set of datagrams.
|
| - return &miloerror.Error{
|
| - Message: "Partial datagram streams are not supported yet",
|
| - Code: http.StatusNotImplemented,
|
| - }
|
| }
|
|
|
| // Attempt to decode the Step protobuf.
|
| @@ -249,7 +219,7 @@ func (as *annotationStreamRequest) load(c context.Context) error {
|
| // Build our CachedStep.
|
| as.cs = internal.CachedStep{
|
| Step: &step,
|
| - Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamIndex == uint64(resp.State.TerminalIndex)),
|
| + Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == uint64(state.State.TerminalIndex)),
|
| }
|
|
|
| // Annotee is apparently not putting an ended time on some annotation protos.
|
|
|