Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(300)

Unified Diff: milo/appengine/logs/logs.go

Issue 2796743004: Milo flex raw log viewer endpoint (Closed)
Patch Set: Fix module path Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: milo/appengine/logs/logs.go
diff --git a/milo/appengine/logs/logs.go b/milo/appengine/logs/logs.go
new file mode 100644
index 0000000000000000000000000000000000000000..3ba14bdc3895fcdfd8b54c6b73b4413c1df6f1fe
--- /dev/null
+++ b/milo/appengine/logs/logs.go
@@ -0,0 +1,151 @@
+package main
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/grpc/prpc"
+ "github.com/luci/luci-go/logdog/api/logpb"
+ "github.com/luci/luci-go/logdog/client/coordinator"
+ "github.com/luci/luci-go/logdog/common/fetcher"
+ "github.com/luci/luci-go/logdog/common/types"
+ "github.com/luci/luci-go/luci_config/common/cfgtypes"
+
+ "golang.org/x/net/context"
nodir 2017/04/29 00:59:45 3p imports should be before 1p
hinoka 2017/05/01 23:08:02 Done.
+)
+
+var errNoAuth = errors.New("no access")
+
+type streamInfo struct {
+ Project cfgtypes.ProjectName
+ Path types.StreamPath
+
+ // Client is the HTTP client to use for LogDog communication.
+ Client *coordinator.Client
+}
+
+const noStreamDelay = 5 * time.Second
+
+// coordinatorSource is a fetcher.Source implementation that uses the
+// Coordiantor API.
+type coordinatorSource struct {
+ sync.Mutex
+
+ stream *coordinator.Stream
+ tidx types.MessageIndex
+ tailFirst bool
+
+ streamState *coordinator.LogStream
+}
+
+func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
+ []*logpb.LogEntry, types.MessageIndex, error) {
+ s.Lock()
nodir 2017/04/29 00:59:45 FWIU, this serializes all requests to logdog. It i
hinoka 2017/05/01 23:08:02 Done.
+ defer s.Unlock()
+
+ params := append(make([]coordinator.GetParam, 0, 4),
+ coordinator.LimitBytes(int(req.Bytes)),
+ coordinator.LimitCount(req.Count),
+ coordinator.Index(req.Index),
+ )
+
+ // If we haven't terminated, use this opportunity to fetch/update our stream
+ // state.
+ var streamState coordinator.LogStream
+ reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
nodir 2017/04/29 00:59:45 nit: parens are unnecessary
hinoka 2017/05/01 23:08:02 Done.
+ if reqStream {
+ params = append(params, coordinator.WithState(&streamState))
nodir 2017/04/29 00:59:45 it seems that this variable controls whether or no
hinoka 2017/05/01 23:08:02 Done.
+ }
+
+ for {
+ logs, err := s.stream.Get(c, params...)
+ switch err {
+ case nil:
+ if reqStream {
+ s.streamState = &streamState
+ s.tidx = streamState.State.TerminalIndex
+ }
+ return logs, s.tidx, nil
+
+ case coordinator.ErrNoSuchStream:
+ log.Print("Stream does not exist. Sleeping pending registration.")
+
+ // Delay, interrupting if our Context is interrupted.
+ if tr := <-clock.After(c, noStreamDelay); tr.Incomplete() {
+ return nil, 0, tr.Err
+ }
+
+ default:
+ if err != nil {
nodir 2017/04/29 00:59:45 this is constant true because of `case nil` above,
hinoka 2017/05/01 23:08:02 Done.
+ return nil, 0, err
+ }
+ }
+ }
+}
+
+func logHandler(c context.Context, w http.ResponseWriter, host, path string) error {
+ // TODO(hinoka): Move this to luci-config.
+ if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appspot.com") {
+ return fmt.Errorf("unknown host %s", host)
+ }
+ spath := strings.SplitN(path, "/", 2)
+ project := cfgtypes.ProjectName(spath[0])
+ streamPath := types.StreamPath(spath[1])
nodir 2017/04/29 00:59:45 this will crash the server if someone requests /lo
hinoka 2017/05/01 23:08:03 Done.
+
+ client := coordinator.NewClient(&prpc.Client{
+ C: &http.Client{
+ // TODO(hinoka): Once 712506 is resolved, figure out how to get auth.
nodir 2017/04/29 00:59:45 please write crbug.com/712506
hinoka 2017/05/01 23:08:02 Done.
+ Transport: http.DefaultTransport,
+ },
+ Host: host,
+ })
+ stream := client.Stream(project, streamPath)
+
+ // Pull stream information.
+ src := coordinatorSource{
nodir 2017/04/29 00:59:45 nit: consider inlining
hinoka 2017/05/01 23:08:02 Done.
+ stream: stream,
+ tidx: -1, // Must be set to probe for state.
+ }
+ f := fetcher.New(c, fetcher.Options{
+ Source: &src,
+ Index: types.MessageIndex(0),
+ Count: 0,
+ // Try to buffer as much as possible, with a large window, since this is
+ // basically a cloud-to-cloud connection.
+ BufferCount: 200,
+ BufferBytes: int64(4 * 1024 * 1024),
+ PrefetchFactor: 10,
+ })
+
+ for {
+ // Read out of the buffer. This _should_ be bottlenecked on the network
+ // connection between the Flex instance and the client, via Fprintf().
+ entry, err := f.NextLogEntry()
+ switch err {
+ case io.EOF:
+ break // We're done.
nodir 2017/04/29 00:59:45 i think this breaks the switch, not the loop you c
hinoka 2017/05/01 23:08:02 Oh you're right... I think "return nil" will work
+ case nil:
+ // Nothing
+ case coordinator.ErrNoAccess:
+ return errNoAuth // This will force a redirect
+ default:
+ return err
+ }
+ content := entry.GetText()
+ if content == nil {
+ break
+ }
+ for _, line := range content.Lines {
+ fmt.Fprint(w, line.Value)
+ fmt.Fprint(w, line.Delimiter)
+ }
+ }
+ return nil
+}

Powered by Google App Engine
This is Rietveld 408576698