OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prpc | 5 package prpc |
6 | 6 |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "io" | |
11 "io/ioutil" | |
12 "net/http" | |
13 "net/url" | |
14 "strconv" | |
15 "strings" | |
16 "time" | |
17 | |
18 "github.com/golang/protobuf/proto" | |
19 "golang.org/x/net/context" | |
20 "golang.org/x/net/context/ctxhttp" | |
21 "google.golang.org/grpc" | |
22 "google.golang.org/grpc/codes" | |
23 "google.golang.org/grpc/metadata" | |
24 | |
25 "github.com/luci/luci-go/common/clock" | |
26 "github.com/luci/luci-go/common/errors" | |
27 "github.com/luci/luci-go/common/grpcutil" | |
28 "github.com/luci/luci-go/common/logging" | |
29 "github.com/luci/luci-go/common/retry" | |
30 ) | |
31 | |
7 const ( | 32 const ( |
8 // HeaderGRPCCode is a name of the HTTP header that specifies the | 33 // HeaderGRPCCode is a name of the HTTP header that specifies the |
9 // gRPC code in the response. | 34 // gRPC code in the response. |
10 // A pRPC server must always specify it. | 35 // A pRPC server must always specify it. |
11 HeaderGRPCCode = "X-Prpc-Grpc-Code" | 36 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
37 | |
38 // HeaderTimeout is HTTP header used to set pRPC request timeout. | |
39 // The single value should match regexp `\d+[HMSmun]`. | |
40 HeaderTimeout = "X-Prpc-Grpc-Deadline" | |
nodir
2016/01/27 03:04:18
was accidentally renamed to deadline?
fwiu, usual
dnj (Google)
2016/01/27 03:25:53
Done.
| |
12 ) | 41 ) |
42 | |
43 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. | |
44 var DefaultUserAgent = "pRPC Client 1.0" | |
45 | |
46 // Client can make pRPC calls. | |
47 type Client struct { | |
48 C *http.Client // if nil, uses http.DefaultClient | |
49 | |
50 // ErrBodySize is the number of bytes to read from a HTTP response | |
51 // with error status and include in the error. | |
52 // If non-positive, defaults to 256. | |
53 ErrBodySize int | |
54 | |
55 Host string // host and optionally a port number of the target serv er. | |
56 Options *Options // if nil, DefaultOptions() are used. | |
57 } | |
58 | |
59 // renderOptions copies client options and applies opts. | |
60 func (c *Client) renderOptions(opts []grpc.CallOption) *Options { | |
61 var options *Options | |
62 if c.Options != nil { | |
63 cpy := *c.Options | |
64 options = &cpy | |
65 } else { | |
66 options = DefaultOptions() | |
67 } | |
68 options.apply(opts) | |
69 return options | |
70 } | |
71 | |
72 func (c *Client) getHTTPClient() *http.Client { | |
73 if c.C == nil { | |
74 return http.DefaultClient | |
75 } | |
76 return c.C | |
77 } | |
78 | |
79 // Call makes an RPC. | |
80 // Retries on transient errors according to retry options. | |
81 // Logs HTTP errors. | |
82 // | |
83 // opts must be created by this package. | |
84 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed. | |
85 // Called from generated code. | |
86 // | |
87 // If there is a Deadline applied to the Context, it will be forwarded to the | |
88 // server using the HeaderTimeout heaer. | |
dnj (Google)
2016/01/27 02:42:54
typo will fix.
iannucci
2016/01/27 02:55:50
heaer typo
| |
89 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error { | |
iannucci
2016/01/27 02:55:49
this should probably just be CallOption: if we s/p
dnj (Google)
2016/01/27 03:24:26
I don't really mind this. The fall-through is nice
| |
90 options := c.renderOptions(opts) | |
91 | |
92 reqBody, err := proto.Marshal(in) | |
93 if err != nil { | |
94 return err | |
95 } | |
96 | |
97 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions) | |
98 ctx = logging.SetFields(ctx, logging.Fields{ | |
99 "host": c.Host, | |
100 "service": serviceName, | |
101 "method": methodName, | |
102 }) | |
103 | |
104 // Send the request in a retry loop. | |
105 var buf bytes.Buffer | |
106 err = retry.Retry( | |
107 ctx, | |
108 retry.TransientOnly(options.Retry), | |
109 func() error { | |
110 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) | |
111 | |
112 // If there is a deadline on our Context, set the timeou t header on the | |
113 // request. | |
114 if deadline, ok := ctx.Deadline(); ok { | |
115 delta := deadline.Sub(clock.Now(ctx)) | |
116 if delta <= 0 { | |
117 // The request has already expired. This will likely never happen, | |
118 // since the outer Retry loop will have expired, but there is a very | |
119 // slight possibility of a race. | |
120 return ctx.Err() | |
121 } | |
122 | |
123 req.Header.Set(HeaderTimeout, EncodeTimeout(delt a)) | |
124 } | |
125 | |
126 // Send the request. | |
127 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) | |
128 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | |
129 if err != nil { | |
130 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err)) | |
131 } | |
132 defer res.Body.Close() | |
133 | |
134 if options.resHeaderMetadata != nil { | |
135 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy() | |
136 } | |
137 | |
138 // Read the response body. | |
139 buf.Reset() | |
140 var body io.Reader = res.Body | |
141 if res.ContentLength > 0 { | |
142 buf.Grow(int(res.ContentLength)) | |
iannucci
2016/01/27 02:55:49
hm... what if this number is enormous? Can we cap
dnj (Google)
2016/01/27 03:24:25
I've added a soft limit, default limits, and check
| |
143 body = io.LimitReader(body, res.ContentLength) | |
144 } | |
145 if _, err = buf.ReadFrom(body); err != nil { | |
146 return fmt.Errorf("failed to read response body: %s", err) | |
147 } | |
148 | |
149 if options.resTrailerMetadata != nil { | |
150 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy() | |
151 } | |
152 | |
153 codeHeader := res.Header.Get(HeaderGRPCCode) | |
154 if codeHeader == "" { | |
155 // Not a valid pRPC response. | |
156 body := buf.String() | |
157 bodySize := c.ErrBodySize | |
158 if bodySize <= 0 { | |
159 bodySize = 256 | |
160 } | |
161 if len(body) > bodySize { | |
162 body = body[:bodySize] + "..." | |
163 } | |
164 return fmt.Errorf("HTTP %d: no gRPC code. Body: %s", res.StatusCode, body) | |
iannucci
2016/01/27 02:55:49
what if it's not valid utf8? can we quote it?
dnj (Google)
2016/01/27 03:24:26
Done.
| |
165 } | |
166 | |
167 codeInt, err := strconv.Atoi(codeHeader) | |
168 if err != nil { | |
169 // Not a valid pRPC response. | |
170 return fmt.Errorf("invalid grpc code %q: %s", co deHeader, err) | |
171 } | |
172 | |
173 code := codes.Code(codeInt) | |
174 if code != codes.OK { | |
175 desc := strings.TrimSuffix(buf.String(), "\n") | |
176 err := grpcutil.Errf(code, "%s", desc) | |
177 if isTransientCode(code) { | |
178 err = errors.WrapTransient(err) | |
179 } | |
180 return err | |
181 } | |
182 | |
183 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error | |
184 }, | |
185 func(err error, sleepTime time.Duration) { | |
186 logging.Fields{ | |
187 logging.ErrorKey: err, | |
188 "sleepTime": sleepTime, | |
189 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime) | |
190 }, | |
191 ) | |
192 | |
193 if err != nil { | |
194 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err) | |
195 } | |
196 | |
197 // We have to unwrap gRPC errors because | |
198 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s. | |
199 // https://github.com/grpc/grpc-go/issues/494 | |
200 return errors.UnwrapAll(err) | |
201 } | |
202 | |
203 // prepareRequest creates an HTTP request for an RPC, | |
204 // except it does not set the request body. | |
205 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request { | |
206 if host == "" { | |
207 panic("Host is not set") | |
208 } | |
209 req := &http.Request{ | |
210 Method: "POST", | |
211 URL: &url.URL{ | |
212 Scheme: "https", | |
213 Host: host, | |
214 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me), | |
215 }, | |
216 Header: http.Header{}, | |
217 } | |
218 if options.Insecure { | |
219 req.URL.Scheme = "http" | |
220 } | |
221 | |
222 // Set headers. | |
223 const mediaType = "application/prpc" // binary | |
224 req.Header.Set("Content-Type", mediaType) | |
225 req.Header.Set("Accept", mediaType) | |
226 userAgent := options.UserAgent | |
227 if userAgent == "" { | |
228 userAgent = DefaultUserAgent | |
229 } | |
230 req.Header.Set("User-Agent", userAgent) | |
231 req.ContentLength = int64(contentLength) | |
232 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) | |
233 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it . | |
234 return req | |
235 } | |
236 | |
237 func isTransientCode(code codes.Code) bool { | |
238 switch code { | |
239 case codes.Internal, codes.Unknown, codes.Unavailable: | |
240 return true | |
241 | |
242 default: | |
243 return false | |
244 } | |
245 } | |
OLD | NEW |