Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3051)

Unified Diff: appengine/logdog/coordinator/endpoints/logs/get.go

Issue 1904503003: LogDog: Fix archived log stream read errors. (Closed) Base URL: https://github.com/luci/luci-go@hierarchy-check-first
Patch Set: Delete "offset()" method. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Toggle Comments ('s')
Index: appengine/logdog/coordinator/endpoints/logs/get.go
diff --git a/appengine/logdog/coordinator/endpoints/logs/get.go b/appengine/logdog/coordinator/endpoints/logs/get.go
index 857da79520296500008092fbc68b9474f2080869..6bfccacf7871a53cb971d535c9600ec0b91272e9 100644
--- a/appengine/logdog/coordinator/endpoints/logs/get.go
+++ b/appengine/logdog/coordinator/endpoints/logs/get.go
@@ -29,9 +29,12 @@ const (
getInitialArraySize = 256
// getBytesLimit is the maximum amount of data that we are willing to query.
- // AppEngine limits our response size to 32MB. However, this limit applies
- // to the raw recovered LogEntry data, so we'll artificially constrain this
- // to 16MB so the additional JSON overhead doesn't kill it.
+ //
+ // We will limit byte responses to 16MB, based on the following constraints:
+ // - AppEngine cannot respond with more than 32MB of data. This includes JSON
+ // overhead, including notation and base64 data expansion.
+ // - `urlfetch`, which is used for Google Cloud Storage (archival) responses,
+ // cannot handle responses larger than 32MB.
getBytesLimit = 16 * 1024 * 1024
)
@@ -52,6 +55,11 @@ func (s *Server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
// getImpl is common code shared between Get and Tail endpoints.
func (s *Server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) {
svc := s.GetServices()
+ log.Fields{
+ "path": req.Path,
+ "index": req.Index,
+ "tail": tail,
+ }.Debugf(c, "Received get request.")
// Fetch the log stream state for this log stream.
u, err := url.Parse(req.Path)
@@ -137,6 +145,11 @@ func (s *Server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdog.GetRequest, tail bool,
ls *coordinator.LogStream) (
[]*logpb.LogEntry, error) {
+ byteLimit := int(req.ByteCount)
+ if byteLimit <= 0 || byteLimit > getBytesLimit {
+ byteLimit = getBytesLimit
+ }
+
var st storage.Storage
if !ls.Archived() {
log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
@@ -148,7 +161,12 @@ func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
return nil, err
}
} else {
- log.Debugf(c, "Log is archived. Fetching from archive storage.")
+ log.Fields{
+ "indexURL": ls.ArchiveIndexURL,
+ "streamURL": ls.ArchiveStreamURL,
+ "archiveTime": ls.ArchivedTime,
+ }.Debugf(c, "Log is archived. Fetching from archive storage.")
+
var err error
gs, err := svc.GSClient(c)
if err != nil {
@@ -165,6 +183,7 @@ func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
IndexURL: ls.ArchiveIndexURL,
StreamURL: ls.ArchiveStreamURL,
Client: gs,
+ MaxBytes: byteLimit,
})
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
@@ -180,7 +199,7 @@ func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
if tail {
fetchedLogs, err = getTail(c, st, path)
} else {
- fetchedLogs, err = getHead(c, req, st, path)
+ fetchedLogs, err = getHead(c, req, st, path, byteLimit)
}
if err != nil {
log.WithError(err).Errorf(c, "Failed to fetch log records.")
@@ -203,7 +222,8 @@ func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
return logEntries, nil
}
-func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath) ([][]byte, error) {
+func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath, byteLimit int) (
+ [][]byte, error) {
c = log.SetFields(c, log.Fields{
"path": p,
"index": req.Index,
@@ -212,11 +232,6 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
"noncontiguous": req.NonContiguous,
})
- byteLimit := int(req.ByteCount)
- if byteLimit <= 0 || byteLimit > getBytesLimit {
- byteLimit = getBytesLimit
- }
-
// Allocate result logs array.
logCount := int(req.LogCount)
asz := getInitialArraySize
« no previous file with comments | « no previous file | appengine/logdog/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698