OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package logView |
| 6 |
| 7 import ( |
| 8 "io" |
| 9 "net/http" |
| 10 "strings" |
| 11 "time" |
| 12 |
| 13 "github.com/golang/protobuf/proto" |
| 14 "github.com/julienschmidt/httprouter" |
| 15 ds "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 18 "github.com/luci/luci-go/common/logdog/fetcher" |
| 19 "github.com/luci/luci-go/common/logdog/renderer" |
| 20 "github.com/luci/luci-go/common/logdog/types" |
| 21 log "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 23 "github.com/luci/luci-go/server/logdog/storage" |
| 24 "github.com/luci/luci-go/server/middleware" |
| 25 "golang.org/x/net/context" |
| 26 ) |
| 27 |
| 28 const ( |
| 29 // DefaultViewTimeout is the default value for the Handler's Timeout. |
| 30 // |
| 31 // It targets the managed VM environment, which allows 24 hours. We stop |
| 32 // 10 minutes short of that to avoid conflict. |
| 33 DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) |
| 34 |
| 35 // viewLogDelay is the amount of time to wait in between polls when no m
ore |
| 36 // log entries are available during streaming. |
| 37 viewLogDelay = (5 * time.Second) |
| 38 ) |
| 39 |
| 40 // Handler offers an HTTP handler that will return the rendered contents of a |
| 41 // log stream. |
| 42 type Handler struct { |
| 43 coordinator.Service |
| 44 |
| 45 // timeout is the maximum amount of time that we are willing to handle a |
| 46 // View request. |
| 47 // |
| 48 // This should be short of the maximum request time allowed by an AppEng
ine |
| 49 // handler, else the instance may be terminated. |
| 50 timeout time.Duration |
| 51 } |
| 52 |
| 53 // InstallHandlers installs the view handler into the supplied router. |
| 54 func (h *Handler) InstallHandlers(r *httprouter.Router, b middleware.Base) { |
| 55 r.GET("/logs/view/*path", b(h.handle)) |
| 56 } |
| 57 |
| 58 // handle will render the contents of a log stream to the supplied |
| 59 // ResponseWriter. |
| 60 // |
| 61 // It expects the stream hash/path to be embedded in p as "path". |
| 62 func (h *Handler) handle(c context.Context, w http.ResponseWriter, req *http.Req
uest, p httprouter.Params) { |
| 63 if rc := h.handleImpl(c, w, req, p); rc != 0 { |
| 64 w.WriteHeader(rc) |
| 65 } |
| 66 } |
| 67 |
| 68 // handleImpl handles the HTTP request. If it returns a non-zero value, that |
| 69 // will be used as the HTTP response code. |
| 70 func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http
.Request, p httprouter.Params) int { |
| 71 // Get the requested log path/hash. |
| 72 path := strings.TrimLeft(p.ByName("path"), "/") |
| 73 log.Fields{ |
| 74 "path": path, |
| 75 }.Infof(c, "Request to view log stream.") |
| 76 |
| 77 ls, err := coordinator.NewLogStream(path) |
| 78 if err != nil { |
| 79 log.WithError(err).Errorf(c, "Invalid log sream path/hash value
supplied.") |
| 80 return http.StatusBadRequest |
| 81 } |
| 82 |
| 83 // Set a timeout on our Context. |
| 84 timeout := h.timeout |
| 85 if timeout <= 0 { |
| 86 timeout = DefaultViewTimeout |
| 87 } |
| 88 c, cancelFunc := context.WithTimeout(c, timeout) |
| 89 defer cancelFunc() |
| 90 |
| 91 // Load the log stream metadata. |
| 92 if err := ds.Get(c).Get(ls); err != nil { |
| 93 log.WithError(err).Errorf(c, "Failed to load LogStream.") |
| 94 |
| 95 switch err { |
| 96 case ds.ErrNoSuchEntity: |
| 97 return http.StatusNotFound |
| 98 default: |
| 99 return http.StatusInternalServerError |
| 100 } |
| 101 } |
| 102 |
| 103 // The user can stream purged logs only if they're an administrator. If
not, |
| 104 // pretend that the stream does not exist. |
| 105 if ls.Purged { |
| 106 if err := config.IsAdminUser(c); err != nil { |
| 107 log.WithError(err).Warningf(c, "Non-admin user requested
purged log stream.") |
| 108 return http.StatusNotFound |
| 109 } |
| 110 } |
| 111 |
| 112 // Get our Storage instance. |
| 113 st, err := h.StorageForLogStream(c, ls) |
| 114 if err != nil { |
| 115 log.WithError(err).Errorf(c, "Failed to get Storage instance.") |
| 116 return http.StatusInternalServerError |
| 117 } |
| 118 defer st.Close() |
| 119 |
| 120 // Fetch the requested log stream! |
| 121 w.Header().Add("Content-Type", ls.ContentType) |
| 122 |
| 123 f := fetcher.New(c, fetcher.Options{ |
| 124 Source: &storageLogSource{ |
| 125 ls: ls, |
| 126 st: st, |
| 127 }, |
| 128 Delay: viewLogDelay, |
| 129 }) |
| 130 r := renderer.Renderer{ |
| 131 Source: f, |
| 132 Reproduce: true, |
| 133 } |
| 134 |
| 135 // Render log data to our HTTP response. |
| 136 count, err := io.Copy(w, &r) |
| 137 if err != nil { |
| 138 log.WithError(err).Errorf(c, "Failed to output log stream.") |
| 139 return http.StatusInternalServerError |
| 140 } |
| 141 |
| 142 log.Fields{ |
| 143 "bytes": count, |
| 144 }.Infof(c, "Successfully wrote log stream.") |
| 145 return 0 |
| 146 } |
| 147 |
| 148 // storageLogSource is a stateful incremental implementation of the |
| 149 // fetcher.Source interface which uses direct storage access to pull log entries
. |
| 150 // |
| 151 // The Fetcher will respond to Context cancellation, so we don't have to worry |
| 152 // about that here. |
| 153 type storageLogSource struct { |
| 154 ls *coordinator.LogStream |
| 155 st storage.Storage |
| 156 } |
| 157 |
| 158 func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest
) ( |
| 159 logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { |
| 160 // If this log stream hasn't been terminated yet, re-load it and check i
f |
| 161 // there is a terminal index. |
| 162 tidx = types.MessageIndex(f.ls.TerminalIndex) |
| 163 if tidx < 0 { |
| 164 log.Debugf(c, "Log stream is non-terminal, refreshing.") |
| 165 if err := ds.Get(c).Get(f.ls); err != nil { |
| 166 log.WithError(err).Errorf(c, "Failed to refresh log stre
am.") |
| 167 } |
| 168 |
| 169 tidx = types.MessageIndex(f.ls.TerminalIndex) |
| 170 if tidx >= 0 { |
| 171 log.Fields{ |
| 172 "terminalIndex": tidx, |
| 173 }.Debugf(c, "Log stream has been terminated.") |
| 174 } |
| 175 } |
| 176 |
| 177 sreq := storage.GetRequest{ |
| 178 Path: f.ls.Path(), |
| 179 Index: types.MessageIndex(req.Index), |
| 180 } |
| 181 |
| 182 // If we have a terminal index, set a limit. This is probably unnecessar
y, but |
| 183 // will handle the error case where logs past the terminal index are wri
tten |
| 184 // to intermediate storage before a terminal index is registered. |
| 185 if tidx >= 0 { |
| 186 sreq.Limit = int64(tidx - req.Index + 1) |
| 187 if sreq.Limit <= 0 { |
| 188 // No more log records in the stream. |
| 189 return |
| 190 } |
| 191 } |
| 192 |
| 193 // Issue the Get request. |
| 194 var logData [][]byte |
| 195 bytes := int64(0) |
| 196 err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { |
| 197 // Enforce contiguous log records. |
| 198 if idx != sreq.Index { |
| 199 return false |
| 200 } |
| 201 logData = append(logData, ld) |
| 202 sreq.Index++ |
| 203 bytes += int64(len(ld)) |
| 204 |
| 205 // Apply byte/count constraints. |
| 206 return (!(req.Count > 0 && int64(len(logData)) >= req.Count) &&
!(req.Bytes > 0 && bytes >= req.Bytes)) |
| 207 }) |
| 208 if err != nil { |
| 209 log.Fields{ |
| 210 log.ErrorKey: err, |
| 211 "initialIndex": req.Index, |
| 212 "nextIndex": sreq.Index, |
| 213 "count": len(logData), |
| 214 }.Errorf(c, "Failed to fetch logs.") |
| 215 return |
| 216 } |
| 217 |
| 218 // Convert the logs to protobufs. |
| 219 logs = make([]*logpb.LogEntry, len(logData)) |
| 220 for i, ld := range logData { |
| 221 le := logpb.LogEntry{} |
| 222 if err = proto.Unmarshal(ld, &le); err != nil { |
| 223 log.Fields{ |
| 224 log.ErrorKey: err, |
| 225 "logIndex": i, |
| 226 }.Errorf(c, "Failed to unmarshal LogEntry from log data.
") |
| 227 return |
| 228 } |
| 229 logs[i] = &le |
| 230 } |
| 231 return |
| 232 } |
OLD | NEW |