Chromium Code Reviews

Unified Diff: appengine/logdog/coordinator/logView/view.go

Issue 1672833003: LogDog: Add log rendering view. Base URL: https://github.com/luci/luci-go@master
Patch Set: Clean up, add tests, little reorg. Created 4 years, 10 months ago
Index: appengine/logdog/coordinator/logView/view.go
diff --git a/appengine/logdog/coordinator/logView/view.go b/appengine/logdog/coordinator/logView/view.go
new file mode 100644
index 0000000000000000000000000000000000000000..9d36dd930fe008cf058eb473b6585861fcffd34b
--- /dev/null
+++ b/appengine/logdog/coordinator/logView/view.go
@@ -0,0 +1,232 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package logView
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/julienschmidt/httprouter"
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/logdog/coordinator/config"
+ "github.com/luci/luci-go/common/logdog/fetcher"
+ "github.com/luci/luci-go/common/logdog/renderer"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/logdog/logpb"
+ "github.com/luci/luci-go/server/logdog/storage"
+ "github.com/luci/luci-go/server/middleware"
+ "golang.org/x/net/context"
+)
+
+const (
+ // DefaultViewTimeout is the default value for the Handler's Timeout.
+ //
+ // It targets the managed VM environment, which allows requests up to 24 hours
+ // long. We stop 10 minutes short of that limit to avoid being cut off.
+ DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute)
+
+ // viewLogDelay is the amount of time to wait between polls when no more
+ // log entries are available during streaming.
+ viewLogDelay = (5 * time.Second)
+)
+
+// Handler offers an HTTP handler that will return the rendered contents of a
+// log stream.
+type Handler struct {
+ coordinator.Service
+
+ // timeout is the maximum amount of time that we are willing to spend
+ // handling a single View request.
+ //
+ // This should be short of the maximum request time allowed by an AppEngine
+ // handler, else the instance may be terminated.
+ timeout time.Duration
+}
+
+// InstallHandlers installs the view handler into the supplied router.
+func (h *Handler) InstallHandlers(r *httprouter.Router, b middleware.Base) {
+ r.GET("/logs/view/*path", b(h.handle))
+}
+
+// handle will render the contents of a log stream to the supplied
+// ResponseWriter.
+//
+// It expects the stream hash/path to be embedded in p as "path".
+func (h *Handler) handle(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) {
+ if rc := h.handleImpl(c, w, req, p); rc != 0 {
+ w.WriteHeader(rc)
+ }
+}
+
+// handleImpl handles the HTTP request. If it returns a non-zero value, that
+// will be used as the HTTP response code.
+func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) int {
+ // Get the requested log path/hash.
+ path := strings.TrimLeft(p.ByName("path"), "/")
+ log.Fields{
+ "path": path,
+ }.Infof(c, "Request to view log stream.")
+
+ ls, err := coordinator.NewLogStream(path)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.")
+ return http.StatusBadRequest
+ }
+
+ // Set a timeout on our Context.
+ timeout := h.timeout
+ if timeout <= 0 {
+ timeout = DefaultViewTimeout
+ }
+ c, cancelFunc := context.WithTimeout(c, timeout)
+ defer cancelFunc()
+
+ // Load the log stream metadata.
+ if err := ds.Get(c).Get(ls); err != nil {
+ log.WithError(err).Errorf(c, "Failed to load LogStream.")
+
+ switch err {
+ case ds.ErrNoSuchEntity:
+ return http.StatusNotFound
+ default:
+ return http.StatusInternalServerError
+ }
+ }
+
+ // The user can stream purged logs only if they're an administrator. If not,
+ // pretend that the stream does not exist.
+ if ls.Purged {
+ if err := config.IsAdminUser(c); err != nil {
+ log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.")
+ return http.StatusNotFound
+ }
+ }
+
+ // Get our Storage instance.
+ st, err := h.StorageForLogStream(c, ls)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get Storage instance.")
+ return http.StatusInternalServerError
+ }
+ defer st.Close()
+
+ // Fetch the requested log stream!
+ w.Header().Add("Content-Type", ls.ContentType)
+
+ f := fetcher.New(c, fetcher.Options{
+ Source: &storageLogSource{
+ ls: ls,
+ st: st,
+ },
+ Delay: viewLogDelay,
+ })
+ r := renderer.Renderer{
+ Source: f,
+ Reproduce: true,
+ }
+
+ // Render log data to our HTTP response.
+ count, err := io.Copy(w, &r)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to output log stream.")
+ return http.StatusInternalServerError
+ }
+
+ log.Fields{
+ "bytes": count,
+ }.Infof(c, "Successfully wrote log stream.")
+ return 0
+}
+
+// storageLogSource is a stateful, incremental implementation of the
+// fetcher.Source interface that uses direct storage access to pull log entries.
+//
+// The Fetcher will respond to Context cancellation, so we don't have to worry
+// about that here.
+type storageLogSource struct {
+ ls *coordinator.LogStream
+ st storage.Storage
+}
+
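+// LogEntries implements the fetcher.Source interface. It returns the next
+// contiguous batch of log entries starting at req.Index, along with the
+// stream's terminal index (negative if the stream has not terminated yet).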
+func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
+ logs []*logpb.LogEntry, tidx types.MessageIndex, err error) {
+ // If this log stream hasn't been terminated yet, re-load it and check if
+ // there is a terminal index.
+ tidx = types.MessageIndex(f.ls.TerminalIndex)
+ if tidx < 0 {
+ log.Debugf(c, "Log stream is non-terminal, refreshing.")
+ if err := ds.Get(c).Get(f.ls); err != nil {
+ log.WithError(err).Errorf(c, "Failed to refresh log stream.")
+ }
+
+ tidx = types.MessageIndex(f.ls.TerminalIndex)
+ if tidx >= 0 {
+ log.Fields{
+ "terminalIndex": tidx,
+ }.Debugf(c, "Log stream has been terminated.")
+ }
+ }
+
+ sreq := storage.GetRequest{
+ Path: f.ls.Path(),
+ Index: types.MessageIndex(req.Index),
+ }
+
+ // If we have a terminal index, set a limit. This is probably unnecessary, but
+ // will handle the error case where logs past the terminal index are written
+ // to intermediate storage before a terminal index is registered.
+ if tidx >= 0 {
+ sreq.Limit = int64(tidx - req.Index + 1)
+ if sreq.Limit <= 0 {
+ // No more log records in the stream.
+ return
+ }
+ }
+
+ // Issue the Get request.
+ var logData [][]byte
+ bytes := int64(0)
+ err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool {
+ // Enforce contiguous log records.
+ if idx != sreq.Index {
+ return false
+ }
+ logData = append(logData, ld)
+ sreq.Index++
+ bytes += int64(len(ld))
+
+ // Apply byte/count constraints.
+ return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes))
+ })
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "initialIndex": req.Index,
+ "nextIndex": sreq.Index,
+ "count": len(logData),
+ }.Errorf(c, "Failed to fetch logs.")
+ return
+ }
+
+ // Convert the logs to protobufs.
+ logs = make([]*logpb.LogEntry, len(logData))
+ for i, ld := range logData {
+ le := logpb.LogEntry{}
+ if err = proto.Unmarshal(ld, &le); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "logIndex": i,
+ }.Errorf(c, "Failed to unmarshal LogEntry from log data.")
+ return
+ }
+ logs[i] = &le
+ }
+ return
+}
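
For reviewers trying the patch out, here is a minimal sketch of how the handler might be wired up from within this package. The installViews function, the svc argument, and the base middleware are assumptions for illustration, not part of this change:

func installViews(r *httprouter.Router, svc coordinator.Service, base middleware.Base) {
	// Leaving timeout at its zero value makes handleImpl fall back to
	// DefaultViewTimeout.
	h := Handler{Service: svc}
	h.InstallHandlers(r, base)
}

A GET request to /logs/view/<stream path or hash> would then stream the rendered log contents back in the HTTP response.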