Index: appengine/logdog/coordinator/logView/view.go |
diff --git a/appengine/logdog/coordinator/logView/view.go b/appengine/logdog/coordinator/logView/view.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9d36dd930fe008cf058eb473b6585861fcffd34b |
--- /dev/null |
+++ b/appengine/logdog/coordinator/logView/view.go |
@@ -0,0 +1,232 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package logView |
+ |
+import ( |
+ "io" |
+ "net/http" |
+ "strings" |
+ "time" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "github.com/julienschmidt/httprouter" |
+ ds "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/appengine/logdog/coordinator" |
+ "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
+ "github.com/luci/luci-go/common/logdog/fetcher" |
+ "github.com/luci/luci-go/common/logdog/renderer" |
+ "github.com/luci/luci-go/common/logdog/types" |
+ log "github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/proto/logdog/logpb" |
+ "github.com/luci/luci-go/server/logdog/storage" |
+ "github.com/luci/luci-go/server/middleware" |
+ "golang.org/x/net/context" |
+) |
+ |
+const ( |
+ // DefaultViewTimeout is the default value for the Handler's Timeout. |
+ // |
+ // It targets the managed VM environment, which allows 24 hours. We stop |
+ // 10 minutes short of that to avoid conflict. |
+ DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) |
+ |
+ // viewLogDelay is the amount of time to wait in between polls when no more |
+ // log entries are available during streaming. |
+ viewLogDelay = (5 * time.Second) |
+) |
+ |
+// Handler offers an HTTP handler that will return the rendered contents of a |
+// log stream. |
+type Handler struct { |
+ coordinator.Service |
+ |
+ // timeout is the maximum amount of time that we are willing to handle a |
+ // View request. |
+ // |
+ // This should be short of the maximum request time allowed by an AppEngine |
+ // handler, else the instance may be terminated. |
+ timeout time.Duration |
+} |
+ |
+// InstallHandlers installs the view handler into the supplied router. |
+func (h *Handler) InstallHandlers(r *httprouter.Router, b middleware.Base) { |
+ r.GET("/logs/view/*path", b(h.handle)) |
+} |
+ |
+// handle will render the contents of a log stream to the supplied |
+// ResponseWriter. |
+// |
+// It expects the stream hash/path to be embedded in p as "path". |
+func (h *Handler) handle(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) { |
+ if rc := h.handleImpl(c, w, req, p); rc != 0 { |
+ w.WriteHeader(rc) |
+ } |
+} |
+ |
+// handleImpl handles the HTTP request. If it returns a non-zero value, that |
+// will be used as the HTTP response code. |
+func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) int { |
+ // Get the requested log path/hash. |
+ path := strings.TrimLeft(p.ByName("path"), "/") |
+ log.Fields{ |
+ "path": path, |
+ }.Infof(c, "Request to view log stream.") |
+ |
+ ls, err := coordinator.NewLogStream(path) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.") |
+ return http.StatusBadRequest |
+ } |
+ |
+ // Set a timeout on our Context. |
+ timeout := h.timeout |
+ if timeout <= 0 { |
+ timeout = DefaultViewTimeout |
+ } |
+ c, cancelFunc := context.WithTimeout(c, timeout) |
+ defer cancelFunc() |
+ |
+ // Load the log stream metadata. |
+ if err := ds.Get(c).Get(ls); err != nil { |
+ log.WithError(err).Errorf(c, "Failed to load LogStream.") |
+ |
+ switch err { |
+ case ds.ErrNoSuchEntity: |
+ return http.StatusNotFound |
+ default: |
+ return http.StatusInternalServerError |
+ } |
+ } |
+ |
+ // The user can stream purged logs only if they're an administrator. If not, |
+ // pretend that the stream does not exist. |
+ if ls.Purged { |
+ if err := config.IsAdminUser(c); err != nil { |
+ log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.") |
+ return http.StatusNotFound |
+ } |
+ } |
+ |
+ // Get our Storage instance. |
+ st, err := h.StorageForLogStream(c, ls) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Failed to get Storage instance.") |
+ return http.StatusInternalServerError |
+ } |
+ defer st.Close() |
+ |
+ // Fetch the requested log stream! |
+ w.Header().Add("Content-Type", ls.ContentType) |
+ |
+ f := fetcher.New(c, fetcher.Options{ |
+ Source: &storageLogSource{ |
+ ls: ls, |
+ st: st, |
+ }, |
+ Delay: viewLogDelay, |
+ }) |
+ r := renderer.Renderer{ |
+ Source: f, |
+ Reproduce: true, |
+ } |
+ |
+ // Render log data to our HTTP response. |
+ count, err := io.Copy(w, &r) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Failed to output log stream.") |
+ return http.StatusInternalServerError |
+ } |
+ |
+ log.Fields{ |
+ "bytes": count, |
+ }.Infof(c, "Successfully wrote log stream.") |
+ return 0 |
+} |
+ |
+// storageLogSource is a stateful incremental implementation of the |
+// fetcher.Source interface which uses direct storage access to pull log entries. |
+// |
+// The Fetcher will respond to Context cancellation, so we don't have to worry |
+// about that here. |
+type storageLogSource struct { |
+ ls *coordinator.LogStream |
+ st storage.Storage |
+} |
+ |
+func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest) ( |
+ logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { |
+ // If this log stream hasn't been terminated yet, re-load it and check if |
+ // there is a terminal index. |
+ tidx = types.MessageIndex(f.ls.TerminalIndex) |
+ if tidx < 0 { |
+ log.Debugf(c, "Log stream is non-terminal, refreshing.") |
+ if err := ds.Get(c).Get(f.ls); err != nil { |
+ log.WithError(err).Errorf(c, "Failed to refresh log stream.") |
+ } |
+ |
+ tidx = types.MessageIndex(f.ls.TerminalIndex) |
+ if tidx >= 0 { |
+ log.Fields{ |
+ "terminalIndex": tidx, |
+ }.Debugf(c, "Log stream has been terminated.") |
+ } |
+ } |
+ |
+ sreq := storage.GetRequest{ |
+ Path: f.ls.Path(), |
+ Index: types.MessageIndex(req.Index), |
+ } |
+ |
+ // If we have a terminal index, set a limit. This is probably unnecessary, but |
+ // will handle the error case where logs past the terminal index are written |
+ // to intermediate storage before a terminal index is registered. |
+ if tidx >= 0 { |
+ sreq.Limit = int64(tidx - req.Index + 1) |
+ if sreq.Limit <= 0 { |
+ // No more log records in the stream. |
+ return |
+ } |
+ } |
+ |
+ // Issue the Get request. |
+ var logData [][]byte |
+ bytes := int64(0) |
+ err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { |
+ // Enforce contiguous log records. |
+ if idx != sreq.Index { |
+ return false |
+ } |
+ logData = append(logData, ld) |
+ sreq.Index++ |
+ bytes += int64(len(ld)) |
+ |
+ // Apply byte/count constraints. |
+ return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes)) |
+ }) |
+ if err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "initialIndex": req.Index, |
+ "nextIndex": sreq.Index, |
+ "count": len(logData), |
+ }.Errorf(c, "Failed to fetch logs.") |
+ return |
+ } |
+ |
+ // Convert the logs to protobufs. |
+ logs = make([]*logpb.LogEntry, len(logData)) |
+ for i, ld := range logData { |
+ le := logpb.LogEntry{} |
+ if err = proto.Unmarshal(ld, &le); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "logIndex": i, |
+ }.Errorf(c, "Failed to unmarshal LogEntry from log data.") |
+ return |
+ } |
+ logs[i] = &le |
+ } |
+ return |
+} |