| Index: appengine/logdog/coordinator/logView/view.go
|
| diff --git a/appengine/logdog/coordinator/logView/view.go b/appengine/logdog/coordinator/logView/view.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9d36dd930fe008cf058eb473b6585861fcffd34b
|
| --- /dev/null
|
| +++ b/appengine/logdog/coordinator/logView/view.go
|
| @@ -0,0 +1,232 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package logView
|
| +
|
| +import (
|
| + "io"
|
| + "net/http"
|
| + "strings"
|
| + "time"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + "github.com/julienschmidt/httprouter"
|
| + ds "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| + "github.com/luci/luci-go/appengine/logdog/coordinator/config"
|
| + "github.com/luci/luci-go/common/logdog/fetcher"
|
| + "github.com/luci/luci-go/common/logdog/renderer"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| + "github.com/luci/luci-go/server/logdog/storage"
|
| + "github.com/luci/luci-go/server/middleware"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +const (
|
| + // DefaultViewTimeout is the default value for the Handler's Timeout.
|
| + //
|
| + // It targets the managed VM environment, which allows 24 hours. We stop
|
| + // 10 minutes short of that to avoid conflict.
|
| + DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute)
|
| +
|
| + // viewLogDelay is the amount of time to wait in between polls when no more
|
| + // log entries are available during streaming.
|
| + viewLogDelay = (5 * time.Second)
|
| +)
|
| +
|
| +// Handler offers an HTTP handler that will return the rendered contents of a
|
| +// log stream.
|
| +type Handler struct {
|
| + coordinator.Service
|
| +
|
| + // timeout is the maximum amount of time that we are willing to handle a
|
| + // View request.
|
| + //
|
| + // This should be short of the maximum request time allowed by an AppEngine
|
| + // handler, else the instance may be terminated.
|
| + timeout time.Duration
|
| +}
|
| +
|
| +// InstallHandlers installs the view handler into the supplied router.
|
| +func (h *Handler) InstallHandlers(r *httprouter.Router, b middleware.Base) {
|
| + r.GET("/logs/view/*path", b(h.handle))
|
| +}
|
| +
|
| +// handle will render the contents of a log stream to the supplied
|
| +// ResponseWriter.
|
| +//
|
| +// It expects the stream hash/path to be embedded in p as "path".
|
| +func (h *Handler) handle(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) {
|
| + if rc := h.handleImpl(c, w, req, p); rc != 0 {
|
| + w.WriteHeader(rc)
|
| + }
|
| +}
|
| +
|
| +// handleImpl handles the HTTP request. If it returns a non-zero value, that
|
| +// will be used as the HTTP response code.
|
| +func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) int {
|
| + // Get the requested log path/hash.
|
| + path := strings.TrimLeft(p.ByName("path"), "/")
|
| + log.Fields{
|
| + "path": path,
|
| + }.Infof(c, "Request to view log stream.")
|
| +
|
| + ls, err := coordinator.NewLogStream(path)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.")
|
| + return http.StatusBadRequest
|
| + }
|
| +
|
| + // Set a timeout on our Context.
|
| + timeout := h.timeout
|
| + if timeout <= 0 {
|
| + timeout = DefaultViewTimeout
|
| + }
|
| + c, cancelFunc := context.WithTimeout(c, timeout)
|
| + defer cancelFunc()
|
| +
|
| + // Load the log stream metadata.
|
| + if err := ds.Get(c).Get(ls); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load LogStream.")
|
| +
|
| + switch err {
|
| + case ds.ErrNoSuchEntity:
|
| + return http.StatusNotFound
|
| + default:
|
| + return http.StatusInternalServerError
|
| + }
|
| + }
|
| +
|
| + // The user can stream purged logs only if they're an administrator. If not,
|
| + // pretend that the stream does not exist.
|
| + if ls.Purged {
|
| + if err := config.IsAdminUser(c); err != nil {
|
| + log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.")
|
| + return http.StatusNotFound
|
| + }
|
| + }
|
| +
|
| + // Get our Storage instance.
|
| + st, err := h.StorageForLogStream(c, ls)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to get Storage instance.")
|
| + return http.StatusInternalServerError
|
| + }
|
| + defer st.Close()
|
| +
|
| + // Fetch the requested log stream!
|
| + w.Header().Add("Content-Type", ls.ContentType)
|
| +
|
| + f := fetcher.New(c, fetcher.Options{
|
| + Source: &storageLogSource{
|
| + ls: ls,
|
| + st: st,
|
| + },
|
| + Delay: viewLogDelay,
|
| + })
|
| + r := renderer.Renderer{
|
| + Source: f,
|
| + Reproduce: true,
|
| + }
|
| +
|
| + // Render log data to our HTTP response.
|
| + count, err := io.Copy(w, &r)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to output log stream.")
|
| + return http.StatusInternalServerError
|
| + }
|
| +
|
| + log.Fields{
|
| + "bytes": count,
|
| + }.Infof(c, "Successfully wrote log stream.")
|
| + return 0
|
| +}
|
| +
|
| +// storageLogSource is a stateful incremental implementation of the
|
| +// fetcher.Source interface which uses direct storage access to pull log entries.
|
| +//
|
| +// The Fetcher will respond to Context cancellation, so we don't have to worry
|
| +// about that here.
|
| +type storageLogSource struct {
|
| + ls *coordinator.LogStream
|
| + st storage.Storage
|
| +}
|
| +
|
| +func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
|
| + logs []*logpb.LogEntry, tidx types.MessageIndex, err error) {
|
| + // If this log stream hasn't been terminated yet, re-load it and check if
|
| + // there is a terminal index.
|
| + tidx = types.MessageIndex(f.ls.TerminalIndex)
|
| + if tidx < 0 {
|
| + log.Debugf(c, "Log stream is non-terminal, refreshing.")
|
| + if err := ds.Get(c).Get(f.ls); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to refresh log stream.")
|
| + }
|
| +
|
| + tidx = types.MessageIndex(f.ls.TerminalIndex)
|
| + if tidx >= 0 {
|
| + log.Fields{
|
| + "terminalIndex": tidx,
|
| + }.Debugf(c, "Log stream has been terminated.")
|
| + }
|
| + }
|
| +
|
| + sreq := storage.GetRequest{
|
| + Path: f.ls.Path(),
|
| + Index: types.MessageIndex(req.Index),
|
| + }
|
| +
|
| + // If we have a terminal index, set a limit. This is probably unnecessary, but
|
| + // will handle the error case where logs past the terminal index are written
|
| + // to intermediate storage before a terminal index is registered.
|
| + if tidx >= 0 {
|
| + sreq.Limit = int64(tidx - req.Index + 1)
|
| + if sreq.Limit <= 0 {
|
| + // No more log records in the stream.
|
| + return
|
| + }
|
| + }
|
| +
|
| + // Issue the Get request.
|
| + var logData [][]byte
|
| + bytes := int64(0)
|
| + err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool {
|
| + // Enforce contiguous log records.
|
| + if idx != sreq.Index {
|
| + return false
|
| + }
|
| + logData = append(logData, ld)
|
| + sreq.Index++
|
| + bytes += int64(len(ld))
|
| +
|
| + // Apply byte/count constraints.
|
| + return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes))
|
| + })
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "initialIndex": req.Index,
|
| + "nextIndex": sreq.Index,
|
| + "count": len(logData),
|
| + }.Errorf(c, "Failed to fetch logs.")
|
| + return
|
| + }
|
| +
|
| + // Convert the logs to protobufs.
|
| + logs = make([]*logpb.LogEntry, len(logData))
|
| + for i, ld := range logData {
|
| + le := logpb.LogEntry{}
|
| + if err = proto.Unmarshal(ld, &le); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "logIndex": i,
|
| + }.Errorf(c, "Failed to unmarshal LogEntry from log data.")
|
| + return
|
| + }
|
| + logs[i] = &le
|
| + }
|
| + return
|
| +}
|
|
|