Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: client/utils/grpc_proxy.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Fix pylint errors Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/isolate_storage.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2017 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """Common gRPC implementation for Swarming and Isolate"""
6
7 import logging
8 import os
9 import re
10 import time
11 import urlparse
12 from utils import net
13
14 # gRPC may not be installed on the worker machine. This is fine, as long as
15 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
16 # Full external requirements are: grpcio, certifi.
17 try:
18 import grpc
19 from google import auth as google_auth
20 from google.auth.transport import grpc as google_auth_transport_grpc
21 from google.auth.transport import requests as google_auth_transport_requests
22 except ImportError as err:
23 grpc = None
24
25
26 # If gRPC was successfully imported, try to import certifi as well. This is not
27 # actually used anywhere in this module, but if certifi is missing,
28 # google.auth.transport will fail (see
29 # https://stackoverflow.com/questions/24973326). So checking it here allows us
30 # to print out a somewhat-sane error message.
31 certifi = None
32 if grpc is not None:
33 try:
34 import certifi
35 except ImportError:
36 # Will print out error messages later (ie when we have a logger)
37 pass
38
39
40 # How many times to retry a gRPC call
41 MAX_GRPC_ATTEMPTS = 30
42
43
44 # Longest time to sleep between gRPC calls
45 MAX_GRPC_SLEEP = 10.
46
47
48 # Start the timeout at three minutes.
49 GRPC_TIMEOUT_SEC = 3 * 60
50
51
52 def available():
53 """Returns true if gRPC can be used on this host."""
54 return grpc != None
55
56
57 class Proxy(object):
58 """Represents a gRPC proxy.
59
60 If the proxy begins with 'https', the returned channel will be secure and
61 authorized using default application credentials - see
62 https://developers.google.com/identity/protocols/application-default-credentia ls.
63 Currently, we're using Cloud Container Builder scopes for testing; this may
64 change in the future to allow different scopes to be passed in for different
65 channels.
66
67 To use the returned channel to call methods directly, say:
68
69 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
70 myapi_pb2.MyApiStub)
71
72 To make a unary call with retries (recommended):
73
74 proto_output = proxy.call_unary('MyMethod', proto_input)
75
76 To make a unary call without retries, or to pass in a client side stream
77 (proto_input can be an iterator here):
78
79 proto_output = proxy.call_no_retries('MyMethod', proto_input)
80
81 You can also call the stub directly (not recommended, since no errors will be
82 caught or logged):
83
84 proto_output = proxy.stub.MyMethod(proto_input)
85
86 To make a call to a server-side streaming call (these are not retried):
87
88 for response in proxy.get_stream('MyStreaminingMethod', proto_input):
89 <process response>
90
91 To retrieve the prefix:
92
93 prefix = proxy.prefix # returns "prefix/for/resource/names"
94
95 All exceptions are logged using "logging.warning."
96 """
97
98 def __init__(self, proxy, stub_class):
99 self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
100 if self._verbose:
101 logging.info('Enabled verbose mode for %s with stub %s',
102 proxy, stub_class.__name__)
103 # NB: everything in url is unicode; convert to strings where
104 # needed.
105 url = urlparse.urlparse(proxy)
106 if self._verbose:
107 logging.info('Parsed URL for proxy is %r', url)
108 if url.scheme == 'http':
109 self._secure = False
110 elif url.scheme == 'https':
111 self._secure = True
112 else:
113 raise ValueError('gRPC proxy %s must use http[s], not %s' % (
114 proxy, url.scheme))
115 if url.netloc == '':
116 raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
117 self._host = url.netloc
118 self._prefix = url.path
119 if self._prefix.endswith('/'):
120 self._prefix = self._prefix[:-1]
121 if self._prefix.startswith('/'):
122 self._prefix = self._prefix[1:]
123 if url.params != '' or url.fragment != '':
124 raise ValueError('gRPC proxy may not contain params or fragments: %s' %
125 proxy)
126 self._debug_info = ['full proxy name: ' + proxy]
127 self._channel = self._create_channel()
128 self._stub = stub_class(self._channel)
129 logging.info('%s: initialized', self.name)
130 if self._verbose:
131 self._dump_proxy_info()
132
133 @property
134 def prefix(self):
135 return self._prefix
136
137 @property
138 def channel(self):
139 return self._channel
140
141 @property
142 def stub(self):
143 return self._stub
144
145 @property
146 def name(self):
147 security = 'insecure'
148 if self._secure:
149 security = 'secure'
150 return 'gRPC %s proxy %s/%s' % (
151 security, self._host, self._stub.__class__.__name__)
152
153 def call_unary(self, name, request):
154 """Calls a method, waiting if the service is not available.
155
156 Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
157 """
158 for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
159 try:
160 return self.call_no_retries(name, request)
161 except grpc.RpcError as g:
162 if g.code() is not grpc.StatusCode.UNAVAILABLE:
163 raise
164 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
165 self.name, attempt, MAX_GRPC_ATTEMPTS)
166 # Save the error in case we need to return it
167 grpc_error = g
168 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
169 # If we get here, it must be because we got (and saved) an error
170 assert grpc_error is not None
171 raise grpc_error
172
173 def get_stream(self, name, request):
174 """Calls a server-side streaming method, returning an iterator.
175
176 Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
177 """
178 stream = self.call_no_retries(name, request)
179 while True:
180 # The lambda "next(stream, 1)" will return a protobuf on success, or the
181 # integer 1 if the stream has ended. This allows us to avoid attempting
182 # to catch StopIteration, which gets logged by _wrap_grpc_operation.
183 response = self._wrap_grpc_operation(name + ' pull from stream',
184 lambda: next(stream, 1))
185 if isinstance(response, int):
186 # Iteration is finished
187 return
188 yield response
189
190 def call_no_retries(self, name, request):
191 """Calls a method without any retries.
192
193 Recommended for client-side streaming or nonidempotent unary calls.
194 """
195 method = getattr(self._stub, name)
196 if method is None:
197 raise NameError('%s: "%s" is not a valid method name', self.name, name)
198 return self._wrap_grpc_operation(
199 name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
200
201 def _wrap_grpc_operation(self, name, fn):
202 """Wraps a gRPC operation (call or iterator increment) for logging."""
203 if self._verbose:
204 logging.info('%s/%s - starting gRPC operation', self.name, name)
205 try:
206 return fn()
207 except grpc.RpcError as g:
208 logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
209 self._dump_proxy_info()
210 raise g
211 except Exception as e:
212 logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
213 self._dump_proxy_info()
214 raise e
215
216 def _dump_proxy_info(self):
217 logging.warning('DETAILED PROXY INFO')
218 logging.warning('prefix = %s', self.prefix)
219 logging.warning('debug info:\n\t%s\n\n',
220 '\n\t'.join(self._debug_info))
221
222 def _create_channel(self):
223 # Make sure grpc was successfully imported
224 assert available()
225
226 if not self._secure:
227 return grpc.insecure_channel(self._host)
228
229 # Authenticate the host.
230 #
231 # You're allowed to override the root certs and server if necessary. For
232 # example, if you're running your proxy on localhost, you'll need to set
233 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate
234 # for the root CA that your localhost server has used to certify itself, and
235 # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to
236 # identify itself. For example, the ROOTS env var might be
237 # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if
238 # this is what's used by the server you're running.
239 #
240 # If you're connecting to a real server with real SSL, none of this should
241 # be used.
242 if not certifi:
243 self._debug_info.append('CERTIFI IS NOT PRESENT;' +
244 ' gRPC HTTPS CONNECTIONS MAY FAIL')
245 root_certs = None
246 roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
247 if roots:
248 self._debug_info.append('Overridden root CA: %s' % roots)
249 with open(roots) as f:
250 root_certs = f.read()
251 else:
252 self._debug_info.append('Using default root CAs from certifi')
253 overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
254 options = ()
255 if overd:
256 options=(('grpc.ssl_target_name_override', overd),)
257 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
258
259 # Authenticate the user.
260 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
261 self._debug_info.append('Scopes are: %r' % scopes)
262 user_creds, _ = google_auth.default(scopes=scopes)
263
264 # Create the channel.
265 request = google_auth_transport_requests.Request()
266 self._debug_info.append('Options are: %r' % options)
267 return google_auth_transport_grpc.secure_authorized_channel(
268 user_creds, request, self._host, ssl_creds, options=options)
OLDNEW
« no previous file with comments | « client/isolate_storage.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698