| OLD | NEW |
| (Empty) |
| 1 package main | |
| 2 | |
| 3 import ( | |
| 4 "errors" | |
| 5 "fmt" | |
| 6 "io" | |
| 7 "log" | |
| 8 "net/http" | |
| 9 "strings" | |
| 10 "sync" | |
| 11 "time" | |
| 12 | |
| 13 "golang.org/x/net/context" | |
| 14 | |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/grpc/prpc" | |
| 17 "github.com/luci/luci-go/logdog/api/logpb" | |
| 18 "github.com/luci/luci-go/logdog/client/coordinator" | |
| 19 "github.com/luci/luci-go/logdog/common/fetcher" | |
| 20 "github.com/luci/luci-go/logdog/common/types" | |
| 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
| 22 ) | |
| 23 | |
| 24 var errNoAuth = errors.New("no access") | |
| 25 | |
| 26 type streamInfo struct { | |
| 27 Project cfgtypes.ProjectName | |
| 28 Path types.StreamPath | |
| 29 | |
| 30 // Client is the HTTP client to use for LogDog communication. | |
| 31 Client *coordinator.Client | |
| 32 } | |
| 33 | |
| 34 const noStreamDelay = 5 * time.Second | |
| 35 | |
| 36 // coordinatorSource is a fetcher.Source implementation that uses the | |
| 37 // Coordiantor API. | |
| 38 type coordinatorSource struct { | |
| 39 sync.Mutex | |
| 40 | |
| 41 stream *coordinator.Stream | |
| 42 tidx types.MessageIndex | |
| 43 tailFirst bool | |
| 44 | |
| 45 streamState *coordinator.LogStream | |
| 46 } | |
| 47 | |
| 48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( | |
| 49 []*logpb.LogEntry, types.MessageIndex, error) { | |
| 50 s.Lock() | |
| 51 // TODO(hinoka): If fetching multiple streams, this would cause requests | |
| 52 // to be serialized. We may not want this. | |
| 53 defer s.Unlock() | |
| 54 | |
| 55 params := append(make([]coordinator.GetParam, 0, 4), | |
| 56 coordinator.LimitBytes(int(req.Bytes)), | |
| 57 coordinator.LimitCount(req.Count), | |
| 58 coordinator.Index(req.Index), | |
| 59 ) | |
| 60 | |
| 61 // If we haven't terminated, use this opportunity to fetch/update our st
ream | |
| 62 // state. | |
| 63 var streamState coordinator.LogStream | |
| 64 reqState := s.streamState == nil || s.streamState.State.TerminalIndex <
0 | |
| 65 if reqState { | |
| 66 params = append(params, coordinator.WithState(&streamState)) | |
| 67 } | |
| 68 | |
| 69 for { | |
| 70 logs, err := s.stream.Get(c, params...) | |
| 71 switch err { | |
| 72 case nil: | |
| 73 if reqState { | |
| 74 s.streamState = &streamState | |
| 75 s.tidx = streamState.State.TerminalIndex | |
| 76 } | |
| 77 return logs, s.tidx, nil | |
| 78 | |
| 79 case coordinator.ErrNoSuchStream: | |
| 80 log.Print("Stream does not exist. Sleeping pending regis
tration.") | |
| 81 | |
| 82 // Delay, interrupting if our Context is interrupted. | |
| 83 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { | |
| 84 return nil, 0, tr.Err | |
| 85 } | |
| 86 | |
| 87 default: | |
| 88 return nil, 0, err | |
| 89 } | |
| 90 } | |
| 91 } | |
| 92 | |
| 93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err
or { | |
| 94 // TODO(hinoka): Move this to luci-config. | |
| 95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp
ot.com") { | |
| 96 return fmt.Errorf("unknown host %s", host) | |
| 97 } | |
| 98 spath := strings.SplitN(path, "/", 2) | |
| 99 if len(spath) != 2 { | |
| 100 return fmt.Errorf("%s is not a valid path", path) | |
| 101 } | |
| 102 project := cfgtypes.ProjectName(spath[0]) | |
| 103 streamPath := types.StreamPath(spath[1]) | |
| 104 | |
| 105 client := coordinator.NewClient(&prpc.Client{ | |
| 106 C: &http.Client{ | |
| 107 // TODO(hinoka): Once crbug.com/712506 is resolved, figu
re out how to get auth. | |
| 108 Transport: http.DefaultTransport, | |
| 109 }, | |
| 110 Host: host, | |
| 111 }) | |
| 112 stream := client.Stream(project, streamPath) | |
| 113 | |
| 114 // Pull stream information. | |
| 115 f := fetcher.New(c, fetcher.Options{ | |
| 116 Source: &coordinatorSource{ | |
| 117 stream: stream, | |
| 118 tidx: -1, // Must be set to probe for state. | |
| 119 }, | |
| 120 Index: types.MessageIndex(0), | |
| 121 Count: 0, | |
| 122 // Try to buffer as much as possible, with a large window, since
this is | |
| 123 // basically a cloud-to-cloud connection. | |
| 124 BufferCount: 200, | |
| 125 BufferBytes: int64(4 * 1024 * 1024), | |
| 126 PrefetchFactor: 10, | |
| 127 }) | |
| 128 | |
| 129 for { | |
| 130 // Read out of the buffer. This _should_ be bottlenecked on the
network | |
| 131 // connection between the Flex instance and the client, via Fpri
ntf(). | |
| 132 entry, err := f.NextLogEntry() | |
| 133 switch err { | |
| 134 case io.EOF: | |
| 135 return nil // We're done. | |
| 136 case nil: | |
| 137 // Nothing | |
| 138 case coordinator.ErrNoAccess: | |
| 139 return errNoAuth // This will force a redirect | |
| 140 default: | |
| 141 return err | |
| 142 } | |
| 143 content := entry.GetText() | |
| 144 if content == nil { | |
| 145 break | |
| 146 } | |
| 147 for _, line := range content.Lines { | |
| 148 fmt.Fprint(w, line.Value) | |
| 149 fmt.Fprint(w, line.Delimiter) | |
| 150 } | |
| 151 } | |
| 152 return nil | |
| 153 } | |
| OLD | NEW |