| Index: milo/appengine/logs/logs.go
|
| diff --git a/milo/appengine/logs/logs.go b/milo/appengine/logs/logs.go
|
| deleted file mode 100644
|
| index 743581eaa336c2b02359542cb158af2ce78182a5..0000000000000000000000000000000000000000
|
| --- a/milo/appengine/logs/logs.go
|
| +++ /dev/null
|
| @@ -1,153 +0,0 @@
|
| -package main
|
| -
|
| -import (
|
| - "errors"
|
| - "fmt"
|
| - "io"
|
| - "log"
|
| - "net/http"
|
| - "strings"
|
| - "sync"
|
| - "time"
|
| -
|
| - "golang.org/x/net/context"
|
| -
|
| - "github.com/luci/luci-go/common/clock"
|
| - "github.com/luci/luci-go/grpc/prpc"
|
| - "github.com/luci/luci-go/logdog/api/logpb"
|
| - "github.com/luci/luci-go/logdog/client/coordinator"
|
| - "github.com/luci/luci-go/logdog/common/fetcher"
|
| - "github.com/luci/luci-go/logdog/common/types"
|
| - "github.com/luci/luci-go/luci_config/common/cfgtypes"
|
| -)
|
| -
|
| -var errNoAuth = errors.New("no access")
|
| -
|
| -type streamInfo struct {
|
| - Project cfgtypes.ProjectName
|
| - Path types.StreamPath
|
| -
|
| - // Client is the HTTP client to use for LogDog communication.
|
| - Client *coordinator.Client
|
| -}
|
| -
|
| -const noStreamDelay = 5 * time.Second
|
| -
|
| -// coordinatorSource is a fetcher.Source implementation that uses the
|
| -// Coordiantor API.
|
| -type coordinatorSource struct {
|
| - sync.Mutex
|
| -
|
| - stream *coordinator.Stream
|
| - tidx types.MessageIndex
|
| - tailFirst bool
|
| -
|
| - streamState *coordinator.LogStream
|
| -}
|
| -
|
| -func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
|
| - []*logpb.LogEntry, types.MessageIndex, error) {
|
| - s.Lock()
|
| - // TODO(hinoka): If fetching multiple streams, this would cause requests
|
| - // to be serialized. We may not want this.
|
| - defer s.Unlock()
|
| -
|
| - params := append(make([]coordinator.GetParam, 0, 4),
|
| - coordinator.LimitBytes(int(req.Bytes)),
|
| - coordinator.LimitCount(req.Count),
|
| - coordinator.Index(req.Index),
|
| - )
|
| -
|
| - // If we haven't terminated, use this opportunity to fetch/update our stream
|
| - // state.
|
| - var streamState coordinator.LogStream
|
| - reqState := s.streamState == nil || s.streamState.State.TerminalIndex < 0
|
| - if reqState {
|
| - params = append(params, coordinator.WithState(&streamState))
|
| - }
|
| -
|
| - for {
|
| - logs, err := s.stream.Get(c, params...)
|
| - switch err {
|
| - case nil:
|
| - if reqState {
|
| - s.streamState = &streamState
|
| - s.tidx = streamState.State.TerminalIndex
|
| - }
|
| - return logs, s.tidx, nil
|
| -
|
| - case coordinator.ErrNoSuchStream:
|
| - log.Print("Stream does not exist. Sleeping pending registration.")
|
| -
|
| - // Delay, interrupting if our Context is interrupted.
|
| - if tr := <-clock.After(c, noStreamDelay); tr.Incomplete() {
|
| - return nil, 0, tr.Err
|
| - }
|
| -
|
| - default:
|
| - return nil, 0, err
|
| - }
|
| - }
|
| -}
|
| -
|
| -func logHandler(c context.Context, w http.ResponseWriter, host, path string) error {
|
| - // TODO(hinoka): Move this to luci-config.
|
| - if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appspot.com") {
|
| - return fmt.Errorf("unknown host %s", host)
|
| - }
|
| - spath := strings.SplitN(path, "/", 2)
|
| - if len(spath) != 2 {
|
| - return fmt.Errorf("%s is not a valid path", path)
|
| - }
|
| - project := cfgtypes.ProjectName(spath[0])
|
| - streamPath := types.StreamPath(spath[1])
|
| -
|
| - client := coordinator.NewClient(&prpc.Client{
|
| - C: &http.Client{
|
| - // TODO(hinoka): Once crbug.com/712506 is resolved, figure out how to get auth.
|
| - Transport: http.DefaultTransport,
|
| - },
|
| - Host: host,
|
| - })
|
| - stream := client.Stream(project, streamPath)
|
| -
|
| - // Pull stream information.
|
| - f := fetcher.New(c, fetcher.Options{
|
| - Source: &coordinatorSource{
|
| - stream: stream,
|
| - tidx: -1, // Must be set to probe for state.
|
| - },
|
| - Index: types.MessageIndex(0),
|
| - Count: 0,
|
| - // Try to buffer as much as possible, with a large window, since this is
|
| - // basically a cloud-to-cloud connection.
|
| - BufferCount: 200,
|
| - BufferBytes: int64(4 * 1024 * 1024),
|
| - PrefetchFactor: 10,
|
| - })
|
| -
|
| - for {
|
| - // Read out of the buffer. This _should_ be bottlenecked on the network
|
| - // connection between the Flex instance and the client, via Fprintf().
|
| - entry, err := f.NextLogEntry()
|
| - switch err {
|
| - case io.EOF:
|
| - return nil // We're done.
|
| - case nil:
|
| - // Nothing
|
| - case coordinator.ErrNoAccess:
|
| - return errNoAuth // This will force a redirect
|
| - default:
|
| - return err
|
| - }
|
| - content := entry.GetText()
|
| - if content == nil {
|
| - break
|
| - }
|
| - for _, line := range content.Lines {
|
| - fmt.Fprint(w, line.Value)
|
| - fmt.Fprint(w, line.Delimiter)
|
| - }
|
| - }
|
| - return nil
|
| -}
|
|
|