Index: appengine/logdog/coordinator/logView/view.go |
diff --git a/appengine/logdog/coordinator/logView/view.go b/appengine/logdog/coordinator/logView/view.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b55a69012e607b87a3afb88590818caffb5e736d |
--- /dev/null |
+++ b/appengine/logdog/coordinator/logView/view.go |
@@ -0,0 +1,240 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
Ryan Tseng
2016/02/08 22:56:55
2016 /happy new year
dnj (Google)
2016/02/09 02:50:03
Done.
|
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package logView |
+ |
+import ( |
+ "io" |
+ "net/http" |
+ "strings" |
+ "time" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "github.com/julienschmidt/httprouter" |
+ ds "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/appengine/logdog/coordinator" |
+ "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/logdog/fetcher" |
+ "github.com/luci/luci-go/common/logdog/renderer" |
+ "github.com/luci/luci-go/common/logdog/types" |
+ log "github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/proto/logdog/logpb" |
+ "github.com/luci/luci-go/server/logdog/storage" |
+ "golang.org/x/net/context" |
+) |
+ |
+const ( |
+ // DefaultViewTimeout is the default value for the Handler's timeout. |
+ // |
+ // It targets the managed VM environment, which allows requests to run for |
+ // up to 24 hours. We stop 10 minutes short of that so the handler can finish |
+ // cleanly before the platform deadline. |
+ DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) |
+ |
+ // viewLogDelay is the amount of time to wait in between polls when no more |
+ // log entries are available during streaming. |
+ viewLogDelay = (5 * time.Second) |
+) |
+ |
+// Handler offers an HTTP handler that will return the rendered contents of a |
+// log stream. |
+type Handler struct { |
+ coordinator.Service |
+ |
+ // timeout is the maximum amount of time that we are willing to spend |
+ // handling a View request. |
+ // |
+ // This should be shorter than the maximum request time allowed for an |
+ // AppEngine handler; otherwise the instance may be terminated mid-request. |
+ timeout time.Duration |
+} |
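The timeout field is unexported, so only code inside this package can set it. A minimal in-package construction sketch, assuming a caller that already has a coordinator.Service wired up; the newHandler name is hypothetical and not part of this change:

    // newHandler builds a Handler around an existing coordinator.Service.
    // A non-positive timeout falls back to DefaultViewTimeout in handleImpl.
    func newHandler(svc coordinator.Service, timeout time.Duration) *Handler {
        return &Handler{
            Service: svc,
            timeout: timeout,
        }
    }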
+ |
+// Handle will render the contents of a log stream to the supplied |
+// ResponseWriter. |
+// |
+// It expects the stream hash/path to be embedded in p as "path". |
+func (h *Handler) Handle(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) { |
+ rc := h.handleImpl(c, w, req, p) |
+ w.WriteHeader(rc) |
+} |
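Handle takes a context.Context ahead of the standard httprouter arguments, so it cannot be registered with an httprouter.Router directly. A minimal wiring sketch, assuming a base Context supplied by the surrounding server setup; the installRoutes name and the route pattern are illustrative, not part of this change:

    // installRoutes adapts Handle to httprouter's Handle signature by closing
    // over a base Context. The "*path" wildcard becomes the "path" parameter
    // that handleImpl trims and parses.
    func installRoutes(r *httprouter.Router, h *Handler, base context.Context) {
        r.GET("/logs/*path", func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
            h.Handle(base, w, req, p)
        })
    }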
+ |
+func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) int { |
+ // Get the requested log path/hash. |
+ path := strings.TrimLeft(p.ByName("path"), "/") |
+ log.Fields{ |
+ "path": path, |
+ }.Infof(c, "Request to view log stream.") |
+ |
+ ls, err := coordinator.NewLogStream(path) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.") |
+ return http.StatusBadRequest |
+ } |
+ |
+ // Set a timeout on our Context. |
+ timeout := h.timeout |
+ if timeout <= 0 { |
+ timeout = DefaultViewTimeout |
+ } |
+ c, cancelFunc := context.WithTimeout(c, timeout) |
+ defer cancelFunc() |
+ |
+ // Load the log stream metadata. |
+ if err := ds.Get(c).Get(&ls); err != nil { |
+ log.WithError(err).Errorf(c, "Failed to load LogStream.") |
+ |
+ switch err { |
+ case ds.ErrNoSuchEntity: |
+ return http.StatusNotFound |
+ default: |
+ return http.StatusInternalServerError |
+ } |
+ } |
+ |
+ // The user can stream purged logs only if they're an administrator. If not, |
+ // pretend that the stream does not exist. |
+ if ls.Purged { |
+ if err := config.IsAdminUser(c); err != nil { |
+ log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.") |
+ return http.StatusNotFound |
+ } |
+ } |
+ |
+ // Get our Storage instance. |
+ st, err := h.GetLogStreamStorage(c, ls) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Failed to get Storage instance.") |
+ return http.StatusInternalServerError |
+ } |
+ defer st.Close() |
+ |
+ // Fetch the requested log stream! |
+ w.Header().Add("Content-Type", ls.ContentType) |
+ |
+ fetchCtx, fetchCancel := context.WithCancel(c) |
+ defer fetchCancel() |
Ryan Tseng
2016/02/08 22:56:55
cancelFunc isn't used anywhere
dnj (Google)
2016/02/09 02:50:03
Done.
|
+ f := fetcher.New(fetchCtx, fetcher.Options{ |
+ Source: &storageLogSource{ |
+ ls: ls, |
+ st: st, |
+ }, |
+ Delay: viewLogDelay, |
+ }) |
+ r := renderer.Renderer{ |
+ Source: f, |
+ Reproduce: true, |
+ } |
+ |
+ // Render log data to our HTTP response. |
+ count, err := io.Copy(w, &r) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Failed to output log stream.") |
+ return http.StatusInternalServerError |
+ } |
+ |
+ log.Fields{ |
+ "bytes": count, |
+ }.Infof(c, "Successfully wrote log stream.") |
+ return http.StatusOK |
+} |
+ |
+// storageLogSource is a stateful, incremental implementation of the |
+// fetcher.Source interface that uses direct storage access to pull log |
+// entries. |
+// |
+// The Fetcher will respond to Context cancellation, so we don't have to worry |
+// about that here. |
+type storageLogSource struct { |
+ ls *coordinator.LogStream |
+ st storage.Storage |
+} |
+ |
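+// LogEntries implements fetcher.Source, pulling the block of log entries |
+// described by req directly from the stream's storage. |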
+func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest) ( |
Ryan Tseng
2016/02/08 22:56:55
docstring (lint should catch this)
dnj (Google)
2016/02/09 02:50:03
It's a public method on a private struct, so that'
|
+ logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { |
+ // If this log stream hasn't been terminated yet, re-load it and check if |
+ // there is a terminal index. |
+ tidx = types.MessageIndex(f.ls.TerminalIndex) |
+ if tidx < 0 { |
+ log.Debugf(c, "Log stream is non-terminal, refreshing.") |
+ if err := ds.Get(c).Get(f.ls); err != nil { |
+ log.WithError(err).Errorf(c, "Failed to refresh log stream.") |
+ } |
+ |
+ tidx = types.MessageIndex(f.ls.TerminalIndex) |
+ if tidx >= 0 { |
+ log.Fields{ |
+ "terminalIndex": tidx, |
+ }.Debugf(c, "Log stream has been terminated.") |
+ } |
+ } |
+ |
+ sreq := storage.GetRequest{ |
+ Path: f.ls.Path(), |
+ Index: types.MessageIndex(req.Index), |
+ } |
+ |
+ // If we have a terminal index, set a limit. This is probably unnecessary, but |
+ // will handle the error case where logs past the terminal index are written |
+ // to intermediate storage before a terminal index is registered. |
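+ // (For example, with tidx == 10 and req.Index == 4, Limit will be 7.) |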
+ if tidx >= 0 { |
+ sreq.Limit = int64(tidx - req.Index + 1) |
+ if sreq.Limit <= 0 { |
+ // No more log records in the stream. |
+ return |
+ } |
+ } |
+ |
+ // Issue the Get request. |
Ryan Tseng
2016/02/08 22:56:55
presumably for all chunks of a logstream?
dnj (Google)
2016/02/09 02:50:03
Well, for the ones requested via "req".
|
+ var logData [][]byte |
+ bytes := int64(0) |
+ err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { |
+ // Enforce contiguous log records. |
+ if idx != sreq.Index { |
+ return false |
+ } |
+ logData = append(logData, ld) |
+ sreq.Index++ |
+ bytes += int64(len(ld)) |
+ |
+ // Apply byte/count constraints. |
+ return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes)) |
+ }) |
+ if err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "initialIndex": req.Index, |
+ "nextIndex": sreq.Index, |
+ "count": len(logData), |
+ }.Errorf(c, "Failed to fetch logs.") |
+ return |
+ } |
+ |
+ // Convert the logs to protobufs. |
+ logs = make([]*logpb.LogEntry, len(logData)) |
+ for i, ld := range logData { |
+ le := logpb.LogEntry{} |
+ if err = proto.Unmarshal(ld, &le); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "logIndex": i, |
+ }.Errorf(c, "Failed to unmarshal LogEntry from log data.") |
+ return |
+ } |
+ logs[i] = &le |
+ } |
+ return |
+} |
+ |
+// cancelSleep will block for the specified duration, or until the supplied |
+// Context is done. |
+func cancelSleep(c context.Context, d time.Duration) error { |
+ t := clock.NewTimer(c) |
+ t.Reset(d) |
+ defer t.Stop() |
+ |
+ select { |
+ case <-c.Done(): |
+ return c.Err() |
+ case <-t.GetC(): |
+ return nil |
+ } |
+} |
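A test-style sketch of exercising the handler end to end with net/http/httptest. It assumes a Context c that already has the AppEngine services (datastore, configuration, etc.) installed and a Handler h whose embedded coordinator.Service is configured; the viewOnce helper and the "/logs/" prefix are illustrative only:

    package logView

    import (
        "net/http"
        "net/http/httptest"

        "github.com/julienschmidt/httprouter"
        "golang.org/x/net/context"
    )

    // viewOnce issues a single view request for streamPath against h and
    // returns the HTTP status code that was recorded for it.
    func viewOnce(c context.Context, h *Handler, streamPath string) int {
        rec := httptest.NewRecorder()
        req, err := http.NewRequest("GET", "/logs/"+streamPath, nil)
        if err != nil {
            return http.StatusInternalServerError
        }
        p := httprouter.Params{{Key: "path", Value: "/" + streamPath}}
        h.Handle(c, rec, req, p)
        return rec.Code
    }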