Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: appengine/logdog/coordinator/logView/view.go

Issue 1672833003: LogDog: Add log rendering view. Base URL: https://github.com/luci/luci-go@master
Patch Set: Clean up, add tests, little reorg. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package logView
6
7 import (
8 "io"
9 "net/http"
10 "strings"
11 "time"
12
13 "github.com/golang/protobuf/proto"
14 "github.com/julienschmidt/httprouter"
15 ds "github.com/luci/gae/service/datastore"
16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/logdog/fetcher"
19 "github.com/luci/luci-go/common/logdog/renderer"
20 "github.com/luci/luci-go/common/logdog/types"
21 log "github.com/luci/luci-go/common/logging"
22 "github.com/luci/luci-go/common/proto/logdog/logpb"
23 "github.com/luci/luci-go/server/logdog/storage"
24 "github.com/luci/luci-go/server/middleware"
25 "golang.org/x/net/context"
26 )
27
28 const (
29 // DefaultViewTimeout is the default value for the Handler's Timeout.
30 //
31 // It targets the managed VM environment, which allows 24 hours. We stop
32 // 10 minutes short of that to avoid conflict.
33 DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute)
34
35 // viewLogDelay is the amount of time to wait in between polls when no m ore
36 // log entries are available during streaming.
37 viewLogDelay = (5 * time.Second)
38 )
39
40 // Handler offers an HTTP handler that will return the rendered contents of a
41 // log stream.
42 type Handler struct {
43 coordinator.Service
44
45 // timeout is the maximum amount of time that we are willing to handle a
46 // View request.
47 //
48 // This should be short of the maximum request time allowed by an AppEng ine
49 // handler, else the instance may be terminated.
50 timeout time.Duration
51 }
52
53 // InstallHandlers installs the view handler into the supplied router.
54 func (h *Handler) InstallHandlers(r *httprouter.Router, b middleware.Base) {
55 r.GET("/logs/view/*path", b(h.handle))
56 }
57
58 // handle will render the contents of a log stream to the supplied
59 // ResponseWriter.
60 //
61 // It expects the stream hash/path to be embedded in p as "path".
62 func (h *Handler) handle(c context.Context, w http.ResponseWriter, req *http.Req uest, p httprouter.Params) {
63 if rc := h.handleImpl(c, w, req, p); rc != 0 {
64 w.WriteHeader(rc)
65 }
66 }
67
68 // handleImpl handles the HTTP request. If it returns a non-zero value, that
69 // will be used as the HTTP response code.
70 func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http .Request, p httprouter.Params) int {
71 // Get the requested log path/hash.
72 path := strings.TrimLeft(p.ByName("path"), "/")
73 log.Fields{
74 "path": path,
75 }.Infof(c, "Request to view log stream.")
76
77 ls, err := coordinator.NewLogStream(path)
78 if err != nil {
79 log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.")
80 return http.StatusBadRequest
81 }
82
83 // Set a timeout on our Context.
84 timeout := h.timeout
85 if timeout <= 0 {
86 timeout = DefaultViewTimeout
87 }
88 c, cancelFunc := context.WithTimeout(c, timeout)
89 defer cancelFunc()
90
91 // Load the log stream metadata.
92 if err := ds.Get(c).Get(ls); err != nil {
93 log.WithError(err).Errorf(c, "Failed to load LogStream.")
94
95 switch err {
96 case ds.ErrNoSuchEntity:
97 return http.StatusNotFound
98 default:
99 return http.StatusInternalServerError
100 }
101 }
102
103 // The user can stream purged logs only if they're an administrator. If not,
104 // pretend that the stream does not exist.
105 if ls.Purged {
106 if err := config.IsAdminUser(c); err != nil {
107 log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.")
108 return http.StatusNotFound
109 }
110 }
111
112 // Get our Storage instance.
113 st, err := h.StorageForLogStream(c, ls)
114 if err != nil {
115 log.WithError(err).Errorf(c, "Failed to get Storage instance.")
116 return http.StatusInternalServerError
117 }
118 defer st.Close()
119
120 // Fetch the requested log stream!
121 w.Header().Add("Content-Type", ls.ContentType)
122
123 f := fetcher.New(c, fetcher.Options{
124 Source: &storageLogSource{
125 ls: ls,
126 st: st,
127 },
128 Delay: viewLogDelay,
129 })
130 r := renderer.Renderer{
131 Source: f,
132 Reproduce: true,
133 }
134
135 // Render log data to our HTTP response.
136 count, err := io.Copy(w, &r)
137 if err != nil {
138 log.WithError(err).Errorf(c, "Failed to output log stream.")
139 return http.StatusInternalServerError
140 }
141
142 log.Fields{
143 "bytes": count,
144 }.Infof(c, "Successfully wrote log stream.")
145 return 0
146 }
147
148 // storageLogSource is a stateful incremental implementation of the
149 // fetcher.Source interface which uses direct storage access to pull log entries .
150 //
151 // The Fetcher will respond to Context cancellation, so we don't have to worry
152 // about that here.
153 type storageLogSource struct {
154 ls *coordinator.LogStream
155 st storage.Storage
156 }
157
158 func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest ) (
159 logs []*logpb.LogEntry, tidx types.MessageIndex, err error) {
160 // If this log stream hasn't been terminated yet, re-load it and check i f
161 // there is a terminal index.
162 tidx = types.MessageIndex(f.ls.TerminalIndex)
163 if tidx < 0 {
164 log.Debugf(c, "Log stream is non-terminal, refreshing.")
165 if err := ds.Get(c).Get(f.ls); err != nil {
166 log.WithError(err).Errorf(c, "Failed to refresh log stre am.")
167 }
168
169 tidx = types.MessageIndex(f.ls.TerminalIndex)
170 if tidx >= 0 {
171 log.Fields{
172 "terminalIndex": tidx,
173 }.Debugf(c, "Log stream has been terminated.")
174 }
175 }
176
177 sreq := storage.GetRequest{
178 Path: f.ls.Path(),
179 Index: types.MessageIndex(req.Index),
180 }
181
182 // If we have a terminal index, set a limit. This is probably unnecessar y, but
183 // will handle the error case where logs past the terminal index are wri tten
184 // to intermediate storage before a terminal index is registered.
185 if tidx >= 0 {
186 sreq.Limit = int64(tidx - req.Index + 1)
187 if sreq.Limit <= 0 {
188 // No more log records in the stream.
189 return
190 }
191 }
192
193 // Issue the Get request.
194 var logData [][]byte
195 bytes := int64(0)
196 err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool {
197 // Enforce contiguous log records.
198 if idx != sreq.Index {
199 return false
200 }
201 logData = append(logData, ld)
202 sreq.Index++
203 bytes += int64(len(ld))
204
205 // Apply byte/count constraints.
206 return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes))
207 })
208 if err != nil {
209 log.Fields{
210 log.ErrorKey: err,
211 "initialIndex": req.Index,
212 "nextIndex": sreq.Index,
213 "count": len(logData),
214 }.Errorf(c, "Failed to fetch logs.")
215 return
216 }
217
218 // Convert the logs to protobufs.
219 logs = make([]*logpb.LogEntry, len(logData))
220 for i, ld := range logData {
221 le := logpb.LogEntry{}
222 if err = proto.Unmarshal(ld, &le); err != nil {
223 log.Fields{
224 log.ErrorKey: err,
225 "logIndex": i,
226 }.Errorf(c, "Failed to unmarshal LogEntry from log data. ")
227 return
228 }
229 logs[i] = &le
230 }
231 return
232 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get_test.go ('k') | appengine/logdog/coordinator/logView/view_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698