Index: appengine/logdog/coordinator/endpoints/logs/get.go |
diff --git a/appengine/logdog/coordinator/endpoints/logs/get.go b/appengine/logdog/coordinator/endpoints/logs/get.go |
index c2803d2e219d1f6720dfe9b87537ef89b6fa54df..1121cc4742aa8cacb92cee343114e3c741d4a44f 100644 |
--- a/appengine/logdog/coordinator/endpoints/logs/get.go |
+++ b/appengine/logdog/coordinator/endpoints/logs/get.go |
@@ -19,7 +19,6 @@ import ( |
"github.com/luci/luci-go/common/proto/logdog/logpb" |
"github.com/luci/luci-go/common/retry" |
"github.com/luci/luci-go/server/logdog/storage" |
- "github.com/luci/luci-go/server/logdog/storage/archive" |
"golang.org/x/net/context" |
"google.golang.org/grpc/codes" |
) |
@@ -27,7 +26,7 @@ import ( |
const ( |
// getInitialArraySize is the initial amount of log slots to allocate for a |
// Get request. |
- getInitialArraySize = 256 |
+ getInitialArraySize = int64(256) |
// getBytesLimit is the maximum amount of data that we are willing to query. |
// AppEngine limits our response size to 32MB. However, this limit applies |
@@ -135,46 +134,15 @@ func (s *Server) getImpl(c context.Context, req *logs.GetRequest, tail bool) (*l |
func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls *coordinator.LogStream) ( |
[]*logpb.LogEntry, error) { |
- var st storage.Storage |
- if !ls.Archived() { |
- log.Debugf(c, "Log is not archived. Fetching from intermediate storage.") |
- |
- // Logs are not archived. Fetch from intermediate storage. |
- var err error |
- st, err = s.Storage(c) |
- if err != nil { |
- return nil, err |
- } |
- } else { |
- log.Debugf(c, "Log is archived. Fetching from archive storage.") |
- var err error |
- gs, err := s.GSClient(c) |
- if err != nil { |
- log.WithError(err).Errorf(c, "Failed to create Google Storage client.") |
- return nil, err |
- } |
- defer func() { |
- if err := gs.Close(); err != nil { |
- log.WithError(err).Warningf(c, "Failed to close Google Storage client.") |
- } |
- }() |
- |
- st, err = archive.New(c, archive.Options{ |
- IndexURL: ls.ArchiveIndexURL, |
- StreamURL: ls.ArchiveStreamURL, |
- Client: gs, |
- }) |
- if err != nil { |
- log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") |
- return nil, err |
- } |
+ st, err := s.StorageForLogStream(c, ls) |
+ if err != nil { |
+ return nil, err |
} |
defer st.Close() |
path := ls.Path() |
var fetchedLogs [][]byte |
- var err error |
if tail { |
fetchedLogs, err = getTail(c, st, path) |
} else { |
@@ -216,7 +184,7 @@ func getHead(c context.Context, req *logs.GetRequest, st storage.Storage, p type |
} |
// Allocate result logs array. |
- logCount := int(req.LogCount) |
+ logCount := int64(req.LogCount) |
asz := getInitialArraySize |
if logCount > 0 && logCount < asz { |
asz = logCount |
@@ -229,12 +197,11 @@ func getHead(c context.Context, req *logs.GetRequest, st storage.Storage, p type |
Limit: logCount, |
} |
- count := 0 |
err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
// Issue the Get request. This may return a transient error, in which case |
// we will retry. |
return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) bool { |
- if count > 0 && byteLimit-len(ld) < 0 { |
+ if len(logs) > 0 && byteLimit-len(ld) < 0 { |
// Not the first log, and we've exceeded our byte limit. |
return false |
} |
@@ -245,8 +212,7 @@ func getHead(c context.Context, req *logs.GetRequest, st storage.Storage, p type |
} |
logs = append(logs, ld) |
sreq.Index = idx + 1 |
- count++ |
- return !(logCount > 0 && count >= logCount) |
+ return !(logCount > 0 && int64(len(logs)) >= logCount) |
}) |
}, func(err error, delay time.Duration) { |
log.Fields{ |