Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logdog | 5 package logdog |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/common/errors" | |
| 12 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/proto/google" | 14 "github.com/luci/luci-go/common/proto/google" |
| 14 miloProto "github.com/luci/luci-go/common/proto/milo" | 15 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 16 "github.com/luci/luci-go/common/retry" | |
| 15 "github.com/luci/luci-go/grpc/grpcutil" | 17 "github.com/luci/luci-go/grpc/grpcutil" |
| 16 "github.com/luci/luci-go/logdog/api/logpb" | 18 "github.com/luci/luci-go/logdog/api/logpb" |
| 17 "github.com/luci/luci-go/logdog/client/coordinator" | 19 "github.com/luci/luci-go/logdog/client/coordinator" |
| 18 "github.com/luci/luci-go/logdog/common/types" | 20 "github.com/luci/luci-go/logdog/common/types" |
| 19 "github.com/luci/luci-go/logdog/common/viewer" | 21 "github.com/luci/luci-go/logdog/common/viewer" |
| 20 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 22 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 21 "github.com/luci/luci-go/milo/api/resp" | 23 "github.com/luci/luci-go/milo/api/resp" |
| 22 "github.com/luci/luci-go/milo/appengine/logdog/internal" | 24 "github.com/luci/luci-go/milo/appengine/logdog/internal" |
| 23 "github.com/luci/luci-go/milo/common/miloerror" | 25 "github.com/luci/luci-go/milo/common/miloerror" |
| 24 | 26 |
| 25 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
| 26 mc "github.com/luci/gae/service/memcache" | 28 mc "github.com/luci/gae/service/memcache" |
| 27 "golang.org/x/net/context" | 29 "golang.org/x/net/context" |
| 28 "google.golang.org/grpc/codes" | |
| 29 ) | 30 ) |
| 30 | 31 |
| 31 const ( | 32 const ( |
| 32 // intermediateCacheLifetime is the amount of time to cache intermediate (non- | 33 // intermediateCacheLifetime is the amount of time to cache intermediate (non- |
| 33 // terminal) annotation streams. Terminal annotation streams are cached | 34 // terminal) annotation streams. Terminal annotation streams are cached |
| 34 // indefinitely. | 35 // indefinitely. |
| 35 intermediateCacheLifetime = 10 * time.Second | 36 intermediateCacheLifetime = 10 * time.Second |
| 36 | 37 |
| 37 // defaultLogDogHost is the default LogDog host, if one isn't specified via | 38 // defaultLogDogHost is the default LogDog host, if one isn't specified via |
| 38 // query string. | 39 // query string. |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 111 log.Fields{ | 112 log.Fields{ |
| 112 "host": as.Client.Host, | 113 "host": as.Client.Host, |
| 113 "project": as.Project, | 114 "project": as.Project, |
| 114 "path": as.Path, | 115 "path": as.Path, |
| 115 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") | 116 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") |
| 116 | 117 |
| 117 var ( | 118 var ( |
| 118 state coordinator.LogStream | 119 state coordinator.LogStream |
| 119 stream = as.Client.Stream(as.Project, as.Path) | 120 stream = as.Client.Stream(as.Project, as.Path) |
| 120 ) | 121 ) |
| 121 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com plete()) | |
| 122 switch code := grpcutil.Code(err); code { | |
| 123 case codes.OK: | |
| 124 break | |
| 125 | 122 |
| 126 » case codes.NotFound: | 123 » var le *logpb.LogEntry |
| 127 » » return nil, &miloerror.Error{ | 124 » err = retry.Retry(c, retry.TransientOnly(streamLoadRetry), func() (err e rror) { |
| 128 » » » Message: "Stream not found", | 125 » » le, err = stream.Tail(c, coordinator.WithState(&state), coordina tor.Complete()) |
| 129 » » » Code: http.StatusNotFound, | 126 » » switch errors.Unwrap(err) { |
| 127 » » case nil: | |
| 128 » » » return nil | |
| 129 | |
| 130 » » case coordinator.ErrNoSuchStream: | |
|
hinoka
2017/02/24 01:30:49
Consider making this just a non-transient error.
dnj
2017/02/24 03:24:09
Good idea, Done.
| |
| 131 » » » log.Warningf(c, "Stream not found (maybe transient).") | |
| 132 » » » return errors.WrapTransient(err) | |
| 133 | |
| 134 » » default: | |
| 135 » » » // If this is a transient gRPC failure, retry. | |
| 136 » » » return grpcutil.WrapIfTransient(err) | |
| 130 } | 137 } |
| 131 | 138 » }, retry.LogCallback(c, "load stream")) |
| 132 » default: | 139 » if err != nil { |
| 133 » » // TODO: Once we switch to delegation tokens and are making the request on | 140 » » log.WithError(err).Errorf(c, "Failed to load stream.") |
| 134 » » // behalf of a user rather than the Milo service, handle Permiss ionDenied. | |
| 135 » » log.Fields{ | |
| 136 » » » log.ErrorKey: err, | |
| 137 » » » "code": code, | |
| 138 » » }.Errorf(c, "Failed to load LogDog annotation stream.") | |
| 139 return nil, &miloerror.Error{ | 141 return nil, &miloerror.Error{ |
| 140 Message: "Failed to load stream", | 142 Message: "Failed to load stream", |
| 141 Code: http.StatusInternalServerError, | 143 Code: http.StatusInternalServerError, |
| 142 } | 144 } |
| 143 } | 145 } |
| 144 | 146 |
| 145 // Make sure that this is an annotation stream. | 147 // Make sure that this is an annotation stream. |
| 146 switch { | 148 switch { |
| 147 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: | 149 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| 148 return nil, &miloerror.Error{ | 150 return nil, &miloerror.Error{ |
| (...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 305 if link.Label == "" { | 307 if link.Label == "" { |
| 306 link.Label = "unnamed" | 308 link.Label = "unnamed" |
| 307 } | 309 } |
| 308 return &link | 310 return &link |
| 309 | 311 |
| 310 default: | 312 default: |
| 311 // Don't know how to render. | 313 // Don't know how to render. |
| 312 return nil | 314 return nil |
| 313 } | 315 } |
| 314 } | 316 } |
| 317 | |
| 318 // streamLoadRetry is a retry iterator to apply to repeatedly load a log stream. | |
| 319 // | |
| 320 // This implements exponential backoff, capped at 5 seconds. | |
| 321 func streamLoadRetry() retry.Iterator { | |
| 322 return &retry.ExponentialBackoff{ | |
| 323 Limited: retry.Limited{ | |
| 324 Delay: 500 * time.Millisecond, | |
| 325 Retries: 10, | |
| 326 }, | |
| 327 MaxDelay: 5 * time.Second, | |
| 328 } | |
| 329 } | |
| OLD | NEW |