Chromium Code Reviews| Index: logdog/appengine/coordinator/endpoints/logs/get.go |
| diff --git a/logdog/appengine/coordinator/endpoints/logs/get.go b/logdog/appengine/coordinator/endpoints/logs/get.go |
| index 5dbc6529b3ec39a313b7399ec1b2543082f135cf..c53c9efc9975ca50f6fa4a4b322544bda996e183 100644 |
| --- a/logdog/appengine/coordinator/endpoints/logs/get.go |
| +++ b/logdog/appengine/coordinator/endpoints/logs/get.go |
| @@ -7,18 +7,20 @@ package logs |
| import ( |
| "time" |
| - ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/luci-go/common/config" |
| + "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| "github.com/luci/luci-go/common/retry" |
| "github.com/luci/luci-go/grpc/grpcutil" |
| "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| "github.com/luci/luci-go/logdog/api/logpb" |
| "github.com/luci/luci-go/logdog/appengine/coordinator" |
| "github.com/luci/luci-go/logdog/common/storage" |
| - "github.com/luci/luci-go/logdog/common/storage/archive" |
| "github.com/luci/luci-go/logdog/common/types" |
| + ds "github.com/luci/gae/service/datastore" |
| + |
| "golang.org/x/net/context" |
| "google.golang.org/grpc/codes" |
| ) |
| @@ -59,6 +61,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
| "project": req.Project, |
| "path": req.Path, |
| "index": req.Index, |
| + "sign": req.SignEntryUrlLifetime.Duration(), |
| "tail": tail, |
| }.Debugf(c, "Received get request.") |
| @@ -94,12 +97,7 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
| } |
| } |
| - // If nothing was requested, return nothing. |
| resp := logdog.GetResponse{} |
| - if !(req.State || tail) && req.LogCount < 0 { |
| - return &resp, nil |
| - } |
| - |
| if req.State { |
| resp.State = buildLogStreamState(ls, lst) |
| @@ -112,13 +110,9 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
| } |
| // Retrieve requested logs from storage, if requested. |
| - if tail || req.LogCount >= 0 { |
| - var err error |
| - resp.Logs, err = s.getLogs(c, req, tail, ls, lst) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to get logs.") |
| - return nil, grpcutil.Internal |
| - } |
| + if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get logs.") |
| + return nil, grpcutil.Internal |
| } |
| log.Fields{ |
| @@ -127,74 +121,63 @@ func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( |
| return &resp, nil |
| } |
| -func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, ls *coordinator.LogStream, |
| - lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) { |
| - byteLimit := int(req.ByteCount) |
| - if byteLimit <= 0 || byteLimit > getBytesLimit { |
| - byteLimit = getBytesLimit |
| +func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog.GetResponse, |
| + tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error { |
| + |
| + signURLLifetime := req.SignEntryUrlLifetime.Duration() |
| + if !tail && req.LogCount < 0 && signURLLifetime <= 0 { |
| + // No log operations are acutally needed, so don't bother instanting our |
| + // Storage instance only to do nothing. |
| + return nil |
| } |
| svc := coordinator.GetServices(c) |
| - var st storage.Storage |
| - if !lst.ArchivalState().Archived() { |
| - log.Debugf(c, "Log is not archived. Fetching from intermediate storage.") |
| - |
| - // Logs are not archived. Fetch from intermediate storage. |
| - var err error |
| - st, err = svc.IntermediateStorage(c) |
| - if err != nil { |
| - return nil, err |
| - } |
| - } else { |
| - log.Fields{ |
| - "indexURL": lst.ArchiveIndexURL, |
| - "streamURL": lst.ArchiveStreamURL, |
| - "archiveTime": lst.ArchivedTime, |
| - }.Debugf(c, "Log is archived. Fetching from archive storage.") |
| - |
| - var err error |
| - gs, err := svc.GSClient(c) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create Google Storage client.") |
| - return nil, err |
| - } |
| - defer func() { |
| - if err := gs.Close(); err != nil { |
| - log.WithError(err).Warningf(c, "Failed to close Google Storage client.") |
| - } |
| - }() |
| - |
| - st, err = archive.New(c, archive.Options{ |
| - IndexURL: lst.ArchiveIndexURL, |
| - StreamURL: lst.ArchiveStreamURL, |
| - Client: gs, |
| - Cache: svc.StorageCache(), |
| - }) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") |
| - return nil, err |
| - } |
| + st, err := svc.StorageForStream(c, lst) |
| + if err != nil { |
| + return errors.Annotate(err).InternalReason("failed to create storage instance").Err() |
| } |
| defer st.Close() |
| project, path := coordinator.Project(c), ls.Path() |
| - var fetchedLogs []*logpb.LogEntry |
| - var err error |
| if tail { |
| - fetchedLogs, err = getTail(c, st, project, path) |
| - } else { |
| - fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) |
| + resp.Logs, err = getTail(c, st, project, path) |
| + } else if req.LogCount >= 0 { |
| + byteLimit := int(req.ByteCount) |
| + if byteLimit <= 0 || byteLimit > getBytesLimit { |
| + byteLimit = getBytesLimit |
| + } |
| + |
| + resp.Logs, err = getHead(c, req, st, project, path, byteLimit) |
| } |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| - return nil, err |
| + return err |
| + } |
| + |
| + // If we're requesting a signedl URL, try and get that too. |
| + if signURLLifetime > 0 { |
|
Vadim Sh.
2016/11/30 21:03:52
is a request with 'tail == true' and 'signURLLifet
dnj
2016/12/01 17:39:30
Nope, b/c that parameter is not part of the TailRe
|
| + value, expire, err := st.SignStreamURL(c, signURLLifetime) |
| + switch err { |
| + case nil: |
| + resp.SignedEntryUrl = &logdog.GetResponse_SignedEntryUrl{ |
| + Value: value, |
| + Expiration: google.NewTimestamp(expire), |
| + } |
| + |
| + case coordinator.ErrSigningNotSupported: |
| + log.Debugf(c, "Signed URL was requested, but is not supported by storage.") |
| + break |
| + |
| + default: |
| + return errors.Annotate(err).InternalReason("failed to generate signed URL").Err() |
| + } |
| } |
| - return fetchedLogs, nil |
| + return nil |
| } |
| -func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName, |
| +func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName, |
| path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { |
| log.Fields{ |
| "project": project, |
| @@ -278,7 +261,7 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj |
| } |
| } |
| -func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( |
| +func getTail(c context.Context, st coordinator.Storage, project config.ProjectName, path types.StreamPath) ( |
| []*logpb.LogEntry, error) { |
| log.Fields{ |
| "project": project, |