Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Side by Side Diff: client/utils/grpc_proxy.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Response to review Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2017 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """Common gRPC implementation for Swarming and Isolate"""
6
7 import logging
8 import os
9 import re
10 import urlparse
11 from utils import net
12
13 # gRPC may not be installed on the worker machine. This is fine, as long as
14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
15 # Full external requirements are: grpcio, certifi.
16 try:
17 import grpc
18 from google import auth as google_auth
19 from google.auth.transport import grpc as google_auth_transport_grpc
20 from google.auth.transport import requests as google_auth_transport_requests
21 except ImportError as err:
22 grpc = None
23
24
25 # If gRPC was successfully imported, try to import certifi as well. This is not
26 # actually used anywhere in this module, but if certifi is missing,
27 # google.auth.transport will fail (see
28 # https://stackoverflow.com/questions/24973326). So checking it here allows us
29 # to print out a somewhat-sane error message.
30 certifi = None
31 if grpc is not None:
32 try:
33 import certifi
34 except ImportError:
35 # Will print out error messages later (ie when we have a logger)
36 pass
37
38
39 # How many times to retry a gRPC call
40 MAX_GRPC_ATTEMPTS = 30
41
42
43 # Longest time to sleep between gRPC calls
44 MAX_GRPC_SLEEP = 10.
45
46
47 # Start the timeout at three minutes.
48 GRPC_TIMEOUT_SEC = 3 * 60
49
50
51 def available():
52 """Returns true if gRPC can be used on this host."""
53 return grpc != None
54
55
56 class Proxy(object):
57 """Represents a gRPC proxy.
58
59 If the proxy begins with 'https', the returned channel will be secure and
60 authorized using default application credentials - see
61 https://developers.google.com/identity/protocols/application-default-credentia ls.
62 Currently, we're using Cloud Container Builder scopes for testing; this may
63 change in the future to allow different scopes to be passed in for different
64 channels.
65
66 To use the returned channel to call methods directly, say:
67
68 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
69 myapi_pb2.MyApiStub)
70
71 To make a unary call with retries (recommended):
72
73 proto_output = proxy.call_unary('MyMethod', proto_input)
74
75 To make a unary call without retries, or to pass in a client side stream
76 (proto_input can be an iterator here):
77
78 proto_output = proxy.call_no_retries('MyMethod', proto_input)
79
80 You can also call the stub directly (not recommended, since no errors will be
81 caught or logged):
82
83 proto_output = proxy.stub.MyMethod(proto_input)
84
85 To make a call to a server-side streaming call (these are not retried):
86
87 for response in proxy.get_stream('MyStreaminingMethod', proto_input):
88 <process response>
89
90 To retrieve the prefix:
91
92 prefix = proxy.prefix # returns "prefix/for/resource/names"
93
94 All exceptions are logged using "logging.warning."
95 """
96
97 def __init__(self, proxy, stub_class):
98 url = urlparse.urlparse(proxy)
99 if url.scheme is 'http':
100 self._secure = False
101 elif url.scheme is 'https':
102 self._secure = True
103 else:
104 raise ValueError('gRPC proxy must use http or https: %s' % proxy)
105 if url.netloc is '':
106 raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
107 self._host = url.netloc
108 self._prefix = url.path
109 if not self._prefix.endswith('/'):
110 self._prefix += '/'
111 if url.params is not '' or url.fragment is not '':
112 raise ValueError('gRPC proxy may not contain params or fragments: %s' %
113 proxy)
114 self._debug_info = ['full proxy name: ' + proxy]
M-A Ruel 2017/08/04 02:05:37 instead of _debug_info, you can create a local log
aludwin 2017/08/04 14:53:10 The idea here is to *save* these bits of informati
115 self._channel = self._create_channel()
116 self._stub = stub_class(self._channel)
117 logging.info('%s: initialized', self.name)
118
119 @property
M-A Ruel 2017/08/04 02:05:37 Most of these are not needed, you can add only onc
aludwin 2017/08/04 14:53:10 Done.
120 def host(self):
121 return self._host
122
123 @property
124 def prefix(self):
125 return self._prefix
126
127 @property
128 def secure(self):
129 return self._secure
130
131 @property
132 def channel(self):
133 return self._channel
134
135 @property
136 def stub(self):
137 return self._stub
138
139 @property
140 def name(self):
141 security = 'insecure'
142 if self._secure:
143 security = 'secure'
144 return 'gRPC %s proxy %s/%s' % (
145 security, self.host, self._stub.__class__.__name__)
146
147 def call_unary(self, name, request):
148 """Calls a method, waiting if the service is not available.
149
150 Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
151 """
152 grpc_error = None
153 for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
154 try:
155 return self.call_no_retries(name, request)
156 except grpc.RpcError as g:
157 if g.code() is not grpc.StatusCode.UNAVAILABLE:
158 raise
159 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
160 self.name, attempt, MAX_GRPC_ATTEMPTS)
161 # Save the error in case we need to return it
162 grpc_error = g
163 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
164 # If we get here, it must be because we got (and saved) an error
165 assert grpc_error is not None
166 raise grpc_error
167
168 def get_stream(self, name, request):
169 """Calls a server-side streaming method, returning an iterator.
170
171 Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
172 """
173 def wrap_grpc_iterator(stream, name):
174 while True:
175 # The lambda "next(stream, 1)" will return a protobuf on success, or the
176 # integer 1 if the stream has ended. This allows us to avoid attempting
177 # to catch StopIteration, which gets logged by _wrap_grpc_operation.
178 response = self._wrap_grpc_operation(name + ' pull from stream',
179 lambda: next(stream, 1))
180 if isinstance(response, int):
181 # Iteration is finished
182 return
183 yield response
184 stream = self.call_no_retries(name, request)
185 return wrap_grpc_iterator(stream, name)
M-A Ruel 2017/08/04 02:05:37 There's still no value in wrap_grpc_iterator, you
aludwin 2017/08/04 14:53:10 Oh, right :P Thanks!
186
187 def call_no_retries(self, name, request):
188 """Calls a method without any retries.
189
190 Recommended for client-side streaming or nonidempotent unary calls.
191 """
192 method = getattr(self._stub, name)
193 if method is None:
194 raise NameError('%s: "%s" is not a valid method name', self.name, name)
195 return self._wrap_grpc_operation(
196 name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
197
198 def _wrap_grpc_operation(self, name, fn):
199 """Wraps a gRPC operation (call or iterator increment) for logging."""
200 try:
201 return fn()
202 except grpc.RpcError as g:
203 logging.warning('%s/%s - gRPC error: %s', self.name, name, g)
204 self._dump_proxy_info()
205 raise g
206 except Exception as e:
207 logging.warning('caught exception %s; none is %s', repr(e), e is None)
208 logging.warning('%s/%s - generic error: %s', self.name, name, e)
209 self._dump_proxy_info()
210 raise e
211
212 def _dump_proxy_info(self):
213 logging.warning('%s: DETAILED PROXY INFO', self.name)
214 logging.warning('%s: prefix = %s', self.name, self.prefix)
215 logging.warning('%s: debug info:\n\t%s', self.name,
216 '\n\t'.join(self._debug_info))
217
218 def _create_channel(self):
219 # Make sure grpc was successfully imported
220 assert available()
221
222 if not self._secure:
223 return grpc.insecure_channel(self._host)
224
225 # Authenticate the host.
226 #
227 # You're allowed to override the root certs and server if necessary. For
228 # example, if you're running your proxy on localhost, you'll need to set
229 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r
M-A Ruel 2017/08/04 02:05:37 wrap all
aludwin 2017/08/04 14:53:10 Done.
230 # the root CA that your localhost server has used to certify itself, and the
231 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify
232 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while
233 # the OVERRIDE env var might be "test_server," if this is what's used by the
234 # server you're running.
235 #
236 # If you're connecting to a real server with real SSL, none of this should b e
237 # used.
238 if not certifi:
239 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL')
240 root_certs = None
241 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS')
242 if roots is not None:
M-A Ruel 2017/08/04 02:05:37 if roots:
aludwin 2017/08/04 14:53:10 Done.
243 self._debug_info.append('Overridden root CA: %s' % roots)
244 with open(roots) as f:
245 root_certs = f.read()
246 else:
247 self._debug_info.append('Using default root CAs from certifi')
248 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE')
249 options = ()
250 if overd is not None:
M-A Ruel 2017/08/04 02:05:37 if overd:
aludwin 2017/08/04 14:53:10 Done.
251 options=(('grpc.ssl_target_name_override', overd),)
252 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
253
254 # Authenticate the user.
255 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
256 self._debug_info.append('Scopes are: %s' % repr(scopes))
M-A Ruel 2017/08/04 02:05:37 %r instead of repr
aludwin 2017/08/04 14:53:10 Done.
257 user_creds, _ = google_auth.default(scopes=scopes)
258
259 # Create the channel.
260 request = google_auth_transport_requests.Request()
261 self._debug_info.append('Options are: %s' % repr(options))
262 return google_auth_transport_grpc.secure_authorized_channel(
263 user_creds, request, self._host, ssl_creds, options=options)
OLDNEW
« appengine/swarming/doc/Magic-Values.md ('K') | « client/isolate_storage.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698