Index: appengine/logdog/coordinator/endpoints/logs/get.go |
diff --git a/appengine/logdog/coordinator/endpoints/logs/get.go b/appengine/logdog/coordinator/endpoints/logs/get.go |
index dd84d9e3100dcf6d9b2c51f4ce0f9cd1eb49dfde..9750e236a1e6d7432c4dcb3d233c2ba83fb937c4 100644 |
--- a/appengine/logdog/coordinator/endpoints/logs/get.go |
+++ b/appengine/logdog/coordinator/endpoints/logs/get.go |
@@ -5,13 +5,13 @@ |
package logs |
import ( |
- "net/url" |
"time" |
"github.com/golang/protobuf/proto" |
ds "github.com/luci/gae/service/datastore" |
"github.com/luci/luci-go/appengine/logdog/coordinator" |
"github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/grpcutil" |
"github.com/luci/luci-go/common/logdog/types" |
log "github.com/luci/luci-go/common/logging" |
@@ -46,8 +46,9 @@ func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp |
// Tail returns the last log entry for a given log stream. |
func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) { |
r := logdog.GetRequest{ |
- Path: req.Path, |
- State: req.State, |
+ Project: req.Project, |
+ Path: req.Path, |
+ State: req.State, |
} |
return s.getImpl(c, &r, true) |
} |
@@ -55,27 +56,26 @@ func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe |
// getImpl is common code shared between Get and Tail endpoints. |
func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) { |
log.Fields{ |
- "path": req.Path, |
- "index": req.Index, |
- "tail": tail, |
+ "project": req.Project, |
+ "path": req.Path, |
+ "index": req.Index, |
+ "tail": tail, |
}.Debugf(c, "Received get request.") |
- // Fetch the log stream state for this log stream. |
- u, err := url.Parse(req.Path) |
+ ls, err := coordinator.NewLogStream(req.Path) |
if err != nil { |
- log.Fields{ |
- log.ErrorKey: err, |
- "path": req.Path, |
- }.Errorf(c, "Could not parse path URL.") |
- return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path encoding") |
+ log.WithError(err).Errorf(c, "Invalid path supplied.") |
+ return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value") |
} |
- ls, err := coordinator.NewLogStream(u.Path) |
- if err != nil { |
+ |
+ // The user may supply a hash instead of a full path. Once resolved, log |
+ // the original log stream. |
+ path := ls.Path() |
+ if req.Path != string(path) { |
log.Fields{ |
- log.ErrorKey: err, |
- "path": u.Path, |
- }.Errorf(c, "Invalid path supplied.") |
- return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value") |
+ "hashPath": req.Path, |
+ "streamPath": path, |
+ }.Debugf(c, "Resolved hash path.") |
} |
// If this log entry is Purged and we're not admin, pretend it doesn't exist. |
@@ -92,19 +92,13 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
} |
case ds.ErrNoSuchEntity: |
- log.Fields{ |
- "path": u.Path, |
- }.Errorf(c, "Log stream does not exist.") |
+ log.Errorf(c, "Log stream does not exist.") |
return nil, grpcutil.Errf(codes.NotFound, "path not found") |
default: |
- log.Fields{ |
- log.ErrorKey: err, |
- "path": u.Path, |
- }.Errorf(c, "Failed to look up log stream.") |
+ log.WithError(err).Errorf(c, "Failed to look up log stream.") |
return nil, grpcutil.Internal |
} |
- path := ls.Path() |
// If nothing was requested, return nothing. |
resp := logdog.GetResponse{} |
@@ -127,10 +121,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
if tail || req.LogCount >= 0 { |
resp.Logs, err = s.getLogs(c, req, tail, ls) |
if err != nil { |
- log.Fields{ |
- log.ErrorKey: err, |
- "path": path, |
- }.Errorf(c, "Failed to get logs.") |
+ log.WithError(err).Errorf(c, "Failed to get logs.") |
return nil, grpcutil.Internal |
} |
} |
@@ -191,14 +182,14 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l |
} |
defer st.Close() |
- path := ls.Path() |
+ project, path := coordinator.Project(c), ls.Path() |
var fetchedLogs [][]byte |
var err error |
if tail { |
- fetchedLogs, err = getTail(c, st, path) |
+ fetchedLogs, err = getTail(c, st, project, path) |
} else { |
- fetchedLogs, err = getHead(c, req, st, path, byteLimit) |
+ fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) |
} |
if err != nil { |
log.WithError(err).Errorf(c, "Failed to fetch log records.") |
@@ -221,15 +212,16 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l |
return logEntries, nil |
} |
-func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath, byteLimit int) ( |
- [][]byte, error) { |
- c = log.SetFields(c, log.Fields{ |
- "path": p, |
+func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName, |
+ path types.StreamPath, byteLimit int) ([][]byte, error) { |
+ log.Fields{ |
+ "project": project, |
+ "path": path, |
"index": req.Index, |
"count": req.LogCount, |
"bytes": req.ByteCount, |
"noncontiguous": req.NonContiguous, |
- }) |
+ }.Debugf(c, "Issuing Get request.") |
// Allocate result logs array. |
logCount := int(req.LogCount) |
@@ -240,9 +232,10 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty |
logs := make([][]byte, 0, asz) |
sreq := storage.GetRequest{ |
- Path: p, |
- Index: types.MessageIndex(req.Index), |
- Limit: logCount, |
+ Project: project, |
+ Path: path, |
+ Index: types.MessageIndex(req.Index), |
+ Limit: logCount, |
} |
count := 0 |
@@ -273,7 +266,14 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty |
"count": len(logs), |
}.Warningf(c, "Transient error while loading logs; retrying.") |
}) |
- if err != nil { |
+ switch err { |
+ case nil: |
+ return logs, nil |
+ |
+ case storage.ErrDoesNotExist: |
+ return nil, nil |
+ |
+ default: |
log.Fields{ |
log.ErrorKey: err, |
"initialIndex": req.Index, |
@@ -282,14 +282,17 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty |
}.Errorf(c, "Failed to execute range request.") |
return nil, err |
} |
- |
- return logs, nil |
} |
-func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byte, error) { |
+func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) { |
+ log.Fields{ |
+ "project": project, |
+ "path": path, |
+ }.Debugf(c, "Issuing Tail request.") |
+ |
var data []byte |
err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) { |
- data, _, err = st.Tail("", p) |
+ data, _, err = st.Tail(project, path) |
return |
}, func(err error, delay time.Duration) { |
log.Fields{ |
@@ -297,9 +300,15 @@ func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byt |
"delay": delay, |
}.Warningf(c, "Transient error while fetching tail log; retrying.") |
}) |
- if err != nil { |
+ switch err { |
+ case nil: |
+ return [][]byte{data}, nil |
+ |
+ case storage.ErrDoesNotExist: |
+ return nil, nil |
+ |
+ default: |
log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
return nil, err |
} |
- return [][]byte{data}, err |
} |