Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: common/prpc/client.go

Issue 1637193002: common/prpc, tools/cmd/cproto: prpc client (Closed) Base URL: https://github.com/luci/luci-go@prpc-server
Patch Set: Add Context-derived timeout for Client. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package prpc 5 package prpc
6 6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net/http"
13 "net/url"
14 "strconv"
15 "strings"
16 "time"
17
18 "github.com/golang/protobuf/proto"
19 "golang.org/x/net/context"
20 "golang.org/x/net/context/ctxhttp"
21 "google.golang.org/grpc"
22 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/metadata"
24
25 "github.com/luci/luci-go/common/clock"
26 "github.com/luci/luci-go/common/errors"
27 "github.com/luci/luci-go/common/grpcutil"
28 "github.com/luci/luci-go/common/logging"
29 "github.com/luci/luci-go/common/retry"
30 )
31
7 const ( 32 const (
8 // HeaderGRPCCode is a name of the HTTP header that specifies the 33 // HeaderGRPCCode is a name of the HTTP header that specifies the
9 // gRPC code in the response. 34 // gRPC code in the response.
10 // A pRPC server must always specify it. 35 // A pRPC server must always specify it.
11 HeaderGRPCCode = "X-Prpc-Grpc-Code" 36 HeaderGRPCCode = "X-Prpc-Grpc-Code"
37
38 // HeaderTimeout is HTTP header used to set pRPC request timeout.
39 // The single value should match regexp `\d+[HMSmun]`.
40 HeaderTimeout = "X-Prpc-Grpc-Deadline"
nodir 2016/01/27 03:04:18 was accidentally renamed to deadline? fwiu, usual
dnj (Google) 2016/01/27 03:25:53 Done.
12 ) 41 )
42
43 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests.
44 var DefaultUserAgent = "pRPC Client 1.0"
45
46 // Client can make pRPC calls.
47 type Client struct {
48 C *http.Client // if nil, uses http.DefaultClient
49
50 // ErrBodySize is the number of bytes to read from a HTTP response
51 // with error status and include in the error.
52 // If non-positive, defaults to 256.
53 ErrBodySize int
54
55 Host string // host and optionally a port number of the target serv er.
56 Options *Options // if nil, DefaultOptions() are used.
57 }
58
59 // renderOptions copies client options and applies opts.
60 func (c *Client) renderOptions(opts []grpc.CallOption) *Options {
61 var options *Options
62 if c.Options != nil {
63 cpy := *c.Options
64 options = &cpy
65 } else {
66 options = DefaultOptions()
67 }
68 options.apply(opts)
69 return options
70 }
71
72 func (c *Client) getHTTPClient() *http.Client {
73 if c.C == nil {
74 return http.DefaultClient
75 }
76 return c.C
77 }
78
79 // Call makes an RPC.
80 // Retries on transient errors according to retry options.
81 // Logs HTTP errors.
82 //
83 // opts must be created by this package.
84 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed.
85 // Called from generated code.
86 //
87 // If there is a Deadline applied to the Context, it will be forwarded to the
88 // server using the HeaderTimeout heaer.
dnj (Google) 2016/01/27 02:42:54 typo will fix.
iannucci 2016/01/27 02:55:50 heaer typo
89 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error {
iannucci 2016/01/27 02:55:49 this should probably just be CallOption: if we s/p
dnj (Google) 2016/01/27 03:24:26 I don't really mind this. The fall-through is nice
90 options := c.renderOptions(opts)
91
92 reqBody, err := proto.Marshal(in)
93 if err != nil {
94 return err
95 }
96
97 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions)
98 ctx = logging.SetFields(ctx, logging.Fields{
99 "host": c.Host,
100 "service": serviceName,
101 "method": methodName,
102 })
103
104 // Send the request in a retry loop.
105 var buf bytes.Buffer
106 err = retry.Retry(
107 ctx,
108 retry.TransientOnly(options.Retry),
109 func() error {
110 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName)
111
112 // If there is a deadline on our Context, set the timeou t header on the
113 // request.
114 if deadline, ok := ctx.Deadline(); ok {
115 delta := deadline.Sub(clock.Now(ctx))
116 if delta <= 0 {
117 // The request has already expired. This will likely never happen,
118 // since the outer Retry loop will have expired, but there is a very
119 // slight possibility of a race.
120 return ctx.Err()
121 }
122
123 req.Header.Set(HeaderTimeout, EncodeTimeout(delt a))
124 }
125
126 // Send the request.
127 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
128 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req)
129 if err != nil {
130 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err))
131 }
132 defer res.Body.Close()
133
134 if options.resHeaderMetadata != nil {
135 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy()
136 }
137
138 // Read the response body.
139 buf.Reset()
140 var body io.Reader = res.Body
141 if res.ContentLength > 0 {
142 buf.Grow(int(res.ContentLength))
iannucci 2016/01/27 02:55:49 hm... what if this number is enormous? Can we cap
dnj (Google) 2016/01/27 03:24:25 I've added a soft limit, default limits, and check
143 body = io.LimitReader(body, res.ContentLength)
144 }
145 if _, err = buf.ReadFrom(body); err != nil {
146 return fmt.Errorf("failed to read response body: %s", err)
147 }
148
149 if options.resTrailerMetadata != nil {
150 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy()
151 }
152
153 codeHeader := res.Header.Get(HeaderGRPCCode)
154 if codeHeader == "" {
155 // Not a valid pRPC response.
156 body := buf.String()
157 bodySize := c.ErrBodySize
158 if bodySize <= 0 {
159 bodySize = 256
160 }
161 if len(body) > bodySize {
162 body = body[:bodySize] + "..."
163 }
164 return fmt.Errorf("HTTP %d: no gRPC code. Body: %s", res.StatusCode, body)
iannucci 2016/01/27 02:55:49 what if it's not valid utf8? can we quote it?
dnj (Google) 2016/01/27 03:24:26 Done.
165 }
166
167 codeInt, err := strconv.Atoi(codeHeader)
168 if err != nil {
169 // Not a valid pRPC response.
170 return fmt.Errorf("invalid grpc code %q: %s", co deHeader, err)
171 }
172
173 code := codes.Code(codeInt)
174 if code != codes.OK {
175 desc := strings.TrimSuffix(buf.String(), "\n")
176 err := grpcutil.Errf(code, "%s", desc)
177 if isTransientCode(code) {
178 err = errors.WrapTransient(err)
179 }
180 return err
181 }
182
183 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error
184 },
185 func(err error, sleepTime time.Duration) {
186 logging.Fields{
187 logging.ErrorKey: err,
188 "sleepTime": sleepTime,
189 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime)
190 },
191 )
192
193 if err != nil {
194 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err)
195 }
196
197 // We have to unwrap gRPC errors because
198 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s.
199 // https://github.com/grpc/grpc-go/issues/494
200 return errors.UnwrapAll(err)
201 }
202
203 // prepareRequest creates an HTTP request for an RPC,
204 // except it does not set the request body.
205 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request {
206 if host == "" {
207 panic("Host is not set")
208 }
209 req := &http.Request{
210 Method: "POST",
211 URL: &url.URL{
212 Scheme: "https",
213 Host: host,
214 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me),
215 },
216 Header: http.Header{},
217 }
218 if options.Insecure {
219 req.URL.Scheme = "http"
220 }
221
222 // Set headers.
223 const mediaType = "application/prpc" // binary
224 req.Header.Set("Content-Type", mediaType)
225 req.Header.Set("Accept", mediaType)
226 userAgent := options.UserAgent
227 if userAgent == "" {
228 userAgent = DefaultUserAgent
229 }
230 req.Header.Set("User-Agent", userAgent)
231 req.ContentLength = int64(contentLength)
232 req.Header.Set("Content-Length", strconv.Itoa(contentLength))
233 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it .
234 return req
235 }
236
237 func isTransientCode(code codes.Code) bool {
238 switch code {
239 case codes.Internal, codes.Unknown, codes.Unavailable:
240 return true
241
242 default:
243 return false
244 }
245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698