OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
Ryan Tseng
2016/02/08 22:56:55
2016 /happy new year
dnj (Google)
2016/02/09 02:50:03
Done.
| |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package logView | |
6 | |
7 import ( | |
8 "io" | |
9 "net/http" | |
10 "strings" | |
11 "time" | |
12 | |
13 "github.com/golang/protobuf/proto" | |
14 "github.com/julienschmidt/httprouter" | |
15 ds "github.com/luci/gae/service/datastore" | |
16 "github.com/luci/luci-go/appengine/logdog/coordinator" | |
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | |
18 "github.com/luci/luci-go/common/clock" | |
19 "github.com/luci/luci-go/common/logdog/fetcher" | |
20 "github.com/luci/luci-go/common/logdog/renderer" | |
21 "github.com/luci/luci-go/common/logdog/types" | |
22 log "github.com/luci/luci-go/common/logging" | |
23 "github.com/luci/luci-go/common/proto/logdog/logpb" | |
24 "github.com/luci/luci-go/server/logdog/storage" | |
25 "golang.org/x/net/context" | |
26 ) | |
27 | |
28 const ( | |
29 // DefaultViewTimeout is the default value for the Handler's Timeout. | |
30 // | |
31 // It targets the managed VM environment, which allows 24 hours. We stop | |
32 // 10 minutes short of that to avoid conflict. | |
33 DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) | |
34 | |
35 // viewLogDelay is the amount of time to wait in between polls when no m ore | |
36 // log entries are available during streaming. | |
37 viewLogDelay = (5 * time.Second) | |
38 ) | |
39 | |
40 // Handler offers an HTTP handler that will return the rendered contents of a | |
41 // log stream. | |
42 type Handler struct { | |
43 coordinator.Service | |
44 | |
45 // timeout is the maximum amount of time that we are willing to handle a | |
46 // View request. | |
47 // | |
48 // This should be short of the maximum request time allowed by an AppEng ine | |
49 // handler, else the instance may be terminated. | |
50 timeout time.Duration | |
51 } | |
52 | |
53 // Handle will render the contents of a log stream to the supplied | |
54 // ResponseWriter. | |
55 // | |
56 // It expects the stream hash/path to be embedded in p as "path". | |
57 func (h *Handler) Handle(c context.Context, w http.ResponseWriter, req *http.Req uest, p httprouter.Params) { | |
58 rc := h.handleImpl(c, w, req, p) | |
59 w.WriteHeader(rc) | |
60 } | |
61 | |
62 func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http .Request, p httprouter.Params) int { | |
63 // Get the requested log path/hash. | |
64 path := strings.TrimLeft(p.ByName("path"), "/") | |
65 log.Fields{ | |
66 "path": path, | |
67 }.Infof(c, "Request to view log stream.") | |
68 | |
69 ls, err := coordinator.NewLogStream(path) | |
70 if err != nil { | |
71 log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.") | |
72 return http.StatusBadRequest | |
73 } | |
74 | |
75 // Set a timeout on our Context. | |
76 timeout := h.timeout | |
77 if timeout <= 0 { | |
78 timeout = DefaultViewTimeout | |
79 } | |
80 c, cancelFunc := context.WithTimeout(c, timeout) | |
81 defer cancelFunc() | |
82 | |
83 // Load the log stream metadata. | |
84 if err := ds.Get(c).Get(&ls); err != nil { | |
85 log.WithError(err).Errorf(c, "Failed to load LogStream.") | |
86 | |
87 switch err { | |
88 case ds.ErrNoSuchEntity: | |
89 return http.StatusNotFound | |
90 default: | |
91 return http.StatusInternalServerError | |
92 } | |
93 } | |
94 | |
95 // The user can stream purged logs only if they're an administrator. If not, | |
96 // pretend that the stream does not exist. | |
97 if ls.Purged { | |
98 if err := config.IsAdminUser(c); err != nil { | |
99 log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.") | |
100 return http.StatusNotFound | |
101 } | |
102 } | |
103 | |
104 // Get our Storage instance. | |
105 st, err := h.GetLogStreamStorage(c, ls) | |
106 if err != nil { | |
107 log.WithError(err).Errorf(c, "Failed to get Storage instance.") | |
108 return http.StatusInternalServerError | |
109 } | |
110 defer st.Close() | |
111 | |
112 // Fetch the requested log stream! | |
113 w.Header().Add("Content-Type", ls.ContentType) | |
114 | |
115 fetchCtx, cancelFunc := context.WithCancel(c) | |
Ryan Tseng
2016/02/08 22:56:55
cancelFunc isn't used anywhere
dnj (Google)
2016/02/09 02:50:03
Done.
| |
116 f := fetcher.New(fetchCtx, fetcher.Options{ | |
117 Source: &storageLogSource{ | |
118 ls: ls, | |
119 st: st, | |
120 }, | |
121 Delay: viewLogDelay, | |
122 }) | |
123 r := renderer.Renderer{ | |
124 Source: f, | |
125 Reproduce: true, | |
126 } | |
127 | |
128 // Render log data to our HTTP response. | |
129 count, err := io.Copy(w, &r) | |
130 if err != nil { | |
131 log.WithError(err).Errorf(c, "Failed to output log stream.") | |
132 return http.StatusInternalServerError | |
133 } | |
134 | |
135 log.Fields{ | |
136 "bytes": count, | |
137 }.Infof(c, "Successfully wrote log stream.") | |
138 return http.StatusOK | |
139 } | |
140 | |
141 // storageLogSource is a stateful incremental implementation of the | |
142 // fetcher.Source interface which uses direct storage access to pull log entries . | |
143 // | |
144 // The Fetcher will respond to Context cancellation, so we don't have to worry | |
145 // about that here. | |
146 type storageLogSource struct { | |
147 ls *coordinator.LogStream | |
148 st storage.Storage | |
149 } | |
150 | |
151 func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest ) ( | |
Ryan Tseng
2016/02/08 22:56:55
docstring (lint should catch this)
dnj (Google)
2016/02/09 02:50:03
It's a public method on a private struct, so that'
| |
152 logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { | |
153 // If this log stream hasn't been terminated yet, re-load it and check i f | |
154 // there is a terminal index. | |
155 tidx = types.MessageIndex(f.ls.TerminalIndex) | |
156 if tidx < 0 { | |
157 log.Debugf(c, "Log stream is non-terminal, refreshing.") | |
158 if err := ds.Get(c).Get(f.ls); err != nil { | |
159 log.WithError(err).Errorf(c, "Failed to refresh log stre am.") | |
160 } | |
161 | |
162 tidx = types.MessageIndex(f.ls.TerminalIndex) | |
163 if tidx >= 0 { | |
164 log.Fields{ | |
165 "terminalIndex": tidx, | |
166 }.Debugf(c, "Log stream has been terminated.") | |
167 } | |
168 } | |
169 | |
170 sreq := storage.GetRequest{ | |
171 Path: f.ls.Path(), | |
172 Index: types.MessageIndex(req.Index), | |
173 } | |
174 | |
175 // If we have a terminal index, set a limit. This is probably unnecessar y, but | |
176 // will handle the error case where logs past the terminal index are wri tten | |
177 // to intermediate storage before a terminal index is registered. | |
178 if tidx >= 0 { | |
179 sreq.Limit = int64(tidx - req.Index + 1) | |
180 if sreq.Limit <= 0 { | |
181 // No more log records in the stream. | |
182 return | |
183 } | |
184 } | |
185 | |
186 // Issue the Get request. | |
Ryan Tseng
2016/02/08 22:56:55
presumably for all chunks of a logstream?
dnj (Google)
2016/02/09 02:50:03
Well, for the ones requested via "req".
| |
187 var logData [][]byte | |
188 bytes := int64(0) | |
189 err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { | |
190 // Enforce contiguous log records. | |
191 if idx != sreq.Index { | |
192 return false | |
193 } | |
194 logData = append(logData, ld) | |
195 sreq.Index++ | |
196 bytes += int64(len(ld)) | |
197 | |
198 // Apply byte/count constraints. | |
199 return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes)) | |
200 }) | |
201 if err != nil { | |
202 log.Fields{ | |
203 log.ErrorKey: err, | |
204 "initialIndex": req.Index, | |
205 "nextIndex": sreq.Index, | |
206 "count": len(logData), | |
207 }.Errorf(c, "Failed to fetch logs.") | |
208 return | |
209 } | |
210 | |
211 // Convert the logs to protobufs. | |
212 logs = make([]*logpb.LogEntry, len(logData)) | |
213 for i, ld := range logData { | |
214 le := logpb.LogEntry{} | |
215 if err = proto.Unmarshal(ld, &le); err != nil { | |
216 log.Fields{ | |
217 log.ErrorKey: err, | |
218 "logIndex": i, | |
219 }.Errorf(c, "Failed to unmarshal LogEntry from log data. ") | |
220 return | |
221 } | |
222 logs[i] = &le | |
223 } | |
224 return | |
225 } | |
226 | |
227 // cancelSleep will block for the specified duration, or until the supplied | |
228 // Context is done. | |
229 func cancelSleep(c context.Context, d time.Duration) error { | |
230 t := clock.NewTimer(c) | |
231 t.Reset(d) | |
232 defer t.Stop() | |
233 | |
234 select { | |
235 case <-c.Done(): | |
236 return c.Err() | |
237 case <-t.GetC(): | |
238 return nil | |
239 } | |
240 } | |
OLD | NEW |