Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(291)

Side by Side Diff: common/prpc/client.go

Issue 1637193002: common/prpc, tools/cmd/cproto: prpc client (Closed) Base URL: https://github.com/luci/luci-go@prpc-server
Patch Set: Add some content length tests. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package prpc 5 package prpc
6 6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net/http"
13 "net/url"
14 "strconv"
15 "strings"
16 "time"
17
18 "github.com/golang/protobuf/proto"
19 "golang.org/x/net/context"
20 "golang.org/x/net/context/ctxhttp"
21 "google.golang.org/grpc"
22 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/metadata"
24
25 "github.com/luci/luci-go/common/clock"
26 "github.com/luci/luci-go/common/errors"
27 "github.com/luci/luci-go/common/grpcutil"
28 "github.com/luci/luci-go/common/logging"
29 "github.com/luci/luci-go/common/retry"
30 )
31
7 const ( 32 const (
8 // HeaderGRPCCode is a name of the HTTP header that specifies the 33 // HeaderGRPCCode is a name of the HTTP header that specifies the
9 // gRPC code in the response. 34 // gRPC code in the response.
10 // A pRPC server must always specify it. 35 // A pRPC server must always specify it.
11 HeaderGRPCCode = "X-Prpc-Grpc-Code" 36 HeaderGRPCCode = "X-Prpc-Grpc-Code"
37
38 // HeaderTimeout is HTTP header used to set pRPC request timeout.
39 // The single value should match regexp `\d+[HMSmun]`.
40 HeaderTimeout = "X-Prpc-Grpc-Timeout"
41
42 // DefaultMaxContentLength is the default maximum content length (in byt es)
43 // for a Client. It is 32MiB.
44 DefaultMaxContentLength = 32 * 1024 * 1024
12 ) 45 )
46
47 var (
48 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests.
49 DefaultUserAgent = "pRPC Client 1.0"
50
51 // ErrResponseTooBig is returned by Call when the Response's body size e xceeds
52 // the Client's soft limit, MaxContentLength.
53 ErrResponseTooBig = errors.New("response too big")
54 )
55
56 // Client can make pRPC calls.
57 type Client struct {
58 C *http.Client // if nil, uses http.DefaultClient
59
60 // ErrBodySize is the number of bytes to read from a HTTP response
61 // with error status and include in the error.
62 // If non-positive, defaults to 256.
63 ErrBodySize int
64
65 // MaxContentLength, if > 0, is the maximum content length, in bytes, th at a
66 // pRPC is willing to read from the server. If a larger content length i s
67 // present in the response, ErrResponseTooBig will be returned.
68 //
69 // If <= 0, DefaultMaxContentLength will be used.
70 MaxContentLength int
71
72 Host string // host and optionally a port number of the target serv er.
73 Options *Options // if nil, DefaultOptions() are used.
74 }
75
76 // renderOptions copies client options and applies opts.
77 func (c *Client) renderOptions(opts []grpc.CallOption) (*Options, error) {
78 var options *Options
79 if c.Options != nil {
80 cpy := *c.Options
81 options = &cpy
82 } else {
83 options = DefaultOptions()
84 }
85 if err := options.apply(opts); err != nil {
86 return nil, err
87 }
88 return options, nil
89 }
90
91 func (c *Client) getHTTPClient() *http.Client {
92 if c.C == nil {
93 return http.DefaultClient
94 }
95 return c.C
96 }
97
98 // Call makes an RPC.
99 // Retries on transient errors according to retry options.
100 // Logs HTTP errors.
101 //
102 // opts must be created by this package.
103 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed.
104 // Called from generated code.
105 //
106 // If there is a Deadline applied to the Context, it will be forwarded to the
107 // server using the HeaderTimeout haeder.
108 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error {
109 options, err := c.renderOptions(opts)
110 if err != nil {
111 return err
112 }
113
114 reqBody, err := proto.Marshal(in)
115 if err != nil {
116 return err
117 }
118
119 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions)
120 ctx = logging.SetFields(ctx, logging.Fields{
121 "host": c.Host,
122 "service": serviceName,
123 "method": methodName,
124 })
125
126 // Send the request in a retry loop.
127 var buf bytes.Buffer
128 err = retry.Retry(
129 ctx,
130 retry.TransientOnly(options.Retry),
131 func() error {
132 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName)
133
134 // If there is a deadline on our Context, set the timeou t header on the
135 // request.
136 if deadline, ok := ctx.Deadline(); ok {
137 delta := deadline.Sub(clock.Now(ctx))
138 if delta <= 0 {
139 // The request has already expired. This will likely never happen,
140 // since the outer Retry loop will have expired, but there is a very
141 // slight possibility of a race.
142 return ctx.Err()
143 }
144
145 req.Header.Set(HeaderTimeout, EncodeTimeout(delt a))
146 }
147
148 // Send the request.
149 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
150 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req)
151 if err != nil {
152 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err))
153 }
154 defer res.Body.Close()
155
156 if options.resHeaderMetadata != nil {
157 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy()
158 }
159
160 // Read the response body.
161 buf.Reset()
162 var body io.Reader = res.Body
163
164 limit := c.MaxContentLength
165 if limit <= 0 {
166 limit = DefaultMaxContentLength
167 }
168 if l := res.ContentLength; l > 0 {
169 if l > int64(limit) {
170 logging.Fields{
171 "contentLength": l,
172 "limit": limit,
173 }.Errorf(ctx, "ContentLength header exce eds soft response body limit.")
174 return ErrResponseTooBig
175 }
176 limit = int(l)
177 buf.Grow(limit)
178 }
179 body = io.LimitReader(body, int64(limit))
180 if _, err = buf.ReadFrom(body); err != nil {
181 return fmt.Errorf("failed to read response body: %s", err)
182 }
183
184 // If there is more data in the body Reader, it means th at the response
185 // size has exceeded our limit.
186 var probeB [1]byte
187 if amt, err := body.Read(probeB[:]); amt > 0 || err != i o.EOF {
188 logging.Fields{
189 "limit": limit,
190 }.Errorf(ctx, "Soft response body limit exceeded .")
191 return ErrResponseTooBig
192 }
193
194 if options.resTrailerMetadata != nil {
195 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy()
196 }
197
198 codeHeader := res.Header.Get(HeaderGRPCCode)
199 if codeHeader == "" {
200 // Not a valid pRPC response.
201 body := buf.String()
202 bodySize := c.ErrBodySize
203 if bodySize <= 0 {
204 bodySize = 256
205 }
206 if len(body) > bodySize {
207 body = body[:bodySize] + "..."
208 }
209 return fmt.Errorf("HTTP %d: no gRPC code. Body: %q", res.StatusCode, body)
210 }
211
212 codeInt, err := strconv.Atoi(codeHeader)
213 if err != nil {
214 // Not a valid pRPC response.
215 return fmt.Errorf("invalid grpc code %q: %s", co deHeader, err)
216 }
217
218 code := codes.Code(codeInt)
219 if code != codes.OK {
220 desc := strings.TrimSuffix(buf.String(), "\n")
221 err := grpcutil.Errf(code, "%s", desc)
222 if isTransientCode(code) {
223 err = errors.WrapTransient(err)
224 }
225 return err
226 }
227
228 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error
229 },
230 func(err error, sleepTime time.Duration) {
231 logging.Fields{
232 logging.ErrorKey: err,
233 "sleepTime": sleepTime,
234 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime)
235 },
236 )
237
238 if err != nil {
239 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err)
240 }
241
242 // We have to unwrap gRPC errors because
243 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s.
244 // https://github.com/grpc/grpc-go/issues/494
245 return errors.UnwrapAll(err)
246 }
247
248 // prepareRequest creates an HTTP request for an RPC,
249 // except it does not set the request body.
250 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request {
251 if host == "" {
252 panic("Host is not set")
253 }
254 req := &http.Request{
255 Method: "POST",
256 URL: &url.URL{
257 Scheme: "https",
258 Host: host,
259 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me),
260 },
261 Header: http.Header{},
262 }
263 if options.Insecure {
264 req.URL.Scheme = "http"
265 }
266
267 // Set headers.
268 const mediaType = "application/prpc" // binary
269 req.Header.Set("Content-Type", mediaType)
270 req.Header.Set("Accept", mediaType)
271 userAgent := options.UserAgent
272 if userAgent == "" {
273 userAgent = DefaultUserAgent
274 }
275 req.Header.Set("User-Agent", userAgent)
276 req.ContentLength = int64(contentLength)
277 req.Header.Set("Content-Length", strconv.Itoa(contentLength))
278 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it .
279 return req
280 }
281
282 func isTransientCode(code codes.Code) bool {
283 switch code {
284 case codes.Internal, codes.Unknown, codes.Unavailable:
285 return true
286
287 default:
288 return false
289 }
290 }
OLDNEW
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698