Chromium Code Reviews| Index: appengine/logdog/coordinator/logView/view.go |
| diff --git a/appengine/logdog/coordinator/logView/view.go b/appengine/logdog/coordinator/logView/view.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..b55a69012e607b87a3afb88590818caffb5e736d |
| --- /dev/null |
| +++ b/appengine/logdog/coordinator/logView/view.go |
| @@ -0,0 +1,240 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
|
Ryan Tseng
2016/02/08 22:56:55
2016 /happy new year
dnj (Google)
2016/02/09 02:50:03
Done.
|
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package logView |
| + |
| +import ( |
| + "io" |
| + "net/http" |
| + "strings" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/julienschmidt/httprouter" |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/logdog/fetcher" |
| + "github.com/luci/luci-go/common/logdog/renderer" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/logdog/logpb" |
| + "github.com/luci/luci-go/server/logdog/storage" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +const ( |
| + // DefaultViewTimeout is the default value for the Handler's Timeout. |
| + // |
| + // It targets the managed VM environment, which allows 24 hours. We stop |
| + // 10 minutes short of that to avoid conflict. |
| + DefaultViewTimeout = (24 * time.Hour) - (10 * time.Minute) |
| + |
| + // viewLogDelay is the amount of time to wait in between polls when no more |
| + // log entries are available during streaming. |
| + viewLogDelay = (5 * time.Second) |
| +) |
| + |
| +// Handler offers an HTTP handler that will return the rendered contents of a |
| +// log stream. |
| +type Handler struct { |
| + coordinator.Service |
| + |
| + // timeout is the maximum amount of time that we are willing to handle a |
| + // View request. |
| + // |
| + // This should be short of the maximum request time allowed by an AppEngine |
| + // handler, else the instance may be terminated. |
| + timeout time.Duration |
| +} |
| + |
| +// Handle will render the contents of a log stream to the supplied |
| +// ResponseWriter. |
| +// |
| +// It expects the stream hash/path to be embedded in p as "path". |
| +func (h *Handler) Handle(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) { |
| + rc := h.handleImpl(c, w, req, p) |
| + w.WriteHeader(rc) |
| +} |
| + |
| +func (h *Handler) handleImpl(c context.Context, w http.ResponseWriter, req *http.Request, p httprouter.Params) int { |
| + // Get the requested log path/hash. |
| + path := strings.TrimLeft(p.ByName("path"), "/") |
| + log.Fields{ |
| + "path": path, |
| + }.Infof(c, "Request to view log stream.") |
| + |
| + ls, err := coordinator.NewLogStream(path) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Invalid log sream path/hash value supplied.") |
| + return http.StatusBadRequest |
| + } |
| + |
| + // Set a timeout on our Context. |
| + timeout := h.timeout |
| + if timeout <= 0 { |
| + timeout = DefaultViewTimeout |
| + } |
| + c, cancelFunc := context.WithTimeout(c, timeout) |
| + defer cancelFunc() |
| + |
| + // Load the log stream metadata. |
| + if err := ds.Get(c).Get(&ls); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load LogStream.") |
| + |
| + switch err { |
| + case ds.ErrNoSuchEntity: |
| + return http.StatusNotFound |
| + default: |
| + return http.StatusInternalServerError |
| + } |
| + } |
| + |
| + // The user can stream purged logs only if they're an administrator. If not, |
| + // pretend that the stream does not exist. |
| + if ls.Purged { |
| + if err := config.IsAdminUser(c); err != nil { |
| + log.WithError(err).Warningf(c, "Non-admin user requested purged log stream.") |
| + return http.StatusNotFound |
| + } |
| + } |
| + |
| + // Get our Storage instance. |
| + st, err := h.GetLogStreamStorage(c, ls) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get Storage instance.") |
| + return http.StatusInternalServerError |
| + } |
| + defer st.Close() |
| + |
| + // Fetch the requested log stream! |
| + w.Header().Add("Content-Type", ls.ContentType) |
| + |
| + fetchCtx, cancelFunc := context.WithCancel(c) |
|
Ryan Tseng
2016/02/08 22:56:55
cancelFunc isn't used anywhere
dnj (Google)
2016/02/09 02:50:03
Done.
|
| + f := fetcher.New(fetchCtx, fetcher.Options{ |
| + Source: &storageLogSource{ |
| + ls: ls, |
| + st: st, |
| + }, |
| + Delay: viewLogDelay, |
| + }) |
| + r := renderer.Renderer{ |
| + Source: f, |
| + Reproduce: true, |
| + } |
| + |
| + // Render log data to our HTTP response. |
| + count, err := io.Copy(w, &r) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to output log stream.") |
| + return http.StatusInternalServerError |
| + } |
| + |
| + log.Fields{ |
| + "bytes": count, |
| + }.Infof(c, "Successfully wrote log stream.") |
| + return http.StatusOK |
| +} |
| + |
| +// storageLogSource is a stateful incremental implementation of the |
| +// fetcher.Source interface which uses direct storage access to pull log entries. |
| +// |
| +// The Fetcher will respond to Context cancellation, so we don't have to worry |
| +// about that here. |
| +type storageLogSource struct { |
| + ls *coordinator.LogStream |
| + st storage.Storage |
| +} |
| + |
| +func (f *storageLogSource) LogEntries(c context.Context, req *fetcher.LogRequest) ( |
|
Ryan Tseng
2016/02/08 22:56:55
docstring (lint should catch this)
dnj (Google)
2016/02/09 02:50:03
It's a public method on a private struct, so that'
|
| + logs []*logpb.LogEntry, tidx types.MessageIndex, err error) { |
| + // If this log stream hasn't been terminated yet, re-load it and check if |
| + // there is a terminal index. |
| + tidx = types.MessageIndex(f.ls.TerminalIndex) |
| + if tidx < 0 { |
| + log.Debugf(c, "Log stream is non-terminal, refreshing.") |
| + if err := ds.Get(c).Get(f.ls); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to refresh log stream.") |
| + } |
| + |
| + tidx = types.MessageIndex(f.ls.TerminalIndex) |
| + if tidx >= 0 { |
| + log.Fields{ |
| + "terminalIndex": tidx, |
| + }.Debugf(c, "Log stream has been terminated.") |
| + } |
| + } |
| + |
| + sreq := storage.GetRequest{ |
| + Path: f.ls.Path(), |
| + Index: types.MessageIndex(req.Index), |
| + } |
| + |
| + // If we have a terminal index, set a limit. This is probably unnecessary, but |
| + // will handle the error case where logs past the terminal index are written |
| + // to intermediate storage before a terminal index is registered. |
| + if tidx >= 0 { |
| + sreq.Limit = int64(tidx - req.Index + 1) |
| + if sreq.Limit <= 0 { |
| + // No more log records in the stream. |
| + return |
| + } |
| + } |
| + |
| + // Issue the Get request. |
|
Ryan Tseng
2016/02/08 22:56:55
presumably for all chunks of a logstream?
dnj (Google)
2016/02/09 02:50:03
Well, for the ones requested via "req".
|
| + var logData [][]byte |
| + bytes := int64(0) |
| + err = f.st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { |
| + // Enforce contiguous log records. |
| + if idx != sreq.Index { |
| + return false |
| + } |
| + logData = append(logData, ld) |
| + sreq.Index++ |
| + bytes += int64(len(ld)) |
| + |
| + // Apply byte/count constraints. |
| + return (!(req.Count > 0 && int64(len(logData)) >= req.Count) && !(req.Bytes > 0 && bytes >= req.Bytes)) |
| + }) |
| + if err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "initialIndex": req.Index, |
| + "nextIndex": sreq.Index, |
| + "count": len(logData), |
| + }.Errorf(c, "Failed to fetch logs.") |
| + return |
| + } |
| + |
| + // Convert the logs to protobufs. |
| + logs = make([]*logpb.LogEntry, len(logData)) |
| + for i, ld := range logData { |
| + le := logpb.LogEntry{} |
| + if err = proto.Unmarshal(ld, &le); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "logIndex": i, |
| + }.Errorf(c, "Failed to unmarshal LogEntry from log data.") |
| + return |
| + } |
| + logs[i] = &le |
| + } |
| + return |
| +} |
| + |
| +// cancelSleep will block for the specified duration, or until the supplied |
| +// Context is done. |
| +func cancelSleep(c context.Context, d time.Duration) error { |
| + t := clock.NewTimer(c) |
| + t.Reset(d) |
| + defer t.Stop() |
| + |
| + select { |
| + case <-c.Done(): |
| + return c.Err() |
| + case <-t.GetC(): |
| + return nil |
| + } |
| +} |