OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package prpc | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "io" | |
11 "io/ioutil" | |
12 "net/http" | |
13 "net/url" | |
14 "strconv" | |
15 "time" | |
16 | |
17 "github.com/golang/protobuf/proto" | |
18 "golang.org/x/net/context" | |
19 "golang.org/x/net/context/ctxhttp" | |
20 "google.golang.org/grpc" | |
21 "google.golang.org/grpc/codes" | |
22 "google.golang.org/grpc/metadata" | |
23 | |
24 "github.com/luci/luci-go/common/errors" | |
25 "github.com/luci/luci-go/common/logging" | |
26 "github.com/luci/luci-go/common/retry" | |
27 ) | |
28 | |
29 const ( | |
30 // HeaderGrpcCode is a name of the HTTP header that specifies the | |
31 // gRPC code in the response. If not response does not specify it, | |
32 // the gRPC code is derived from HTTP status code. | |
33 HeaderGrpcCode = "X-Prpc-Grpc-Code" | |
dnj
2016/01/22 01:16:27
nit: HeaderGRPCCode?
nodir
2016/01/22 02:57:50
Done.
| |
34 ) | |
35 | |
36 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. | |
37 var DefaultUserAgent = "pRPC Client 1.0" | |
38 | |
39 // Client can make pRPC calls. | |
40 type Client struct { | |
41 C *http.Client // if nil, uses http.DefaultClient | |
42 | |
43 // ErrBodySize is the number of bytes to read from a HTTP response | |
44 // with status >= 300 and include in the error. | |
45 // If non-positive, defaults to 256. | |
46 ErrBodySize int | |
47 | |
48 Host string // host and optionally a port number of the target serv er. | |
49 Options *Options // if nil, DefaultOptions() are used. | |
50 } | |
51 | |
52 // renderOptions copies client options and applies opts. | |
53 func (c *Client) renderOptions(opts []grpc.CallOption) *Options { | |
54 var options *Options | |
55 if c.Options != nil { | |
56 cpy := *c.Options | |
57 options = &cpy | |
58 } else { | |
59 options = DefaultOptions() | |
60 } | |
61 options.apply(opts) | |
62 return options | |
63 } | |
64 | |
65 func (c *Client) getHTTPClient() *http.Client { | |
66 if c.C == nil { | |
67 return http.DefaultClient | |
68 } | |
69 return c.C | |
70 } | |
71 | |
72 // Call makes an RPC. | |
73 // Retries on transient errors according to retry options. | |
74 // Logs HTTP errors. | |
75 // | |
76 // opts must be created by this package. | |
77 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed. | |
78 // Called from generated code. | |
79 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error { | |
80 options := c.renderOptions(opts) | |
81 | |
82 reqBody, err := proto.Marshal(in) | |
83 if err != nil { | |
84 return err | |
85 } | |
86 | |
87 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions) | |
88 ctx = logging.SetFields(ctx, logging.Fields{ | |
89 "Host": c.Host, | |
90 "Service": serviceName, | |
91 "Method": methodName, | |
92 }) | |
93 | |
94 // Send the request in a retry loop. | |
95 var iter retry.Iterator | |
96 if options.Retry != nil { | |
97 iter = retry.TransientOnly(options.Retry()) | |
98 } | |
99 var buf bytes.Buffer | |
100 err = retry.Retry( | |
101 ctx, | |
102 iter, | |
103 func() error { | |
104 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) | |
105 | |
106 // Send the request. | |
107 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) | |
108 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | |
109 if err != nil { | |
110 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err)) | |
111 } | |
112 defer res.Body.Close() | |
113 | |
114 if options.resHeaderMetadata != nil { | |
115 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy() | |
116 } | |
117 | |
118 // Check response status code. | |
119 if res.StatusCode >= 300 { | |
120 bodySize := c.ErrBodySize | |
dnj
2016/01/22 01:16:27
Move this logic into responseErr and make response
nodir
2016/01/22 02:57:50
Done.
| |
121 if bodySize <= 0 { | |
122 bodySize = 256 | |
123 } | |
124 return responseErr(ctx, res, bodySize) | |
125 } | |
126 | |
127 // Read the response body. | |
128 buf.Reset() | |
129 var body io.Reader = res.Body | |
130 if req.ContentLength > 0 { | |
131 buf.Grow(int(req.ContentLength)) | |
dnj
2016/01/22 01:16:27
<res>.ContentLength** (and above)
nodir
2016/01/22 02:57:50
oops done
| |
132 body = io.LimitReader(body, res.ContentLength) | |
133 } | |
134 if _, err = io.Copy(&buf, body); err != nil { | |
dnj
2016/01/22 01:16:27
io.Copy reads in fixed-size chunks. You should use
nodir
2016/01/22 02:57:50
Done.
| |
135 return errors.WrapTransient(fmt.Errorf("failed t o read response body: %s", err)) | |
136 } | |
137 | |
138 // Unmarshal the response message. | |
139 if options.resTrailerMetadata != nil { | |
140 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy() | |
141 } | |
142 | |
143 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error | |
144 }, | |
145 func(err error, sleepTime time.Duration) { | |
146 logging.Fields{ | |
147 logging.ErrorKey: err, | |
148 "SleepTime": sleepTime, | |
149 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime) | |
dnj
2016/01/22 01:16:27
nit: Since you're emitting sleepTime as a Field, y
nodir
2016/01/22 02:57:50
I don't like how fields are printed and consider t
dnj (Google)
2016/01/22 04:06:23
The purpose of "fields" is to remove useful data f
| |
150 }, | |
151 ) | |
152 | |
153 if err != nil { | |
154 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err) | |
155 } | |
156 | |
157 // We have to unwrap gRPC errors because | |
158 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s. | |
159 // https://github.com/grpc/grpc-go/issues/494 | |
160 return errors.UnwrapAll(err) | |
161 } | |
162 | |
163 // prepareRequest creates an HTTP request for an RPC, | |
164 // except it does not set the request body. | |
165 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request { | |
166 if host == "" { | |
167 panic("Host is not set") | |
168 } | |
169 req := &http.Request{ | |
170 Method: "POST", | |
171 URL: &url.URL{ | |
172 Scheme: "https", | |
173 Host: host, | |
174 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me), | |
175 }, | |
176 Header: http.Header{}, | |
177 } | |
178 if options.Insecure { | |
179 req.URL.Scheme = "http" | |
180 } | |
181 | |
182 // Set headers. | |
183 const mediaType = "application/prpc" // binary | |
184 req.Header.Set("Content-Type", mediaType) | |
185 req.Header.Set("Accept", mediaType) | |
186 userAgent := options.UserAgent | |
187 if userAgent == "" { | |
188 userAgent = DefaultUserAgent | |
189 } | |
190 req.Header.Set("User-Agent", userAgent) | |
191 req.ContentLength = int64(contentLength) | |
192 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) | |
193 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it . | |
194 return req | |
195 } | |
196 | |
197 // responseErr converts an HTTP response to a gRPC error. | |
198 // The returned error may be wrapped with errors.WrapTransient. | |
199 func responseErr(c context.Context, res *http.Response, bodySize int) error { | |
200 // Read first 256 bytes of the body. Leave empty if cannot read. | |
201 var body string | |
202 bodyBytes, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(bodySize ))) | |
203 if err == nil { | |
204 if len(bodyBytes) == bodySize { | |
205 bodyBytes = append(bodyBytes, []byte("...")...) | |
206 } | |
207 body = string(bodyBytes) | |
208 } | |
209 | |
210 code := codes.Unknown | |
211 // Read explicit gRPC code. | |
212 if codeHeader := res.Header.Get(HeaderGrpcCode); codeHeader != "" { | |
213 if intCode, err := strconv.Atoi(codeHeader); err != nil { | |
214 logging.WithError(err).Warningf(c, "Could not parse %s h eader: %s", HeaderGrpcCode, err) | |
215 } else { | |
216 code = codes.Code(intCode) | |
217 } | |
218 } else { | |
219 // No explicit gRPC code provided, derive it from status code. | |
220 code = StatusCode(res.StatusCode) | |
221 } | |
222 | |
223 // Return the error. | |
224 err = grpc.Errorf(code, "HTTP %s: %s", res.Status, body) | |
225 if isTransientStatus(res.StatusCode) { | |
226 err = errors.WrapTransient(err) | |
227 } | |
228 return err | |
229 } | |
230 | |
231 // isTransientStatus returns true if an HTTP status code indicates a transient e rror. | |
232 func isTransientStatus(status int) bool { | |
233 return status >= 500 || status == http.StatusRequestTimeout | |
234 } | |
OLD | NEW |