Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1379)

Unified Diff: logdog/appengine/coordinator/endpoints/logs/get.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/endpoints/logs/get.go
diff --git a/logdog/appengine/coordinator/endpoints/logs/get.go b/logdog/appengine/coordinator/endpoints/logs/get.go
index 5dbc6529b3ec39a313b7399ec1b2543082f135cf..dc7bde6a8324b636f11c7e6660b5f47421676136 100644
--- a/logdog/appengine/coordinator/endpoints/logs/get.go
+++ b/logdog/appengine/coordinator/endpoints/logs/get.go
@@ -7,18 +7,20 @@ package logs
import (
"time"
- ds "github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
"github.com/luci/luci-go/common/retry"
"github.com/luci/luci-go/grpc/grpcutil"
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/appengine/coordinator"
"github.com/luci/luci-go/logdog/common/storage"
- "github.com/luci/luci-go/logdog/common/storage/archive"
"github.com/luci/luci-go/logdog/common/types"
+ ds "github.com/luci/gae/service/datastore"
+
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)
@@ -94,12 +96,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
}
}
- // If nothing was requested, return nothing.
resp := logdog.GetResponse{}
- if !(req.State || tail) && req.LogCount < 0 {
- return &resp, nil
- }
-
if req.State {
resp.State = buildLogStreamState(ls, lst)
@@ -112,13 +109,9 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
}
// Retrieve requested logs from storage, if requested.
- if tail || req.LogCount >= 0 {
- var err error
- resp.Logs, err = s.getLogs(c, req, tail, ls, lst)
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to get logs.")
- return nil, grpcutil.Internal
- }
+ if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil {
+ log.WithError(err).Errorf(c, "Failed to get logs.")
+ return nil, grpcutil.Internal
}
log.Fields{
@@ -127,74 +120,69 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
return &resp, nil
}
-func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, ls *coordinator.LogStream,
- lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) {
- byteLimit := int(req.ByteCount)
- if byteLimit <= 0 || byteLimit > getBytesLimit {
- byteLimit = getBytesLimit
+func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog.GetResponse,
+ tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error {
+
+ // Identify our URL signing parameters.
+ var signingRequest coordinator.URLSigningRequest
+ if sr := req.GetSignedUrls; sr != nil {
+ signingRequest.Lifetime = sr.Lifetime.Duration()
+ signingRequest.Stream = sr.Stream
+ signingRequest.Index = sr.Index
+ }
+ if !tail && req.LogCount < 0 && !signingRequest.HasWork() {
+ // No log operations are acutally needed, so don't bother instanting our
+ // Storage instance only to do nothing.
+ return nil
}
svc := coordinator.GetServices(c)
- var st storage.Storage
- if !lst.ArchivalState().Archived() {
- log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
-
- // Logs are not archived. Fetch from intermediate storage.
- var err error
- st, err = svc.IntermediateStorage(c)
- if err != nil {
- return nil, err
- }
- } else {
- log.Fields{
- "indexURL": lst.ArchiveIndexURL,
- "streamURL": lst.ArchiveStreamURL,
- "archiveTime": lst.ArchivedTime,
- }.Debugf(c, "Log is archived. Fetching from archive storage.")
-
- var err error
- gs, err := svc.GSClient(c)
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
- return nil, err
- }
- defer func() {
- if err := gs.Close(); err != nil {
- log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
- }
- }()
-
- st, err = archive.New(c, archive.Options{
- IndexURL: lst.ArchiveIndexURL,
- StreamURL: lst.ArchiveStreamURL,
- Client: gs,
- Cache: svc.StorageCache(),
- })
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
- return nil, err
- }
+ st, err := svc.StorageForStream(c, lst)
+ if err != nil {
+ return errors.Annotate(err).InternalReason("failed to create storage instance").Err()
}
defer st.Close()
project, path := coordinator.Project(c), ls.Path()
- var fetchedLogs []*logpb.LogEntry
- var err error
if tail {
- fetchedLogs, err = getTail(c, st, project, path)
- } else {
- fetchedLogs, err = getHead(c, req, st, project, path, byteLimit)
+ resp.Logs, err = getTail(c, st, project, path)
+ } else if req.LogCount >= 0 {
+ byteLimit := int(req.ByteCount)
+ if byteLimit <= 0 || byteLimit > getBytesLimit {
+ byteLimit = getBytesLimit
+ }
+
+ resp.Logs, err = getHead(c, req, st, project, path, byteLimit)
}
if err != nil {
log.WithError(err).Errorf(c, "Failed to fetch log records.")
- return nil, err
+ return err
+ }
+
+ // If we're requesting a signedl URL, try and get that too.
+ if signingRequest.HasWork() {
+ signedURLs, err := st.GetSignedURLs(c, &signingRequest)
+ switch {
+ case err != nil:
+ return errors.Annotate(err).InternalReason("failed to generate signed URL").Err()
+
+ case signedURLs == nil:
+ log.Debugf(c, "Signed URL was requested, but is not supported by storage.")
+
+ default:
+ resp.SignedUrls = &logdog.GetResponse_SignedUrls{
+ Expiration: google.NewTimestamp(signedURLs.Expiration),
+ Stream: signedURLs.Stream,
+ Index: signedURLs.Index,
+ }
+ }
}
- return fetchedLogs, nil
+ return nil
}
-func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName,
+func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName,
path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {
log.Fields{
"project": project,
@@ -278,7 +266,7 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
}
}
-func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) (
+func getTail(c context.Context, st coordinator.Storage, project config.ProjectName, path types.StreamPath) (
[]*logpb.LogEntry, error) {
log.Fields{
"project": project,

Powered by Google App Engine
This is Rietveld 408576698