Chromium Code Reviews| Index: milo/appengine/logdog/build.go |
| diff --git a/milo/appengine/logdog/build.go b/milo/appengine/logdog/build.go |
| index 43667b86f2dcf4c0c9ba1876b49dfc68bdb09d04..d6b837137cf5307f387921208d9a93e64dfe74b3 100644 |
| --- a/milo/appengine/logdog/build.go |
| +++ b/milo/appengine/logdog/build.go |
| @@ -9,9 +9,11 @@ import ( |
| "net/http" |
| "time" |
| + "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/google" |
| miloProto "github.com/luci/luci-go/common/proto/milo" |
| + "github.com/luci/luci-go/common/retry" |
| "github.com/luci/luci-go/grpc/grpcutil" |
| "github.com/luci/luci-go/logdog/api/logpb" |
| "github.com/luci/luci-go/logdog/client/coordinator" |
| @@ -25,7 +27,6 @@ import ( |
| "github.com/golang/protobuf/proto" |
| mc "github.com/luci/gae/service/memcache" |
| "golang.org/x/net/context" |
| - "google.golang.org/grpc/codes" |
| ) |
| const ( |
| @@ -118,24 +119,25 @@ func (as *AnnotationStream) Fetch(c context.Context) (*miloProto.Step, error) { |
| state coordinator.LogStream |
| stream = as.Client.Stream(as.Project, as.Path) |
| ) |
| - le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Complete()) |
| - switch code := grpcutil.Code(err); code { |
| - case codes.OK: |
| - break |
| - case codes.NotFound: |
| - return nil, &miloerror.Error{ |
| - Message: "Stream not found", |
| - Code: http.StatusNotFound, |
| - } |
| + var le *logpb.LogEntry |
| + err = retry.Retry(c, retry.TransientOnly(streamLoadRetry), func() (err error) { |
| + le, err = stream.Tail(c, coordinator.WithState(&state), coordinator.Complete()) |
| + switch errors.Unwrap(err) { |
| + case nil: |
| + return nil |
| - default: |
| - // TODO: Once we switch to delegation tokens and are making the request on |
| - // behalf of a user rather than the Milo service, handle PermissionDenied. |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "code": code, |
| - }.Errorf(c, "Failed to load LogDog annotation stream.") |
| + case coordinator.ErrNoSuchStream: |
|
hinoka
2017/02/24 01:30:49
Consider making this just a non-transient error.
dnj
2017/02/24 03:24:09
Good idea, Done.
|
| + log.Warningf(c, "Stream not found (maybe transient).") |
| + return errors.WrapTransient(err) |
| + |
| + default: |
| + // If this is a transient gRPC failure, retry. |
| + return grpcutil.WrapIfTransient(err) |
| + } |
| + }, retry.LogCallback(c, "load stream")) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load stream.") |
| return nil, &miloerror.Error{ |
| Message: "Failed to load stream", |
| Code: http.StatusInternalServerError, |
| @@ -312,3 +314,16 @@ func (b *ViewerURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| return nil |
| } |
| } |
| + |
| +// streamLoadRetry returns the retry iterator used when loading a log stream. |
| +// |
| +// It implements exponential backoff configured with a 500ms delay, a 5 second delay cap, and 10 retries. |
| +func streamLoadRetry() retry.Iterator { |
| + return &retry.ExponentialBackoff{ |
| + Limited: retry.Limited{ |
| + Delay: 500 * time.Millisecond, |
| + Retries: 10, |
| + }, |
| + MaxDelay: 5 * time.Second, |
| + } |
| +} |