Chromium Code Reviews| Index: milo/appengine/logs/logs.go |
| diff --git a/milo/appengine/logs/logs.go b/milo/appengine/logs/logs.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..3ba14bdc3895fcdfd8b54c6b73b4413c1df6f1fe |
| --- /dev/null |
| +++ b/milo/appengine/logs/logs.go |
| @@ -0,0 +1,151 @@ |
| +package main |
| + |
| +import ( |
| + "errors" |
| + "fmt" |
| + "io" |
| + "log" |
| + "net/http" |
| + "strings" |
| + "sync" |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/grpc/prpc" |
| + "github.com/luci/luci-go/logdog/api/logpb" |
| + "github.com/luci/luci-go/logdog/client/coordinator" |
| + "github.com/luci/luci-go/logdog/common/fetcher" |
| + "github.com/luci/luci-go/logdog/common/types" |
| + "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| + |
| + "golang.org/x/net/context" |
|
nodir
2017/04/29 00:59:45
3p imports should be before 1p
hinoka
2017/05/01 23:08:02
Done.
|
| +) |
| + |
| +var errNoAuth = errors.New("no access") |
| + |
| +type streamInfo struct { |
| + Project cfgtypes.ProjectName |
| + Path types.StreamPath |
| + |
| + // Client is the HTTP client to use for LogDog communication. |
| + Client *coordinator.Client |
| +} |
| + |
| +const noStreamDelay = 5 * time.Second |
| + |
| +// coordinatorSource is a fetcher.Source implementation that uses the |
| +// Coordiantor API. |
| +type coordinatorSource struct { |
| + sync.Mutex |
| + |
| + stream *coordinator.Stream |
| + tidx types.MessageIndex |
| + tailFirst bool |
| + |
| + streamState *coordinator.LogStream |
| +} |
| + |
| +func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) ( |
| + []*logpb.LogEntry, types.MessageIndex, error) { |
| + s.Lock() |
|
nodir
2017/04/29 00:59:45
FWIU, this serializes all requests to logdog. It i
hinoka
2017/05/01 23:08:02
Done.
|
| + defer s.Unlock() |
| + |
| + params := append(make([]coordinator.GetParam, 0, 4), |
| + coordinator.LimitBytes(int(req.Bytes)), |
| + coordinator.LimitCount(req.Count), |
| + coordinator.Index(req.Index), |
| + ) |
| + |
| + // If we haven't terminated, use this opportunity to fetch/update our stream |
| + // state. |
| + var streamState coordinator.LogStream |
| + reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0) |
|
nodir
2017/04/29 00:59:45
nit: parens are unnecessary
hinoka
2017/05/01 23:08:02
Done.
|
| + if reqStream { |
| + params = append(params, coordinator.WithState(&streamState)) |
|
nodir
2017/04/29 00:59:45
it seems that this variable controls whether or no
hinoka
2017/05/01 23:08:02
Done.
|
| + } |
| + |
| + for { |
| + logs, err := s.stream.Get(c, params...) |
| + switch err { |
| + case nil: |
| + if reqStream { |
| + s.streamState = &streamState |
| + s.tidx = streamState.State.TerminalIndex |
| + } |
| + return logs, s.tidx, nil |
| + |
| + case coordinator.ErrNoSuchStream: |
| + log.Print("Stream does not exist. Sleeping pending registration.") |
| + |
| + // Delay, interrupting if our Context is interrupted. |
| + if tr := <-clock.After(c, noStreamDelay); tr.Incomplete() { |
| + return nil, 0, tr.Err |
| + } |
| + |
| + default: |
| + if err != nil { |
|
nodir
2017/04/29 00:59:45
this is constant true because of `case nil` above,
hinoka
2017/05/01 23:08:02
Done.
|
| + return nil, 0, err |
| + } |
| + } |
| + } |
| +} |
| + |
| +func logHandler(c context.Context, w http.ResponseWriter, host, path string) error { |
| + // TODO(hinoka): Move this to luci-config. |
| + if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appspot.com") { |
| + return fmt.Errorf("unknown host %s", host) |
| + } |
| + spath := strings.SplitN(path, "/", 2) |
| + project := cfgtypes.ProjectName(spath[0]) |
| + streamPath := types.StreamPath(spath[1]) |
|
nodir
2017/04/29 00:59:45
this will crash the server if someone requests /lo
hinoka
2017/05/01 23:08:03
Done.
|
| + |
| + client := coordinator.NewClient(&prpc.Client{ |
| + C: &http.Client{ |
| + // TODO(hinoka): Once 712506 is resolved, figure out how to get auth. |
|
nodir
2017/04/29 00:59:45
please write crbug.com/712506
hinoka
2017/05/01 23:08:02
Done.
|
| + Transport: http.DefaultTransport, |
| + }, |
| + Host: host, |
| + }) |
| + stream := client.Stream(project, streamPath) |
| + |
| + // Pull stream information. |
| + src := coordinatorSource{ |
|
nodir
2017/04/29 00:59:45
nit: consider inlining
hinoka
2017/05/01 23:08:02
Done.
|
| + stream: stream, |
| + tidx: -1, // Must be set to probe for state. |
| + } |
| + f := fetcher.New(c, fetcher.Options{ |
| + Source: &src, |
| + Index: types.MessageIndex(0), |
| + Count: 0, |
| + // Try to buffer as much as possible, with a large window, since this is |
| + // basically a cloud-to-cloud connection. |
| + BufferCount: 200, |
| + BufferBytes: int64(4 * 1024 * 1024), |
| + PrefetchFactor: 10, |
| + }) |
| + |
| + for { |
| + // Read out of the buffer. This _should_ be bottlenecked on the network |
| + // connection between the Flex instance and the client, via Fprintf(). |
| + entry, err := f.NextLogEntry() |
| + switch err { |
| + case io.EOF: |
| + break // We're done. |
|
nodir
2017/04/29 00:59:45
i think this breaks the switch, not the loop
you c
hinoka
2017/05/01 23:08:02
Oh you're right... I think "return nil" will work
|
| + case nil: |
| + // Nothing |
| + case coordinator.ErrNoAccess: |
| + return errNoAuth // This will force a redirect |
| + default: |
| + return err |
| + } |
| + content := entry.GetText() |
| + if content == nil { |
| + break |
| + } |
| + for _, line := range content.Lines { |
| + fmt.Fprint(w, line.Value) |
| + fmt.Fprint(w, line.Delimiter) |
| + } |
| + } |
| + return nil |
| +} |