Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: appengine/logdog/coordinator/logView/view.go

Issue 1672833003: LogDog: Add log rendering view. Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
Ryan Tseng 2016/02/08 22:56:55 2016 /happy new year
dnj (Google) 2016/02/09 02:50:03 Done.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package logView
6
7 import (
8 "io"
9 "net/http"
10 "strings"
11 "time"
12
13 "github.com/golang/protobuf/proto"
14 "github.com/julienschmidt/httprouter"
15 ds "github.com/luci/gae/service/datastore"
16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/logdog/fetcher"
20 "github.com/luci/luci-go/common/logdog/renderer"
21 "github.com/luci/luci-go/common/logdog/types"
22 log "github.com/luci/luci-go/common/logging"
23 "github.com/luci/luci-go/common/proto/logdog/logpb"
24 "github.com/luci/luci-go/server/logdog/storage"
25 "golang.org/x/net/context"
26 )
27
28 const (
29 // DefaultViewTimeout is the default value for the Handler's Timeout.
30 //
31 // It targets the managed VM environment, which allows 24 hours. We stop
32 // 10 minutes short of that to avoid conflict.
33 DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute)
34
35 // viewLogDelay is the amount of time to wait in between polls when no m ore
36 // log entries are available during streaming.
37 viewLogDelay = (5 * time.Second)
38 )
39
40 // Handler offers an HTTP handler that will return the rendered contents of a
41 // log stream.
42 type Handler struct {
43 coordinator.Service
44
45 // timeout is the maximum amount of time that we are willing to handle a
46 // View request.
47 //
48 // This should be short of the maximum request time allowed by an AppEng ine
49 // handler, else the instance may be terminated.
50 timeout time.Duration
51 }
52
53 // Handle will render the contents of a log stream to the supplied
54 // ResponseWriter.
55 //
56 // It expects the stream hash/path to be embedded in p as "path".
57 func (h *Handler) Handle(c context.Context, w http.ResponseWriter, req *http.Req uest, p httprouter.Params) {
58 rc := h.handleImpl(c, w, req, p)
59 w.WriteHeader(rc)
60 }
61
62 func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http .Request, p httprouter.Params) int {
63 // Get the requested log path/hash.
64 path := strings.TrimLeft(p.ByName("path"), "/")
65 log.Fields{
66 "path": path,
67 }.Infof(c, "Request to view log stream.")
68
69 ls, err := coordinator.NewLogStream(path)
70 if err != nil {
71 log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.")
72 return http.StatusBadRequest
73 }
74
75 // Set a timeout on our Context.
76 timeout := h.timeout
77 if timeout <= 0 {
78 timeout = DefaultViewTimeout
79 }
80 c, cancelFunc := context.WithTimeout(c, timeout)
81 defer cancelFunc()
82
83 // Load the log stream metadata.
84 if err := ds.Get(c).Get(&ls); err != nil {
85 log.WithError(err).Errorf(c, "Failed to load LogStream.")
86
87 switch err {
88 case ds.ErrNoSuchEntity:
89 return http.StatusNotFound
90 default:
91 return http.StatusInternalServerError
92 }
93 }
94
95 // The user can stream purged logs only if they're an administrator. If not,
96 // pretend that the stream does not exist.
97 if ls.Purged {
98 if err := config.IsAdminUser(c); err != nil {
99 log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.")
100 return http.StatusNotFound
101 }
102 }
103
104 // Get our Storage instance.
105 st, err := h.GetLogStreamStorage(c, ls)
106 if err != nil {
107 log.WithError(err).Errorf(c, "Failed to get Storage instance.")
108 return http.StatusInternalServerError
109 }
110 defer st.Close()
111
112 // Fetch the requested log stream!
113 w.Header().Add("Content-Type", ls.ContentType)
114
115 fetchCtx, cancelFunc := context.WithCancel(c)
Ryan Tseng 2016/02/08 22:56:55 cancelFunc isn't used anywhere
dnj (Google) 2016/02/09 02:50:03 Done.
116 f := fetcher.New(fetchCtx, fetcher.Options{
117 Source: &storageLogSource{
118 ls: ls,
119 st: st,
120 },
121 Delay: viewLogDelay,
122 })
123 r := renderer.Renderer{
124 Source: f,
125 Reproduce: true,
126 }
127
128 // Render log data to our HTTP response.
129 count, err := io.Copy(w, &r)
130 if err != nil {
131 log.WithError(err).Errorf(c, "Failed to output log stream.")
132 return http.StatusInternalServerError
133 }
134
135 log.Fields{
136 "bytes": count,
137 }.Infof(c, "Successfully wrote log stream.")
138 return http.StatusOK
139 }
140
141 // storageLogSource is a stateful incremental implementation of the
142 // fetcher.Source interface which uses direct storage access to pull log entries .
143 //
144 // The Fetcher will respond to Context cancellation, so we don't have to worry
145 // about that here.
146 type storageLogSource struct {
147 ls *coordinator.LogStream
148 st storage.Storage
149 }
150
151 func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest ) (
Ryan Tseng 2016/02/08 22:56:55 docstring (lint should catch this)
dnj (Google) 2016/02/09 02:50:03 It's a public method on a private struct, so that'
152 logs []*logpb.LogEntry, tidx types.MessageIndex, err error) {
153 // If this log stream hasn't been terminated yet, re-load it and check i f
154 // there is a terminal index.
155 tidx = types.MessageIndex(f.ls.TerminalIndex)
156 if tidx < 0 {
157 log.Debugf(c, "Log stream is non-terminal, refreshing.")
158 if err := ds.Get(c).Get(f.ls); err != nil {
159 log.WithError(err).Errorf(c, "Failed to refresh log stre am.")
160 }
161
162 tidx = types.MessageIndex(f.ls.TerminalIndex)
163 if tidx >= 0 {
164 log.Fields{
165 "terminalIndex": tidx,
166 }.Debugf(c, "Log stream has been terminated.")
167 }
168 }
169
170 sreq := storage.GetRequest{
171 Path: f.ls.Path(),
172 Index: types.MessageIndex(req.Index),
173 }
174
175 // If we have a terminal index, set a limit. This is probably unnecessar y, but
176 // will handle the error case where logs past the terminal index are wri tten
177 // to intermediate storage before a terminal index is registered.
178 if tidx >= 0 {
179 sreq.Limit = int64(tidx - req.Index + 1)
180 if sreq.Limit <= 0 {
181 // No more log records in the stream.
182 return
183 }
184 }
185
186 // Issue the Get request.
Ryan Tseng 2016/02/08 22:56:55 presumably for all chunks of a logstream?
dnj (Google) 2016/02/09 02:50:03 Well, for the ones requested via "req".
187 var logData [][]byte
188 bytes := int64(0)
189 err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool {
190 // Enforce contiguous log records.
191 if idx != sreq.Index {
192 return false
193 }
194 logData = append(logData, ld)
195 sreq.Index++
196 bytes += int64(len(ld))
197
198 // Apply byte/count constraints.
199 return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes))
200 })
201 if err != nil {
202 log.Fields{
203 log.ErrorKey: err,
204 "initialIndex": req.Index,
205 "nextIndex": sreq.Index,
206 "count": len(logData),
207 }.Errorf(c, "Failed to fetch logs.")
208 return
209 }
210
211 // Convert the logs to protobufs.
212 logs = make([]*logpb.LogEntry, len(logData))
213 for i, ld := range logData {
214 le := logpb.LogEntry{}
215 if err = proto.Unmarshal(ld, &le); err != nil {
216 log.Fields{
217 log.ErrorKey: err,
218 "logIndex": i,
219 }.Errorf(c, "Failed to unmarshal LogEntry from log data. ")
220 return
221 }
222 logs[i] = &le
223 }
224 return
225 }
226
227 // cancelSleep will block for the specified duration, or until the supplied
228 // Context is done.
229 func cancelSleep(c context.Context, d time.Duration) error {
230 t := clock.NewTimer(c)
231 t.Reset(d)
232 defer t.Stop()
233
234 select {
235 case <-c.Done():
236 return c.Err()
237 case <-t.GetC():
238 return nil
239 }
240 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698