Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
|
Ryan Tseng
2016/02/08 22:56:55
2016 /happy new year
dnj (Google)
2016/02/09 02:50:03
Done.
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package logView | |
| 6 | |
| 7 import ( | |
| 8 "io" | |
| 9 "net/http" | |
| 10 "strings" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/golang/protobuf/proto" | |
| 14 "github.com/julienschmidt/httprouter" | |
| 15 ds "github.com/luci/gae/service/datastore" | |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | |
| 18 "github.com/luci/luci-go/common/clock" | |
| 19 "github.com/luci/luci-go/common/logdog/fetcher" | |
| 20 "github.com/luci/luci-go/common/logdog/renderer" | |
| 21 "github.com/luci/luci-go/common/logdog/types" | |
| 22 log "github.com/luci/luci-go/common/logging" | |
| 23 "github.com/luci/luci-go/common/proto/logdog/logpb" | |
| 24 "github.com/luci/luci-go/server/logdog/storage" | |
| 25 "golang.org/x/net/context" | |
| 26 ) | |
| 27 | |
| 28 const ( | |
| 29 // DefaultViewTimeout is the default value for the Handler's Timeout. | |
| 30 // | |
| 31 // It targets the managed VM environment, which allows 24 hours. We stop | |
| 32 // 10 minutes short of that to avoid conflict. | |
| 33 DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) | |
| 34 | |
| 35 // viewLogDelay is the amount of time to wait in between polls when no m ore | |
| 36 // log entries are available during streaming. | |
| 37 viewLogDelay = (5 * time.Second) | |
| 38 ) | |
| 39 | |
| 40 // Handler offers an HTTP handler that will return the rendered contents of a | |
| 41 // log stream. | |
| 42 type Handler struct { | |
| 43 coordinator.Service | |
| 44 | |
| 45 // timeout is the maximum amount of time that we are willing to handle a | |
| 46 // View request. | |
| 47 // | |
| 48 // This should be short of the maximum request time allowed by an AppEng ine | |
| 49 // handler, else the instance may be terminated. | |
| 50 timeout time.Duration | |
| 51 } | |
| 52 | |
| 53 // Handle will render the contents of a log stream to the supplied | |
| 54 // ResponseWriter. | |
| 55 // | |
| 56 // It expects the stream hash/path to be embedded in p as "path". | |
| 57 func (h *Handler) Handle(c context.Context, w http.ResponseWriter, req *http.Req uest, p httprouter.Params) { | |
| 58 rc := h.handleImpl(c, w, req, p) | |
| 59 w.WriteHeader(rc) | |
| 60 } | |
| 61 | |
| 62 func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http .Request, p httprouter.Params) int { | |
| 63 // Get the requested log path/hash. | |
| 64 path := strings.TrimLeft(p.ByName("path"), "/") | |
| 65 log.Fields{ | |
| 66 "path": path, | |
| 67 }.Infof(c, "Request to view log stream.") | |
| 68 | |
| 69 ls, err := coordinator.NewLogStream(path) | |
| 70 if err != nil { | |
| 71 log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.") | |
| 72 return http.StatusBadRequest | |
| 73 } | |
| 74 | |
| 75 // Set a timeout on our Context. | |
| 76 timeout := h.timeout | |
| 77 if timeout <= 0 { | |
| 78 timeout = DefaultViewTimeout | |
| 79 } | |
| 80 c, cancelFunc := context.WithTimeout(c, timeout) | |
| 81 defer cancelFunc() | |
| 82 | |
| 83 // Load the log stream metadata. | |
| 84 if err := ds.Get(c).Get(&ls); err != nil { | |
| 85 log.WithError(err).Errorf(c, "Failed to load LogStream.") | |
| 86 | |
| 87 switch err { | |
| 88 case ds.ErrNoSuchEntity: | |
| 89 return http.StatusNotFound | |
| 90 default: | |
| 91 return http.StatusInternalServerError | |
| 92 } | |
| 93 } | |
| 94 | |
| 95 // The user can stream purged logs only if they're an administrator. If not, | |
| 96 // pretend that the stream does not exist. | |
| 97 if ls.Purged { | |
| 98 if err := config.IsAdminUser(c); err != nil { | |
| 99 log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.") | |
| 100 return http.StatusNotFound | |
| 101 } | |
| 102 } | |
| 103 | |
| 104 // Get our Storage instance. | |
| 105 st, err := h.GetLogStreamStorage(c, ls) | |
| 106 if err != nil { | |
| 107 log.WithError(err).Errorf(c, "Failed to get Storage instance.") | |
| 108 return http.StatusInternalServerError | |
| 109 } | |
| 110 defer st.Close() | |
| 111 | |
| 112 // Fetch the requested log stream! | |
| 113 w.Header().Add("Content-Type", ls.ContentType) | |
| 114 | |
| 115 fetchCtx, cancelFunc := context.WithCancel(c) | |
|
Ryan Tseng
2016/02/08 22:56:55
cancelFunc isn't used anywhere
dnj (Google)
2016/02/09 02:50:03
Done.
| |
| 116 f := fetcher.New(fetchCtx, fetcher.Options{ | |
| 117 Source: &storageLogSource{ | |
| 118 ls: ls, | |
| 119 st: st, | |
| 120 }, | |
| 121 Delay: viewLogDelay, | |
| 122 }) | |
| 123 r := renderer.Renderer{ | |
| 124 Source: f, | |
| 125 Reproduce: true, | |
| 126 } | |
| 127 | |
| 128 // Render log data to our HTTP response. | |
| 129 count, err := io.Copy(w, &r) | |
| 130 if err != nil { | |
| 131 log.WithError(err).Errorf(c, "Failed to output log stream.") | |
| 132 return http.StatusInternalServerError | |
| 133 } | |
| 134 | |
| 135 log.Fields{ | |
| 136 "bytes": count, | |
| 137 }.Infof(c, "Successfully wrote log stream.") | |
| 138 return http.StatusOK | |
| 139 } | |
| 140 | |
| 141 // storageLogSource is a stateful incremental implementation of the | |
| 142 // fetcher.Source interface which uses direct storage access to pull log entries . | |
| 143 // | |
| 144 // The Fetcher will respond to Context cancellation, so we don't have to worry | |
| 145 // about that here. | |
| 146 type storageLogSource struct { | |
| 147 ls *coordinator.LogStream | |
| 148 st storage.Storage | |
| 149 } | |
| 150 | |
| 151 func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest ) ( | |
|
Ryan Tseng
2016/02/08 22:56:55
docstring (lint should catch this)
dnj (Google)
2016/02/09 02:50:03
It's a public method on a private struct, so that'
| |
| 152 logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { | |
| 153 // If this log stream hasn't been terminated yet, re-load it and check i f | |
| 154 // there is a terminal index. | |
| 155 tidx = types.MessageIndex(f.ls.TerminalIndex) | |
| 156 if tidx < 0 { | |
| 157 log.Debugf(c, "Log stream is non-terminal, refreshing.") | |
| 158 if err := ds.Get(c).Get(f.ls); err != nil { | |
| 159 log.WithError(err).Errorf(c, "Failed to refresh log stre am.") | |
| 160 } | |
| 161 | |
| 162 tidx = types.MessageIndex(f.ls.TerminalIndex) | |
| 163 if tidx >= 0 { | |
| 164 log.Fields{ | |
| 165 "terminalIndex": tidx, | |
| 166 }.Debugf(c, "Log stream has been terminated.") | |
| 167 } | |
| 168 } | |
| 169 | |
| 170 sreq := storage.GetRequest{ | |
| 171 Path: f.ls.Path(), | |
| 172 Index: types.MessageIndex(req.Index), | |
| 173 } | |
| 174 | |
| 175 // If we have a terminal index, set a limit. This is probably unnecessar y, but | |
| 176 // will handle the error case where logs past the terminal index are wri tten | |
| 177 // to intermediate storage before a terminal index is registered. | |
| 178 if tidx >= 0 { | |
| 179 sreq.Limit = int64(tidx - req.Index + 1) | |
| 180 if sreq.Limit <= 0 { | |
| 181 // No more log records in the stream. | |
| 182 return | |
| 183 } | |
| 184 } | |
| 185 | |
| 186 // Issue the Get request. | |
|
Ryan Tseng
2016/02/08 22:56:55
presumably for all chunks of a logstream?
dnj (Google)
2016/02/09 02:50:03
Well, for the ones requested via "req".
| |
| 187 var logData [][]byte | |
| 188 bytes := int64(0) | |
| 189 err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { | |
| 190 // Enforce contiguous log records. | |
| 191 if idx != sreq.Index { | |
| 192 return false | |
| 193 } | |
| 194 logData = append(logData, ld) | |
| 195 sreq.Index++ | |
| 196 bytes += int64(len(ld)) | |
| 197 | |
| 198 // Apply byte/count constraints. | |
| 199 return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes)) | |
| 200 }) | |
| 201 if err != nil { | |
| 202 log.Fields{ | |
| 203 log.ErrorKey: err, | |
| 204 "initialIndex": req.Index, | |
| 205 "nextIndex": sreq.Index, | |
| 206 "count": len(logData), | |
| 207 }.Errorf(c, "Failed to fetch logs.") | |
| 208 return | |
| 209 } | |
| 210 | |
| 211 // Convert the logs to protobufs. | |
| 212 logs = make([]*logpb.LogEntry, len(logData)) | |
| 213 for i, ld := range logData { | |
| 214 le := logpb.LogEntry{} | |
| 215 if err = proto.Unmarshal(ld, &le); err != nil { | |
| 216 log.Fields{ | |
| 217 log.ErrorKey: err, | |
| 218 "logIndex": i, | |
| 219 }.Errorf(c, "Failed to unmarshal LogEntry from log data. ") | |
| 220 return | |
| 221 } | |
| 222 logs[i] = &le | |
| 223 } | |
| 224 return | |
| 225 } | |
| 226 | |
| 227 // cancelSleep will block for the specified duration, or until the supplied | |
| 228 // Context is done. | |
| 229 func cancelSleep(c context.Context, d time.Duration) error { | |
| 230 t := clock.NewTimer(c) | |
| 231 t.Reset(d) | |
| 232 defer t.Stop() | |
| 233 | |
| 234 select { | |
| 235 case <-c.Done(): | |
| 236 return c.Err() | |
| 237 case <-t.GetC(): | |
| 238 return nil | |
| 239 } | |
| 240 } | |
| OLD | NEW |