OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2017 The LUCI Authors. All rights reserved. | |
2 # Use of this source code is governed under the Apache License, Version 2.0 | |
3 # that can be found in the LICENSE file. | |
4 | |
5 """Common gRPC implementation for Swarming and Isolate""" | |
6 | |
7 import logging | |
8 import os | |
9 import re | |
10 import urlparse | |
11 from utils import net | |
12 | |
13 # gRPC may not be installed on the worker machine. This is fine, as long as | |
14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
15 # Full external requirements are: grpcio, certifi. | |
16 try: | |
17 import grpc | |
18 from google import auth as google_auth | |
19 from google.auth.transport import grpc as google_auth_transport_grpc | |
20 from google.auth.transport import requests as google_auth_transport_requests | |
21 except ImportError as err: | |
22 grpc = None | |
23 | |
24 | |
25 # If gRPC was successfully imported, try to import certifi as well. This is not | |
26 # actually used anywhere in this module, but if certifi is missing, | |
27 # google.auth.transport will fail (see | |
28 # https://stackoverflow.com/questions/24973326). So checking it here allows us | |
29 # to print out a somewhat-sane error message. | |
30 certifi = None | |
31 if grpc is not None: | |
32 try: | |
33 import certifi | |
34 except ImportError: | |
35 # Will print out error messages later (ie when we have a logger) | |
36 pass | |
37 | |
38 | |
39 # How many times to retry a gRPC call | |
40 MAX_GRPC_ATTEMPTS = 30 | |
41 | |
42 | |
43 # Longest time to sleep between gRPC calls | |
44 MAX_GRPC_SLEEP = 10. | |
45 | |
46 | |
47 # Start the timeout at three minutes. | |
48 GRPC_TIMEOUT_SEC = 3 * 60 | |
49 | |
50 | |
51 def available(): | |
52 """Returns true if gRPC can be used on this host.""" | |
53 return grpc != None | |
54 | |
55 | |
56 class Proxy(object): | |
57 """Represents a gRPC proxy. | |
58 | |
59 If the proxy begins with 'https', the returned channel will be secure and | |
60 authorized using default application credentials - see | |
61 https://developers.google.com/identity/protocols/application-default-credentia ls. | |
62 Currently, we're using Cloud Container Builder scopes for testing; this may | |
63 change in the future to allow different scopes to be passed in for different | |
64 channels. | |
65 | |
66 To use the returned channel to call methods directly, say: | |
67 | |
68 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', | |
69 myapi_pb2.MyApiStub) | |
70 | |
71 To make a unary call with retries (recommended): | |
72 | |
73 proto_output = proxy.call_unary('MyMethod', proto_input) | |
74 | |
75 To make a unary call without retries, or to pass in a client side stream | |
76 (proto_input can be an iterator here): | |
77 | |
78 proto_output = proxy.call_no_retries('MyMethod', proto_input) | |
79 | |
80 You can also call the stub directly (not recommended, since no errors will be | |
81 caught or logged): | |
82 | |
83 proto_output = proxy.stub.MyMethod(proto_input) | |
84 | |
85 To make a call to a server-side streaming call (these are not retried): | |
86 | |
87 for response in proxy.get_stream('MyStreaminingMethod', proto_input): | |
88 <process response> | |
89 | |
90 To retrieve the prefix: | |
91 | |
92 prefix = proxy.prefix # returns "prefix/for/resource/names" | |
93 | |
94 All exceptions are logged using "logging.warning." | |
95 """ | |
96 | |
97 def __init__(self, proxy, stub_class): | |
98 url = urlparse.urlparse(proxy) | |
99 if url.scheme is 'http': | |
100 self._secure = False | |
101 elif url.scheme is 'https': | |
102 self._secure = True | |
103 else: | |
104 raise ValueError('gRPC proxy must use http or https: %s' % proxy) | |
105 if url.netloc is '': | |
106 raise ValueError('gRPC proxy is missing hostname: %s' % proxy) | |
107 self._host = url.netloc | |
108 self._prefix = url.path | |
109 if not self._prefix.endswith('/'): | |
110 self._prefix += '/' | |
111 if url.params is not '' or url.fragment is not '': | |
112 raise ValueError('gRPC proxy may not contain params or fragments: %s' % | |
113 proxy) | |
114 self._debug_info = ['full proxy name: ' + proxy] | |
M-A Ruel
2017/08/04 02:05:37
instead of _debug_info, you can create a local log
aludwin
2017/08/04 14:53:10
The idea here is to *save* these bits of informati
| |
115 self._channel = self._create_channel() | |
116 self._stub = stub_class(self._channel) | |
117 logging.info('%s: initialized', self.name) | |
118 | |
119 @property | |
M-A Ruel
2017/08/04 02:05:37
Most of these are not needed, you can add only onc
aludwin
2017/08/04 14:53:10
Done.
| |
120 def host(self): | |
121 return self._host | |
122 | |
123 @property | |
124 def prefix(self): | |
125 return self._prefix | |
126 | |
127 @property | |
128 def secure(self): | |
129 return self._secure | |
130 | |
131 @property | |
132 def channel(self): | |
133 return self._channel | |
134 | |
135 @property | |
136 def stub(self): | |
137 return self._stub | |
138 | |
139 @property | |
140 def name(self): | |
141 security = 'insecure' | |
142 if self._secure: | |
143 security = 'secure' | |
144 return 'gRPC %s proxy %s/%s' % ( | |
145 security, self.host, self._stub.__class__.__name__) | |
146 | |
147 def call_unary(self, name, request): | |
148 """Calls a method, waiting if the service is not available. | |
149 | |
150 Usage: proto_output = proxy.call_unary('MyMethod', proto_input) | |
151 """ | |
152 grpc_error = None | |
153 for attempt in range(1, MAX_GRPC_ATTEMPTS+1): | |
154 try: | |
155 return self.call_no_retries(name, request) | |
156 except grpc.RpcError as g: | |
157 if g.code() is not grpc.StatusCode.UNAVAILABLE: | |
158 raise | |
159 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', | |
160 self.name, attempt, MAX_GRPC_ATTEMPTS) | |
161 # Save the error in case we need to return it | |
162 grpc_error = g | |
163 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) | |
164 # If we get here, it must be because we got (and saved) an error | |
165 assert grpc_error is not None | |
166 raise grpc_error | |
167 | |
168 def get_stream(self, name, request): | |
169 """Calls a server-side streaming method, returning an iterator. | |
170 | |
171 Usage: for resp in proxy.get_stream('MyMethod', proto_input'): | |
172 """ | |
173 def wrap_grpc_iterator(stream, name): | |
174 while True: | |
175 # The lambda "next(stream, 1)" will return a protobuf on success, or the | |
176 # integer 1 if the stream has ended. This allows us to avoid attempting | |
177 # to catch StopIteration, which gets logged by _wrap_grpc_operation. | |
178 response = self._wrap_grpc_operation(name + ' pull from stream', | |
179 lambda: next(stream, 1)) | |
180 if isinstance(response, int): | |
181 # Iteration is finished | |
182 return | |
183 yield response | |
184 stream = self.call_no_retries(name, request) | |
185 return wrap_grpc_iterator(stream, name) | |
M-A Ruel
2017/08/04 02:05:37
There's still no value in wrap_grpc_iterator, you
aludwin
2017/08/04 14:53:10
Oh, right :P Thanks!
| |
186 | |
187 def call_no_retries(self, name, request): | |
188 """Calls a method without any retries. | |
189 | |
190 Recommended for client-side streaming or nonidempotent unary calls. | |
191 """ | |
192 method = getattr(self._stub, name) | |
193 if method is None: | |
194 raise NameError('%s: "%s" is not a valid method name', self.name, name) | |
195 return self._wrap_grpc_operation( | |
196 name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC)) | |
197 | |
198 def _wrap_grpc_operation(self, name, fn): | |
199 """Wraps a gRPC operation (call or iterator increment) for logging.""" | |
200 try: | |
201 return fn() | |
202 except grpc.RpcError as g: | |
203 logging.warning('%s/%s - gRPC error: %s', self.name, name, g) | |
204 self._dump_proxy_info() | |
205 raise g | |
206 except Exception as e: | |
207 logging.warning('caught exception %s; none is %s', repr(e), e is None) | |
208 logging.warning('%s/%s - generic error: %s', self.name, name, e) | |
209 self._dump_proxy_info() | |
210 raise e | |
211 | |
212 def _dump_proxy_info(self): | |
213 logging.warning('%s: DETAILED PROXY INFO', self.name) | |
214 logging.warning('%s: prefix = %s', self.name, self.prefix) | |
215 logging.warning('%s: debug info:\n\t%s', self.name, | |
216 '\n\t'.join(self._debug_info)) | |
217 | |
218 def _create_channel(self): | |
219 # Make sure grpc was successfully imported | |
220 assert available() | |
221 | |
222 if not self._secure: | |
223 return grpc.insecure_channel(self._host) | |
224 | |
225 # Authenticate the host. | |
226 # | |
227 # You're allowed to override the root certs and server if necessary. For | |
228 # example, if you're running your proxy on localhost, you'll need to set | |
229 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r | |
M-A Ruel
2017/08/04 02:05:37
wrap all
aludwin
2017/08/04 14:53:10
Done.
| |
230 # the root CA that your localhost server has used to certify itself, and the | |
231 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify | |
232 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while | |
233 # the OVERRIDE env var might be "test_server," if this is what's used by the | |
234 # server you're running. | |
235 # | |
236 # If you're connecting to a real server with real SSL, none of this should b e | |
237 # used. | |
238 if not certifi: | |
239 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL') | |
240 root_certs = None | |
241 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS') | |
242 if roots is not None: | |
M-A Ruel
2017/08/04 02:05:37
if roots:
aludwin
2017/08/04 14:53:10
Done.
| |
243 self._debug_info.append('Overridden root CA: %s' % roots) | |
244 with open(roots) as f: | |
245 root_certs = f.read() | |
246 else: | |
247 self._debug_info.append('Using default root CAs from certifi') | |
248 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE') | |
249 options = () | |
250 if overd is not None: | |
M-A Ruel
2017/08/04 02:05:37
if overd:
aludwin
2017/08/04 14:53:10
Done.
| |
251 options=(('grpc.ssl_target_name_override', overd),) | |
252 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
253 | |
254 # Authenticate the user. | |
255 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
256 self._debug_info.append('Scopes are: %s' % repr(scopes)) | |
M-A Ruel
2017/08/04 02:05:37
%r instead of repr
aludwin
2017/08/04 14:53:10
Done.
| |
257 user_creds, _ = google_auth.default(scopes=scopes) | |
258 | |
259 # Create the channel. | |
260 request = google_auth_transport_requests.Request() | |
261 self._debug_info.append('Options are: %s' % repr(options)) | |
262 return google_auth_transport_grpc.secure_authorized_channel( | |
263 user_creds, request, self._host, ssl_creds, options=options) | |
OLD | NEW |