OLD | NEW |
| (Empty) |
1 package main | |
2 | |
3 import ( | |
4 "errors" | |
5 "fmt" | |
6 "io" | |
7 "log" | |
8 "net/http" | |
9 "strings" | |
10 "sync" | |
11 "time" | |
12 | |
13 "golang.org/x/net/context" | |
14 | |
15 "github.com/luci/luci-go/common/clock" | |
16 "github.com/luci/luci-go/grpc/prpc" | |
17 "github.com/luci/luci-go/logdog/api/logpb" | |
18 "github.com/luci/luci-go/logdog/client/coordinator" | |
19 "github.com/luci/luci-go/logdog/common/fetcher" | |
20 "github.com/luci/luci-go/logdog/common/types" | |
21 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
22 ) | |
23 | |
24 var errNoAuth = errors.New("no access") | |
25 | |
26 type streamInfo struct { | |
27 Project cfgtypes.ProjectName | |
28 Path types.StreamPath | |
29 | |
30 // Client is the HTTP client to use for LogDog communication. | |
31 Client *coordinator.Client | |
32 } | |
33 | |
34 const noStreamDelay = 5 * time.Second | |
35 | |
36 // coordinatorSource is a fetcher.Source implementation that uses the | |
37 // Coordiantor API. | |
38 type coordinatorSource struct { | |
39 sync.Mutex | |
40 | |
41 stream *coordinator.Stream | |
42 tidx types.MessageIndex | |
43 tailFirst bool | |
44 | |
45 streamState *coordinator.LogStream | |
46 } | |
47 | |
48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( | |
49 []*logpb.LogEntry, types.MessageIndex, error) { | |
50 s.Lock() | |
51 // TODO(hinoka): If fetching multiple streams, this would cause requests | |
52 // to be serialized. We may not want this. | |
53 defer s.Unlock() | |
54 | |
55 params := append(make([]coordinator.GetParam, 0, 4), | |
56 coordinator.LimitBytes(int(req.Bytes)), | |
57 coordinator.LimitCount(req.Count), | |
58 coordinator.Index(req.Index), | |
59 ) | |
60 | |
61 // If we haven't terminated, use this opportunity to fetch/update our st
ream | |
62 // state. | |
63 var streamState coordinator.LogStream | |
64 reqState := s.streamState == nil || s.streamState.State.TerminalIndex <
0 | |
65 if reqState { | |
66 params = append(params, coordinator.WithState(&streamState)) | |
67 } | |
68 | |
69 for { | |
70 logs, err := s.stream.Get(c, params...) | |
71 switch err { | |
72 case nil: | |
73 if reqState { | |
74 s.streamState = &streamState | |
75 s.tidx = streamState.State.TerminalIndex | |
76 } | |
77 return logs, s.tidx, nil | |
78 | |
79 case coordinator.ErrNoSuchStream: | |
80 log.Print("Stream does not exist. Sleeping pending regis
tration.") | |
81 | |
82 // Delay, interrupting if our Context is interrupted. | |
83 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { | |
84 return nil, 0, tr.Err | |
85 } | |
86 | |
87 default: | |
88 return nil, 0, err | |
89 } | |
90 } | |
91 } | |
92 | |
93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err
or { | |
94 // TODO(hinoka): Move this to luci-config. | |
95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp
ot.com") { | |
96 return fmt.Errorf("unknown host %s", host) | |
97 } | |
98 spath := strings.SplitN(path, "/", 2) | |
99 if len(spath) != 2 { | |
100 return fmt.Errorf("%s is not a valid path", path) | |
101 } | |
102 project := cfgtypes.ProjectName(spath[0]) | |
103 streamPath := types.StreamPath(spath[1]) | |
104 | |
105 client := coordinator.NewClient(&prpc.Client{ | |
106 C: &http.Client{ | |
107 // TODO(hinoka): Once crbug.com/712506 is resolved, figu
re out how to get auth. | |
108 Transport: http.DefaultTransport, | |
109 }, | |
110 Host: host, | |
111 }) | |
112 stream := client.Stream(project, streamPath) | |
113 | |
114 // Pull stream information. | |
115 f := fetcher.New(c, fetcher.Options{ | |
116 Source: &coordinatorSource{ | |
117 stream: stream, | |
118 tidx: -1, // Must be set to probe for state. | |
119 }, | |
120 Index: types.MessageIndex(0), | |
121 Count: 0, | |
122 // Try to buffer as much as possible, with a large window, since
this is | |
123 // basically a cloud-to-cloud connection. | |
124 BufferCount: 200, | |
125 BufferBytes: int64(4 * 1024 * 1024), | |
126 PrefetchFactor: 10, | |
127 }) | |
128 | |
129 for { | |
130 // Read out of the buffer. This _should_ be bottlenecked on the
network | |
131 // connection between the Flex instance and the client, via Fpri
ntf(). | |
132 entry, err := f.NextLogEntry() | |
133 switch err { | |
134 case io.EOF: | |
135 return nil // We're done. | |
136 case nil: | |
137 // Nothing | |
138 case coordinator.ErrNoAccess: | |
139 return errNoAuth // This will force a redirect | |
140 default: | |
141 return err | |
142 } | |
143 content := entry.GetText() | |
144 if content == nil { | |
145 break | |
146 } | |
147 for _, line := range content.Lines { | |
148 fmt.Fprint(w, line.Value) | |
149 fmt.Fprint(w, line.Delimiter) | |
150 } | |
151 } | |
152 return nil | |
153 } | |
OLD | NEW |