Chromium Code Reviews| Index: logdog/appengine/coordinator/endpoints/logs/get.go |
| diff --git a/logdog/appengine/coordinator/endpoints/logs/get.go b/logdog/appengine/coordinator/endpoints/logs/get.go |
| index cd72ca864af3ebb8daac298fa0dba82f5c7cea0c..28b37b2a169a582a7c7b14cff1cbcc16660d53ef 100644 |
| --- a/logdog/appengine/coordinator/endpoints/logs/get.go |
| +++ b/logdog/appengine/coordinator/endpoints/logs/get.go |
| @@ -7,7 +7,6 @@ package logs |
| import ( |
| "time" |
| - "github.com/golang/protobuf/proto" |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/luci-go/common/config" |
| log "github.com/luci/luci-go/common/logging" |
| @@ -169,7 +168,6 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l |
| IndexURL: lst.ArchiveIndexURL, |
| StreamURL: lst.ArchiveStreamURL, |
| Client: gs, |
| - MaxBytes: byteLimit, |
|
dnj
2016/10/19 23:18:59
Getting rid of this is no big deal, since it's enf
|
| }) |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") |
| @@ -180,7 +178,7 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l |
| project, path := coordinator.Project(c), ls.Path() |
| - var fetchedLogs [][]byte |
| + var fetchedLogs []*logpb.LogEntry |
| var err error |
| if tail { |
| fetchedLogs, err = getTail(c, st, project, path) |
| @@ -192,24 +190,11 @@ func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l |
| return nil, err |
| } |
| - logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) |
| - for idx, ld := range fetchedLogs { |
| - // Deserialize the log entry, then convert it to output value. |
| - le := logpb.LogEntry{} |
| - if err := proto.Unmarshal(ld, &le); err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "index": idx, |
| - }.Errorf(c, "Failed to generate response log entry.") |
| - return nil, err |
| - } |
| - logEntries[idx] = &le |
| - } |
| - return logEntries, nil |
| + return fetchedLogs, nil |
| } |
| func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName, |
| - path types.StreamPath, byteLimit int) ([][]byte, error) { |
| + path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { |
| log.Fields{ |
| "project": project, |
| "path": path, |
| @@ -225,7 +210,7 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj |
| if logCount > 0 && logCount < asz { |
| asz = logCount |
| } |
| - logs := make([][]byte, 0, asz) |
| + logs := make([]*logpb.LogEntry, 0, asz) |
| sreq := storage.GetRequest{ |
| Project: project, |
| @@ -235,21 +220,28 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj |
| } |
| count := 0 |
| + var ierr error |
| err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
| // Issue the Get request. This may return a transient error, in which case |
| // we will retry. |
| - return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool { |
| - if count > 0 && byteLimit-len(ld) < 0 { |
| + return st.Get(sreq, func(e *storage.Entry) bool { |
| + var le *logpb.LogEntry |
| + if le, ierr = e.GetLogEntry(); ierr != nil { |
| + return false |
| + } |
| + |
| + if count > 0 && byteLimit-len(e.D) < 0 { |
| // Not the first log, and we've exceeded our byte limit. |
| return false |
| } |
| - byteLimit -= len(ld) |
| + byteLimit -= len(e.D) |
|
nodir
2016/10/26 20:25:33
you are mutating byteLimit in a retry loop. I thin
|
| - if !(req.NonContiguous || idx == sreq.Index) { |
| + sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must. |
| + if !(req.NonContiguous || sidx == sreq.Index) { |
| return false |
| } |
| - logs = append(logs, ld) |
| - sreq.Index = idx + 1 |
| + logs = append(logs, le) |
| + sreq.Index = sidx + 1 |
| count++ |
|
nodir
2016/10/26 20:25:33
same here
|
| return !(logCount > 0 && count >= logCount) |
| }) |
| @@ -264,6 +256,10 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj |
| }) |
| switch err { |
| case nil: |
| + if ierr != nil { |
| + log.WithError(ierr).Errorf(c, "Bad log entry data.") |
| + return nil, ierr |
| + } |
| return logs, nil |
| case storage.ErrDoesNotExist: |
| @@ -280,15 +276,16 @@ func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj |
| } |
| } |
| -func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) { |
| +func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( |
| + []*logpb.LogEntry, error) { |
| log.Fields{ |
| "project": project, |
| "path": path, |
| }.Debugf(c, "Issuing Tail request.") |
| - var data []byte |
| + var e *storage.Entry |
| err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) { |
| - data, _, err = st.Tail(project, path) |
| + e, err = st.Tail(project, path) |
| return |
| }, func(err error, delay time.Duration) { |
| log.Fields{ |
| @@ -298,7 +295,12 @@ func getTail(c context.Context, st storage.Storage, project config.ProjectName, |
| }) |
| switch err { |
| case nil: |
| - return [][]byte{data}, nil |
| + le, err := e.GetLogEntry() |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load tail entry data.") |
| + return nil, err |
| + } |
| + return []*logpb.LogEntry{le}, nil |
| case storage.ErrDoesNotExist: |
| return nil, nil |