| Index: logdog/appengine/coordinator/endpoints/logs/get.go
|
| diff --git a/logdog/appengine/coordinator/endpoints/logs/get.go b/logdog/appengine/coordinator/endpoints/logs/get.go
|
| index 5dbc6529b3ec39a313b7399ec1b2543082f135cf..dc7bde6a8324b636f11c7e6660b5f47421676136 100644
|
| --- a/logdog/appengine/coordinator/endpoints/logs/get.go
|
| +++ b/logdog/appengine/coordinator/endpoints/logs/get.go
|
| @@ -7,18 +7,20 @@ package logs
|
| import (
|
| "time"
|
|
|
| - ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/common/config"
|
| + "github.com/luci/luci-go/common/errors"
|
| log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/common/retry"
|
| "github.com/luci/luci-go/grpc/grpcutil"
|
| "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| - "github.com/luci/luci-go/logdog/common/storage/archive"
|
| "github.com/luci/luci-go/logdog/common/types"
|
|
|
| + ds "github.com/luci/gae/service/datastore"
|
| +
|
| "golang.org/x/net/context"
|
| "google.golang.org/grpc/codes"
|
| )
|
| @@ -94,12 +96,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
|
| }
|
| }
|
|
|
| - // If nothing was requested, return nothing.
|
| resp := logdog.GetResponse{}
|
| - if !(req.State || tail) && req.LogCount < 0 {
|
| - return &resp, nil
|
| - }
|
| -
|
| if req.State {
|
| resp.State = buildLogStreamState(ls, lst)
|
|
|
| @@ -112,13 +109,9 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
|
| }
|
|
|
| // Retrieve requested logs from storage, if requested.
|
| - if tail || req.LogCount >= 0 {
|
| - var err error
|
| - resp.Logs, err = s.getLogs(c, req, tail, ls, lst)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to get logs.")
|
| - return nil, grpcutil.Internal
|
| - }
|
| + if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to get logs.")
|
| + return nil, grpcutil.Internal
|
| }
|
|
|
| log.Fields{
|
| @@ -127,74 +120,69 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
|
| return &resp, nil
|
| }
|
|
|
| -func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, ls *coordinator.LogStream,
|
| - lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) {
|
| - byteLimit := int(req.ByteCount)
|
| - if byteLimit <= 0 || byteLimit > getBytesLimit {
|
| - byteLimit = getBytesLimit
|
| +func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog.GetResponse,
|
| + tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error {
|
| +
|
| + // Identify our URL signing parameters.
|
| + var signingRequest coordinator.URLSigningRequest
|
| + if sr := req.GetSignedUrls; sr != nil {
|
| + signingRequest.Lifetime = sr.Lifetime.Duration()
|
| + signingRequest.Stream = sr.Stream
|
| + signingRequest.Index = sr.Index
|
| + }
|
| + if !tail && req.LogCount < 0 && !signingRequest.HasWork() {
|
| + // No log operations are acutally needed, so don't bother instanting our
|
| + // Storage instance only to do nothing.
|
| + return nil
|
| }
|
|
|
| svc := coordinator.GetServices(c)
|
| - var st storage.Storage
|
| - if !lst.ArchivalState().Archived() {
|
| - log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
|
| -
|
| - // Logs are not archived. Fetch from intermediate storage.
|
| - var err error
|
| - st, err = svc.IntermediateStorage(c)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - } else {
|
| - log.Fields{
|
| - "indexURL": lst.ArchiveIndexURL,
|
| - "streamURL": lst.ArchiveStreamURL,
|
| - "archiveTime": lst.ArchivedTime,
|
| - }.Debugf(c, "Log is archived. Fetching from archive storage.")
|
| -
|
| - var err error
|
| - gs, err := svc.GSClient(c)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
|
| - return nil, err
|
| - }
|
| - defer func() {
|
| - if err := gs.Close(); err != nil {
|
| - log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
|
| - }
|
| - }()
|
| -
|
| - st, err = archive.New(c, archive.Options{
|
| - IndexURL: lst.ArchiveIndexURL,
|
| - StreamURL: lst.ArchiveStreamURL,
|
| - Client: gs,
|
| - Cache: svc.StorageCache(),
|
| - })
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
|
| - return nil, err
|
| - }
|
| + st, err := svc.StorageForStream(c, lst)
|
| + if err != nil {
|
| + return errors.Annotate(err).InternalReason("failed to create storage instance").Err()
|
| }
|
| defer st.Close()
|
|
|
| project, path := coordinator.Project(c), ls.Path()
|
|
|
| - var fetchedLogs []*logpb.LogEntry
|
| - var err error
|
| if tail {
|
| - fetchedLogs, err = getTail(c, st, project, path)
|
| - } else {
|
| - fetchedLogs, err = getHead(c, req, st, project, path, byteLimit)
|
| + resp.Logs, err = getTail(c, st, project, path)
|
| + } else if req.LogCount >= 0 {
|
| + byteLimit := int(req.ByteCount)
|
| + if byteLimit <= 0 || byteLimit > getBytesLimit {
|
| + byteLimit = getBytesLimit
|
| + }
|
| +
|
| + resp.Logs, err = getHead(c, req, st, project, path, byteLimit)
|
| }
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to fetch log records.")
|
| - return nil, err
|
| + return err
|
| + }
|
| +
|
| + // If we're requesting a signedl URL, try and get that too.
|
| + if signingRequest.HasWork() {
|
| + signedURLs, err := st.GetSignedURLs(c, &signingRequest)
|
| + switch {
|
| + case err != nil:
|
| + return errors.Annotate(err).InternalReason("failed to generate signed URL").Err()
|
| +
|
| + case signedURLs == nil:
|
| + log.Debugf(c, "Signed URL was requested, but is not supported by storage.")
|
| +
|
| + default:
|
| + resp.SignedUrls = &logdog.GetResponse_SignedUrls{
|
| + Expiration: google.NewTimestamp(signedURLs.Expiration),
|
| + Stream: signedURLs.Stream,
|
| + Index: signedURLs.Index,
|
| + }
|
| + }
|
| }
|
|
|
| - return fetchedLogs, nil
|
| + return nil
|
| }
|
|
|
| -func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName,
|
| +func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName,
|
| path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {
|
| log.Fields{
|
| "project": project,
|
| @@ -278,7 +266,7 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
|
| }
|
| }
|
|
|
| -func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) (
|
| +func getTail(c context.Context, st coordinator.Storage, project config.ProjectName, path types.StreamPath) (
|
| []*logpb.LogEntry, error) {
|
| log.Fields{
|
| "project": project,
|
|
|