Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: common/prpc/client.go

Issue 1605363002: common/prpc, tools/cmd/cproto: prpc client (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: addressed comments Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package prpc
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net/http"
13 "net/url"
14 "strconv"
15 "time"
16
17 "github.com/golang/protobuf/proto"
18 "golang.org/x/net/context"
19 "golang.org/x/net/context/ctxhttp"
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/codes"
22 "google.golang.org/grpc/metadata"
23
24 "github.com/luci/luci-go/common/errors"
25 "github.com/luci/luci-go/common/logging"
26 "github.com/luci/luci-go/common/retry"
27 )
28
29 const (
30 // HeaderGrpcCode is a name of the HTTP header that specifies the
31 // gRPC code in the response. If not response does not specify it,
32 // the gRPC code is derived from HTTP status code.
33 HeaderGrpcCode = "X-Prpc-Grpc-Code"
dnj 2016/01/22 01:16:27 nit: HeaderGRPCCode?
nodir 2016/01/22 02:57:50 Done.
34 )
35
36 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests.
37 var DefaultUserAgent = "pRPC Client 1.0"
38
39 // Client can make pRPC calls.
40 type Client struct {
41 C *http.Client // if nil, uses http.DefaultClient
42
43 // ErrBodySize is the number of bytes to read from a HTTP response
44 // with status >= 300 and include in the error.
45 // If non-positive, defaults to 256.
46 ErrBodySize int
47
48 Host string // host and optionally a port number of the target serv er.
49 Options *Options // if nil, DefaultOptions() are used.
50 }
51
52 // renderOptions copies client options and applies opts.
53 func (c *Client) renderOptions(opts []grpc.CallOption) *Options {
54 var options *Options
55 if c.Options != nil {
56 cpy := *c.Options
57 options = &cpy
58 } else {
59 options = DefaultOptions()
60 }
61 options.apply(opts)
62 return options
63 }
64
65 func (c *Client) getHTTPClient() *http.Client {
66 if c.C == nil {
67 return http.DefaultClient
68 }
69 return c.C
70 }
71
72 // Call makes an RPC.
73 // Retries on transient errors according to retry options.
74 // Logs HTTP errors.
75 //
76 // opts must be created by this package.
77 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed.
78 // Called from generated code.
79 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error {
80 options := c.renderOptions(opts)
81
82 reqBody, err := proto.Marshal(in)
83 if err != nil {
84 return err
85 }
86
87 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions)
88 ctx = logging.SetFields(ctx, logging.Fields{
89 "Host": c.Host,
90 "Service": serviceName,
91 "Method": methodName,
92 })
93
94 // Send the request in a retry loop.
95 var iter retry.Iterator
96 if options.Retry != nil {
97 iter = retry.TransientOnly(options.Retry())
98 }
99 var buf bytes.Buffer
100 err = retry.Retry(
101 ctx,
102 iter,
103 func() error {
104 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName)
105
106 // Send the request.
107 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
108 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req)
109 if err != nil {
110 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err))
111 }
112 defer res.Body.Close()
113
114 if options.resHeaderMetadata != nil {
115 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy()
116 }
117
118 // Check response status code.
119 if res.StatusCode >= 300 {
120 bodySize := c.ErrBodySize
dnj 2016/01/22 01:16:27 Move this logic into responseErr and make response
nodir 2016/01/22 02:57:50 Done.
121 if bodySize <= 0 {
122 bodySize = 256
123 }
124 return responseErr(ctx, res, bodySize)
125 }
126
127 // Read the response body.
128 buf.Reset()
129 var body io.Reader = res.Body
130 if req.ContentLength > 0 {
131 buf.Grow(int(req.ContentLength))
dnj 2016/01/22 01:16:27 <res>.ContentLength** (and above)
nodir 2016/01/22 02:57:50 oops done
132 body = io.LimitReader(body, res.ContentLength)
133 }
134 if _, err = io.Copy(&buf, body); err != nil {
dnj 2016/01/22 01:16:27 io.Copy reads in fixed-size chunks. You should use
nodir 2016/01/22 02:57:50 Done.
135 return errors.WrapTransient(fmt.Errorf("failed t o read response body: %s", err))
136 }
137
138 // Unmarshal the response message.
139 if options.resTrailerMetadata != nil {
140 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy()
141 }
142
143 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error
144 },
145 func(err error, sleepTime time.Duration) {
146 logging.Fields{
147 logging.ErrorKey: err,
148 "SleepTime": sleepTime,
149 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime)
dnj 2016/01/22 01:16:27 nit: Since you're emitting sleepTime as a Field, y
nodir 2016/01/22 02:57:50 I don't like how fields are printed and consider t
dnj (Google) 2016/01/22 04:06:23 The purpose of "fields" is to remove useful data f
150 },
151 )
152
153 if err != nil {
154 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err)
155 }
156
157 // We have to unwrap gRPC errors because
158 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s.
159 // https://github.com/grpc/grpc-go/issues/494
160 return errors.UnwrapAll(err)
161 }
162
163 // prepareRequest creates an HTTP request for an RPC,
164 // except it does not set the request body.
165 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request {
166 if host == "" {
167 panic("Host is not set")
168 }
169 req := &http.Request{
170 Method: "POST",
171 URL: &url.URL{
172 Scheme: "https",
173 Host: host,
174 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me),
175 },
176 Header: http.Header{},
177 }
178 if options.Insecure {
179 req.URL.Scheme = "http"
180 }
181
182 // Set headers.
183 const mediaType = "application/prpc" // binary
184 req.Header.Set("Content-Type", mediaType)
185 req.Header.Set("Accept", mediaType)
186 userAgent := options.UserAgent
187 if userAgent == "" {
188 userAgent = DefaultUserAgent
189 }
190 req.Header.Set("User-Agent", userAgent)
191 req.ContentLength = int64(contentLength)
192 req.Header.Set("Content-Length", strconv.Itoa(contentLength))
193 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it .
194 return req
195 }
196
197 // responseErr converts an HTTP response to a gRPC error.
198 // The returned error may be wrapped with errors.WrapTransient.
199 func responseErr(c context.Context, res *http.Response, bodySize int) error {
200 // Read first 256 bytes of the body. Leave empty if cannot read.
201 var body string
202 bodyBytes, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(bodySize )))
203 if err == nil {
204 if len(bodyBytes) == bodySize {
205 bodyBytes = append(bodyBytes, []byte("...")...)
206 }
207 body = string(bodyBytes)
208 }
209
210 code := codes.Unknown
211 // Read explicit gRPC code.
212 if codeHeader := res.Header.Get(HeaderGrpcCode); codeHeader != "" {
213 if intCode, err := strconv.Atoi(codeHeader); err != nil {
214 logging.WithError(err).Warningf(c, "Could not parse %s h eader: %s", HeaderGrpcCode, err)
215 } else {
216 code = codes.Code(intCode)
217 }
218 } else {
219 // No explicit gRPC code provided, derive it from status code.
220 code = StatusCode(res.StatusCode)
221 }
222
223 // Return the error.
224 err = grpc.Errorf(code, "HTTP %s: %s", res.Status, body)
225 if isTransientStatus(res.StatusCode) {
226 err = errors.WrapTransient(err)
227 }
228 return err
229 }
230
231 // isTransientStatus returns true if an HTTP status code indicates a transient e rror.
232 func isTransientStatus(status int) bool {
233 return status >= 500 || status == http.StatusRequestTimeout
234 }
OLDNEW
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | common/prpc/codes.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698