| OLD | NEW |
| (Empty) | |
| 1 package main |
| 2 |
| 3 import ( |
| 4 "errors" |
| 5 "fmt" |
| 6 "io" |
| 7 "log" |
| 8 "net/http" |
| 9 "strings" |
| 10 "sync" |
| 11 "time" |
| 12 |
| 13 "golang.org/x/net/context" |
| 14 |
| 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/grpc/prpc" |
| 17 "github.com/luci/luci-go/logdog/api/logpb" |
| 18 "github.com/luci/luci-go/logdog/client/coordinator" |
| 19 "github.com/luci/luci-go/logdog/common/fetcher" |
| 20 "github.com/luci/luci-go/logdog/common/types" |
| 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 22 ) |
| 23 |
| 24 var errNoAuth = errors.New("no access") |
| 25 |
| 26 type streamInfo struct { |
| 27 Project cfgtypes.ProjectName |
| 28 Path types.StreamPath |
| 29 |
| 30 // Client is the HTTP client to use for LogDog communication. |
| 31 Client *coordinator.Client |
| 32 } |
| 33 |
| 34 const noStreamDelay = 5 * time.Second |
| 35 |
| 36 // coordinatorSource is a fetcher.Source implementation that uses the |
| 37 // Coordiantor API. |
| 38 type coordinatorSource struct { |
| 39 sync.Mutex |
| 40 |
| 41 stream *coordinator.Stream |
| 42 tidx types.MessageIndex |
| 43 tailFirst bool |
| 44 |
| 45 streamState *coordinator.LogStream |
| 46 } |
| 47 |
| 48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( |
| 49 []*logpb.LogEntry, types.MessageIndex, error) { |
| 50 s.Lock() |
| 51 // TODO(hinoka): If fetching multiple streams, this would cause requests |
| 52 // to be serialized. We may not want this. |
| 53 defer s.Unlock() |
| 54 |
| 55 params := append(make([]coordinator.GetParam, 0, 4), |
| 56 coordinator.LimitBytes(int(req.Bytes)), |
| 57 coordinator.LimitCount(req.Count), |
| 58 coordinator.Index(req.Index), |
| 59 ) |
| 60 |
| 61 // If we haven't terminated, use this opportunity to fetch/update our st
ream |
| 62 // state. |
| 63 var streamState coordinator.LogStream |
| 64 reqState := s.streamState == nil || s.streamState.State.TerminalIndex <
0 |
| 65 if reqState { |
| 66 params = append(params, coordinator.WithState(&streamState)) |
| 67 } |
| 68 |
| 69 for { |
| 70 logs, err := s.stream.Get(c, params...) |
| 71 switch err { |
| 72 case nil: |
| 73 if reqState { |
| 74 s.streamState = &streamState |
| 75 s.tidx = streamState.State.TerminalIndex |
| 76 } |
| 77 return logs, s.tidx, nil |
| 78 |
| 79 case coordinator.ErrNoSuchStream: |
| 80 log.Print("Stream does not exist. Sleeping pending regis
tration.") |
| 81 |
| 82 // Delay, interrupting if our Context is interrupted. |
| 83 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { |
| 84 return nil, 0, tr.Err |
| 85 } |
| 86 |
| 87 default: |
| 88 return nil, 0, err |
| 89 } |
| 90 } |
| 91 } |
| 92 |
| 93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err
or { |
| 94 // TODO(hinoka): Move this to luci-config. |
| 95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp
ot.com") { |
| 96 return fmt.Errorf("unknown host %s", host) |
| 97 } |
| 98 spath := strings.SplitN(path, "/", 2) |
| 99 if len(spath) != 2 { |
| 100 return fmt.Errorf("%s is not a valid path", path) |
| 101 } |
| 102 project := cfgtypes.ProjectName(spath[0]) |
| 103 streamPath := types.StreamPath(spath[1]) |
| 104 |
| 105 client := coordinator.NewClient(&prpc.Client{ |
| 106 C: &http.Client{ |
| 107 // TODO(hinoka): Once crbug.com/712506 is resolved, figu
re out how to get auth. |
| 108 Transport: http.DefaultTransport, |
| 109 }, |
| 110 Host: host, |
| 111 }) |
| 112 stream := client.Stream(project, streamPath) |
| 113 |
| 114 // Pull stream information. |
| 115 f := fetcher.New(c, fetcher.Options{ |
| 116 Source: &coordinatorSource{ |
| 117 stream: stream, |
| 118 tidx: -1, // Must be set to probe for state. |
| 119 }, |
| 120 Index: types.MessageIndex(0), |
| 121 Count: 0, |
| 122 // Try to buffer as much as possible, with a large window, since
this is |
| 123 // basically a cloud-to-cloud connection. |
| 124 BufferCount: 200, |
| 125 BufferBytes: int64(4 * 1024 * 1024), |
| 126 PrefetchFactor: 10, |
| 127 }) |
| 128 |
| 129 for { |
| 130 // Read out of the buffer. This _should_ be bottlenecked on the
network |
| 131 // connection between the Flex instance and the client, via Fpri
ntf(). |
| 132 entry, err := f.NextLogEntry() |
| 133 switch err { |
| 134 case io.EOF: |
| 135 return nil // We're done. |
| 136 case nil: |
| 137 // Nothing |
| 138 case coordinator.ErrNoAccess: |
| 139 return errNoAuth // This will force a redirect |
| 140 default: |
| 141 return err |
| 142 } |
| 143 content := entry.GetText() |
| 144 if content == nil { |
| 145 break |
| 146 } |
| 147 for _, line := range content.Lines { |
| 148 fmt.Fprint(w, line.Value) |
| 149 fmt.Fprint(w, line.Delimiter) |
| 150 } |
| 151 } |
| 152 return nil |
| 153 } |
| OLD | NEW |