Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(181)

Unified Diff: milo/appengine/logdog/build.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/client/coordinator/stream_test.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: milo/appengine/logdog/build.go
diff --git a/milo/appengine/logdog/build.go b/milo/appengine/logdog/build.go
index a08e20d95ec7777102a0b9fe7a93025679ceb8bf..9a420cae11135331f381e7327e6537dc30167e0b 100644
--- a/milo/appengine/logdog/build.go
+++ b/milo/appengine/logdog/build.go
@@ -17,9 +17,8 @@ import (
"github.com/luci/luci-go/common/proto/google"
miloProto "github.com/luci/luci-go/common/proto/milo"
"github.com/luci/luci-go/grpc/grpcutil"
- "github.com/luci/luci-go/grpc/prpc"
- "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
"github.com/luci/luci-go/logdog/api/logpb"
+ "github.com/luci/luci-go/logdog/client/coordinator"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/milo/api/resp"
"github.com/luci/luci-go/milo/appengine/logdog/internal"
@@ -52,7 +51,7 @@ type annotationStreamRequest struct {
path types.StreamPath
// logDogClient is the HTTP client to use for LogDog communication.
- logDogClient http.Client
+ logDogClient *coordinator.Client
// cs is the unmarshalled annotation stream Step and associated data.
cs internal.CachedStep
@@ -122,21 +121,17 @@ func (as *annotationStreamRequest) load(c context.Context) error {
}
// Load from LogDog directly.
- client := logdog.NewLogsPRPCClient(&prpc.Client{
- C: &as.logDogClient,
- Host: as.host,
- })
-
log.Fields{
"project": as.project,
"path": as.path,
"host": as.host,
}.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
- resp, err := client.Tail(c, &logdog.TailRequest{
- Project: string(as.project),
- Path: string(as.path),
- State: true,
- })
+
+ var (
+ state coordinator.LogStream
+ stream = as.logDogClient.Stream(as.project, as.path)
+ )
+ le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Complete())
switch code := grpcutil.Code(err); code {
case codes.OK:
break
@@ -162,59 +157,34 @@ func (as *annotationStreamRequest) load(c context.Context) error {
// Make sure that this is an annotation stream.
switch {
- case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
+ case state.Desc.ContentType != miloProto.ContentTypeAnnotations:
return &miloerror.Error{
Message: "Requested stream is not a Milo annotation protobuf",
Code: http.StatusBadRequest,
}
- case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
+ case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
return &miloerror.Error{
Message: "Requested stream is not a datagram stream",
Code: http.StatusBadRequest,
}
- case len(resp.Logs) == 0:
+ case le == nil:
// No annotation stream data, so render a minimal page.
return nil
}
// Get the last log entry in the stream. In reality, this will be index 0,
// since the "Tail" call should only return one log entry.
- latestStream := resp.Logs[len(resp.Logs)-1]
- dg := latestStream.GetDatagram()
- switch {
- case dg == nil:
+ //
+ // Because we supplied the "Complete" flag to Tail and suceeded, this datagram
+ // will be complete even if its source datagram(s) are fragments.
+ dg := le.GetDatagram()
+ if dg == nil {
return &miloerror.Error{
Message: "Datagram stream does not have datagram data",
Code: http.StatusInternalServerError,
}
-
- case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
- // LogDog splits large datagrams into consecutive fragments. If the
- // annotation state is fragmented, a reconstruction algorithm will have to
- // be employed here to build the full datagram before processing.
- //
- // At the moment, no annotation streams are expected to be anywhere close to
- // this large, so we're going to handle this case by erroring. A
- // reconstruction algorithm would look like:
- // 1) "Tail" to get the latest datagram, identify it as partial.
- // 1a) Perform a bounds check on the total datagram size to ensure that it
- // can be safely reconstructed.
- // 2) Determine if it's the last partial index. If not, then the latest
- // datagram is incomplete. Determine our initial datagram's stream index
- // the by subtracting the partial index from this message's stream index.
- // 2a) If this datagram index is "0", the first datagram in the stream is
- // partial and all of the data isn't here, so treat this as "no data".
- // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
- // one to get the previous datagram.
- // 3) Issue a "Get" request for our initial datagram index through the index
- // preceding ours.
- // 4) Reassemble the binary data from the full set of datagrams.
- return &miloerror.Error{
- Message: "Partial datagram streams are not supported yet",
- Code: http.StatusNotImplemented,
- }
}
// Attempt to decode the Step protobuf.
@@ -249,7 +219,7 @@ func (as *annotationStreamRequest) load(c context.Context) error {
// Build our CachedStep.
as.cs = internal.CachedStep{
Step: &step,
- Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamIndex == uint64(resp.State.TerminalIndex)),
+ Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == uint64(state.State.TerminalIndex)),
}
// Annotee is apparently not putting an ended time on some annotation protos.
« no previous file with comments | « logdog/client/coordinator/stream_test.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698