| Index: appengine/logdog/coordinator/endpoints/logs/get.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/logs/get.go b/appengine/logdog/coordinator/endpoints/logs/get.go
|
| index dd84d9e3100dcf6d9b2c51f4ce0f9cd1eb49dfde..9750e236a1e6d7432c4dcb3d233c2ba83fb937c4 100644
|
| --- a/appengine/logdog/coordinator/endpoints/logs/get.go
|
| +++ b/appengine/logdog/coordinator/endpoints/logs/get.go
|
| @@ -5,13 +5,13 @@
|
| package logs
|
|
|
| import (
|
| - "net/url"
|
| "time"
|
|
|
| "github.com/golang/protobuf/proto"
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| log "github.com/luci/luci-go/common/logging"
|
| @@ -46,8 +46,9 @@ func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
|
| // Tail returns the last log entry for a given log stream.
|
| func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) {
|
| r := logdog.GetRequest{
|
| - Path: req.Path,
|
| - State: req.State,
|
| + Project: req.Project,
|
| + Path: req.Path,
|
| + State: req.State,
|
| }
|
| return s.getImpl(c, &r, true)
|
| }
|
| @@ -55,27 +56,26 @@ func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
|
| // getImpl is common code shared between Get and Tail endpoints.
|
| func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) {
|
| log.Fields{
|
| - "path": req.Path,
|
| - "index": req.Index,
|
| - "tail": tail,
|
| + "project": req.Project,
|
| + "path": req.Path,
|
| + "index": req.Index,
|
| + "tail": tail,
|
| }.Debugf(c, "Received get request.")
|
|
|
| - // Fetch the log stream state for this log stream.
|
| - u, err := url.Parse(req.Path)
|
| + ls, err := coordinator.NewLogStream(req.Path)
|
| if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "path": req.Path,
|
| - }.Errorf(c, "Could not parse path URL.")
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path encoding")
|
| + log.WithError(err).Errorf(c, "Invalid path supplied.")
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value")
|
| }
|
| - ls, err := coordinator.NewLogStream(u.Path)
|
| - if err != nil {
|
| +
|
| + // The user may supply a hash instead of a full path. Once resolved, log
|
| + // the original log stream.
|
| + path := ls.Path()
|
| + if req.Path != string(path) {
|
| log.Fields{
|
| - log.ErrorKey: err,
|
| - "path": u.Path,
|
| - }.Errorf(c, "Invalid path supplied.")
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value")
|
| + "hashPath": req.Path,
|
| + "streamPath": path,
|
| + }.Debugf(c, "Resolved hash path.")
|
| }
|
|
|
| // If this log entry is Purged and we're not admin, pretend it doesn't exist.
|
| @@ -92,19 +92,13 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
|
| }
|
|
|
| case ds.ErrNoSuchEntity:
|
| - log.Fields{
|
| - "path": u.Path,
|
| - }.Errorf(c, "Log stream does not exist.")
|
| + log.Errorf(c, "Log stream does not exist.")
|
| return nil, grpcutil.Errf(codes.NotFound, "path not found")
|
|
|
| default:
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "path": u.Path,
|
| - }.Errorf(c, "Failed to look up log stream.")
|
| + log.WithError(err).Errorf(c, "Failed to look up log stream.")
|
| return nil, grpcutil.Internal
|
| }
|
| - path := ls.Path()
|
|
|
| // If nothing was requested, return nothing.
|
| resp := logdog.GetResponse{}
|
| @@ -127,10 +121,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
|
| if tail || req.LogCount >= 0 {
|
| resp.Logs, err = s.getLogs(c, req, tail, ls)
|
| if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "path": path,
|
| - }.Errorf(c, "Failed to get logs.")
|
| + log.WithError(err).Errorf(c, "Failed to get logs.")
|
| return nil, grpcutil.Internal
|
| }
|
| }
|
| @@ -191,14 +182,14 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l
|
| }
|
| defer st.Close()
|
|
|
| - path := ls.Path()
|
| + project, path := coordinator.Project(c), ls.Path()
|
|
|
| var fetchedLogs [][]byte
|
| var err error
|
| if tail {
|
| - fetchedLogs, err = getTail(c, st, path)
|
| + fetchedLogs, err = getTail(c, st, project, path)
|
| } else {
|
| - fetchedLogs, err = getHead(c, req, st, path, byteLimit)
|
| + fetchedLogs, err = getHead(c, req, st, project, path, byteLimit)
|
| }
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to fetch log records.")
|
| @@ -221,15 +212,16 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l
|
| return logEntries, nil
|
| }
|
|
|
| -func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath, byteLimit int) (
|
| - [][]byte, error) {
|
| - c = log.SetFields(c, log.Fields{
|
| - "path": p,
|
| +func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName,
|
| + path types.StreamPath, byteLimit int) ([][]byte, error) {
|
| + log.Fields{
|
| + "project": project,
|
| + "path": path,
|
| "index": req.Index,
|
| "count": req.LogCount,
|
| "bytes": req.ByteCount,
|
| "noncontiguous": req.NonContiguous,
|
| - })
|
| + }.Debugf(c, "Issuing Get request.")
|
|
|
| // Allocate result logs array.
|
| logCount := int(req.LogCount)
|
| @@ -240,9 +232,10 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
|
| logs := make([][]byte, 0, asz)
|
|
|
| sreq := storage.GetRequest{
|
| - Path: p,
|
| - Index: types.MessageIndex(req.Index),
|
| - Limit: logCount,
|
| + Project: project,
|
| + Path: path,
|
| + Index: types.MessageIndex(req.Index),
|
| + Limit: logCount,
|
| }
|
|
|
| count := 0
|
| @@ -273,7 +266,14 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
|
| "count": len(logs),
|
| }.Warningf(c, "Transient error while loading logs; retrying.")
|
| })
|
| - if err != nil {
|
| + switch err {
|
| + case nil:
|
| + return logs, nil
|
| +
|
| + case storage.ErrDoesNotExist:
|
| + return nil, nil
|
| +
|
| + default:
|
| log.Fields{
|
| log.ErrorKey: err,
|
| "initialIndex": req.Index,
|
| @@ -282,14 +282,17 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
|
| }.Errorf(c, "Failed to execute range request.")
|
| return nil, err
|
| }
|
| -
|
| - return logs, nil
|
| }
|
|
|
| -func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byte, error) {
|
| +func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) {
|
| + log.Fields{
|
| + "project": project,
|
| + "path": path,
|
| + }.Debugf(c, "Issuing Tail request.")
|
| +
|
| var data []byte
|
| err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) {
|
| - data, _, err = st.Tail("", p)
|
| + data, _, err = st.Tail(project, path)
|
| return
|
| }, func(err error, delay time.Duration) {
|
| log.Fields{
|
| @@ -297,9 +300,15 @@ func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byt
|
| "delay": delay,
|
| }.Warningf(c, "Transient error while fetching tail log; retrying.")
|
| })
|
| - if err != nil {
|
| + switch err {
|
| + case nil:
|
| + return [][]byte{data}, nil
|
| +
|
| + case storage.ErrDoesNotExist:
|
| + return nil, nil
|
| +
|
| + default:
|
| log.WithError(err).Errorf(c, "Failed to fetch tail log.")
|
| return nil, err
|
| }
|
| - return [][]byte{data}, err
|
| }
|
|
|