Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package main | |
| 2 | |
| 3 import ( | |
| 4 "errors" | |
| 5 "fmt" | |
| 6 "io" | |
| 7 "log" | |
| 8 "net/http" | |
| 9 "strings" | |
| 10 "sync" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/clock" | |
| 14 "github.com/luci/luci-go/grpc/prpc" | |
| 15 "github.com/luci/luci-go/logdog/api/logpb" | |
| 16 "github.com/luci/luci-go/logdog/client/coordinator" | |
| 17 "github.com/luci/luci-go/logdog/common/fetcher" | |
| 18 "github.com/luci/luci-go/logdog/common/types" | |
| 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
| 20 | |
| 21 "golang.org/x/net/context" | |
|
nodir
2017/04/29 00:59:45
3p imports should be before 1p
hinoka
2017/05/01 23:08:02
Done.
| |
| 22 ) | |
| 23 | |
| 24 var errNoAuth = errors.New("no access") | |
| 25 | |
| 26 type streamInfo struct { | |
| 27 Project cfgtypes.ProjectName | |
| 28 Path types.StreamPath | |
| 29 | |
| 30 // Client is the HTTP client to use for LogDog communication. | |
| 31 Client *coordinator.Client | |
| 32 } | |
| 33 | |
| 34 const noStreamDelay = 5 * time.Second | |
| 35 | |
| 36 // coordinatorSource is a fetcher.Source implementation that uses the | |
| 37 // Coordiantor API. | |
| 38 type coordinatorSource struct { | |
| 39 sync.Mutex | |
| 40 | |
| 41 stream *coordinator.Stream | |
| 42 tidx types.MessageIndex | |
| 43 tailFirst bool | |
| 44 | |
| 45 streamState *coordinator.LogStream | |
| 46 } | |
| 47 | |
| 48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) ( | |
| 49 []*logpb.LogEntry, types.MessageIndex, error) { | |
| 50 s.Lock() | |
|
nodir
2017/04/29 00:59:45
FWIU, this serializes all requests to logdog. It i
hinoka
2017/05/01 23:08:02
Done.
| |
| 51 defer s.Unlock() | |
| 52 | |
| 53 params := append(make([]coordinator.GetParam, 0, 4), | |
| 54 coordinator.LimitBytes(int(req.Bytes)), | |
| 55 coordinator.LimitCount(req.Count), | |
| 56 coordinator.Index(req.Index), | |
| 57 ) | |
| 58 | |
| 59 // If we haven't terminated, use this opportunity to fetch/update our st ream | |
| 60 // state. | |
| 61 var streamState coordinator.LogStream | |
| 62 reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0) | |
|
nodir
2017/04/29 00:59:45
nit: parens are unnecessary
hinoka
2017/05/01 23:08:02
Done.
| |
| 63 if reqStream { | |
| 64 params = append(params, coordinator.WithState(&streamState)) | |
|
nodir
2017/04/29 00:59:45
it seems that this variable controls whether or no
hinoka
2017/05/01 23:08:02
Done.
| |
| 65 } | |
| 66 | |
| 67 for { | |
| 68 logs, err := s.stream.Get(c, params...) | |
| 69 switch err { | |
| 70 case nil: | |
| 71 if reqStream { | |
| 72 s.streamState = &streamState | |
| 73 s.tidx = streamState.State.TerminalIndex | |
| 74 } | |
| 75 return logs, s.tidx, nil | |
| 76 | |
| 77 case coordinator.ErrNoSuchStream: | |
| 78 log.Print("Stream does not exist. Sleeping pending regis tration.") | |
| 79 | |
| 80 // Delay, interrupting if our Context is interrupted. | |
| 81 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) { | |
| 82 return nil, 0, tr.Err | |
| 83 } | |
| 84 | |
| 85 default: | |
| 86 if err != nil { | |
|
nodir
2017/04/29 00:59:45
this is constant true because of `case nil` above,
hinoka
2017/05/01 23:08:02
Done.
| |
| 87 return nil, 0, err | |
| 88 } | |
| 89 } | |
| 90 } | |
| 91 } | |
| 92 | |
| 93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err or { | |
| 94 // TODO(hinoka): Move this to luci-config. | |
| 95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp ot.com") { | |
| 96 return fmt.Errorf("unknown host %s", host) | |
| 97 } | |
| 98 spath := strings.SplitN(path, "/", 2) | |
| 99 project := cfgtypes.ProjectName(spath[0]) | |
| 100 streamPath := types.StreamPath(spath[1]) | |
|
nodir
2017/04/29 00:59:45
this will crash the server if someone requests /lo
hinoka
2017/05/01 23:08:03
Done.
| |
| 101 | |
| 102 client := coordinator.NewClient(&prpc.Client{ | |
| 103 C: &http.Client{ | |
| 104 // TODO(hinoka): Once 712506 is resolved, figure out how to get auth. | |
|
nodir
2017/04/29 00:59:45
please write crbug.com/712506
hinoka
2017/05/01 23:08:02
Done.
| |
| 105 Transport: http.DefaultTransport, | |
| 106 }, | |
| 107 Host: host, | |
| 108 }) | |
| 109 stream := client.Stream(project, streamPath) | |
| 110 | |
| 111 // Pull stream information. | |
| 112 src := coordinatorSource{ | |
|
nodir
2017/04/29 00:59:45
nit: consider inlining
hinoka
2017/05/01 23:08:02
Done.
| |
| 113 stream: stream, | |
| 114 tidx: -1, // Must be set to probe for state. | |
| 115 } | |
| 116 f := fetcher.New(c, fetcher.Options{ | |
| 117 Source: &src, | |
| 118 Index: types.MessageIndex(0), | |
| 119 Count: 0, | |
| 120 // Try to buffer as much as possible, with a large window, since this is | |
| 121 // basically a cloud-to-cloud connection. | |
| 122 BufferCount: 200, | |
| 123 BufferBytes: int64(4 * 1024 * 1024), | |
| 124 PrefetchFactor: 10, | |
| 125 }) | |
| 126 | |
| 127 for { | |
| 128 // Read out of the buffer. This _should_ be bottlenecked on the network | |
| 129 // connection between the Flex instance and the client, via Fpri ntf(). | |
| 130 entry, err := f.NextLogEntry() | |
| 131 switch err { | |
| 132 case io.EOF: | |
| 133 break // We're done. | |
|
nodir
2017/04/29 00:59:45
i think this breaks the switch, not the loop
you c
hinoka
2017/05/01 23:08:02
Oh you're right... I think "return nil" will work
| |
| 134 case nil: | |
| 135 // Nothing | |
| 136 case coordinator.ErrNoAccess: | |
| 137 return errNoAuth // This will force a redirect | |
| 138 default: | |
| 139 return err | |
| 140 } | |
| 141 content := entry.GetText() | |
| 142 if content == nil { | |
| 143 break | |
| 144 } | |
| 145 for _, line := range content.Lines { | |
| 146 fmt.Fprint(w, line.Value) | |
| 147 fmt.Fprint(w, line.Delimiter) | |
| 148 } | |
| 149 } | |
| 150 return nil | |
| 151 } | |
| OLD | NEW |