Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 # Copyright 2017 The LUCI Authors. All rights reserved. | |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 # that can be found in the LICENSE file. | |
| 4 | |
| 5 """Common gRPC implementation for Swarming and Isolate""" | |
| 6 | |
| 7 import logging | |
| 8 import os | |
| 9 import re | |
| 10 import urlparse | |
| 11 from utils import net | |
| 12 | |
| 13 # gRPC may not be installed on the worker machine. This is fine, as long as | |
| 14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
| 15 # Full external requirements are: grpcio, certifi. | |
| 16 try: | |
| 17 import grpc | |
| 18 from google import auth as google_auth | |
| 19 from google.auth.transport import grpc as google_auth_transport_grpc | |
| 20 from google.auth.transport import requests as google_auth_transport_requests | |
| 21 except ImportError as err: | |
| 22 grpc = None | |
| 23 | |
| 24 | |
| 25 # If gRPC was successfully imported, try to import certifi as well. This is not | |
| 26 # actually used anywhere in this module, but if certifi is missing, | |
| 27 # google.auth.transport will fail (see | |
| 28 # https://stackoverflow.com/questions/24973326). So checking it here allows us | |
| 29 # to print out a somewhat-sane error message. | |
| 30 certifi = None | |
| 31 if grpc is not None: | |
| 32 try: | |
| 33 import certifi | |
| 34 except ImportError: | |
| 35 # Will print out error messages later (ie when we have a logger) | |
| 36 pass | |
| 37 | |
| 38 | |
| 39 # How many times to retry a gRPC call | |
| 40 MAX_GRPC_ATTEMPTS = 30 | |
| 41 | |
| 42 | |
| 43 # Longest time to sleep between gRPC calls | |
| 44 MAX_GRPC_SLEEP = 10. | |
| 45 | |
| 46 | |
| 47 # Start the timeout at three minutes. | |
| 48 GRPC_TIMEOUT_SEC = 3 * 60 | |
| 49 | |
| 50 | |
| 51 def available(): | |
| 52 """Returns true if gRPC can be used on this host.""" | |
| 53 return grpc != None | |
| 54 | |
| 55 | |
| 56 class Proxy(object): | |
| 57 """Represents a gRPC proxy. | |
| 58 | |
| 59 If the proxy begins with 'https', the returned channel will be secure and | |
| 60 authorized using default application credentials - see | |
| 61 https://developers.google.com/identity/protocols/application-default-credentia ls. | |
| 62 Currently, we're using Cloud Container Builder scopes for testing; this may | |
| 63 change in the future to allow different scopes to be passed in for different | |
| 64 channels. | |
| 65 | |
| 66 To use the returned channel to call methods directly, say: | |
| 67 | |
| 68 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', | |
| 69 myapi_pb2.MyApiStub) | |
| 70 | |
| 71 To make a unary call with retries (recommended): | |
| 72 | |
| 73 proto_output = proxy.call_unary('MyMethod', proto_input) | |
| 74 | |
| 75 To make a unary call without retries, or to pass in a client side stream | |
| 76 (proto_input can be an iterator here): | |
| 77 | |
| 78 proto_output = proxy.call_no_retries('MyMethod', proto_input) | |
| 79 | |
| 80 You can also call the stub directly (not recommended, since no errors will be | |
| 81 caught or logged): | |
| 82 | |
| 83 proto_output = proxy.stub.MyMethod(proto_input) | |
| 84 | |
| 85 To make a call to a server-side streaming call (these are not retried): | |
| 86 | |
| 87 for response in proxy.get_stream('MyStreaminingMethod', proto_input): | |
| 88 <process response> | |
| 89 | |
| 90 To retrieve the prefix: | |
| 91 | |
| 92 prefix = proxy.prefix # returns "prefix/for/resource/names" | |
| 93 | |
| 94 All exceptions are logged using "logging.warning." | |
| 95 """ | |
| 96 | |
| 97 def __init__(self, proxy, stub_class): | |
| 98 url = urlparse.urlparse(proxy) | |
| 99 if url.scheme is 'http': | |
| 100 self._secure = False | |
| 101 elif url.scheme is 'https': | |
| 102 self._secure = True | |
| 103 else: | |
| 104 raise ValueError('gRPC proxy must use http or https: %s' % proxy) | |
| 105 if url.netloc is '': | |
| 106 raise ValueError('gRPC proxy is missing hostname: %s' % proxy) | |
| 107 self._host = url.netloc | |
| 108 self._prefix = url.path | |
| 109 if not self._prefix.endswith('/'): | |
| 110 self._prefix += '/' | |
| 111 if url.params is not '' or url.fragment is not '': | |
| 112 raise ValueError('gRPC proxy may not contain params or fragments: %s' % | |
| 113 proxy) | |
| 114 self._debug_info = ['full proxy name: ' + proxy] | |
|
M-A Ruel
2017/08/04 02:05:37
instead of _debug_info, you can create a local log
aludwin
2017/08/04 14:53:10
The idea here is to *save* these bits of informati
| |
| 115 self._channel = self._create_channel() | |
| 116 self._stub = stub_class(self._channel) | |
| 117 logging.info('%s: initialized', self.name) | |
| 118 | |
| 119 @property | |
|
M-A Ruel
2017/08/04 02:05:37
Most of these are not needed, you can add only onc
aludwin
2017/08/04 14:53:10
Done.
| |
| 120 def host(self): | |
| 121 return self._host | |
| 122 | |
| 123 @property | |
| 124 def prefix(self): | |
| 125 return self._prefix | |
| 126 | |
| 127 @property | |
| 128 def secure(self): | |
| 129 return self._secure | |
| 130 | |
| 131 @property | |
| 132 def channel(self): | |
| 133 return self._channel | |
| 134 | |
| 135 @property | |
| 136 def stub(self): | |
| 137 return self._stub | |
| 138 | |
| 139 @property | |
| 140 def name(self): | |
| 141 security = 'insecure' | |
| 142 if self._secure: | |
| 143 security = 'secure' | |
| 144 return 'gRPC %s proxy %s/%s' % ( | |
| 145 security, self.host, self._stub.__class__.__name__) | |
| 146 | |
| 147 def call_unary(self, name, request): | |
| 148 """Calls a method, waiting if the service is not available. | |
| 149 | |
| 150 Usage: proto_output = proxy.call_unary('MyMethod', proto_input) | |
| 151 """ | |
| 152 grpc_error = None | |
| 153 for attempt in range(1, MAX_GRPC_ATTEMPTS+1): | |
| 154 try: | |
| 155 return self.call_no_retries(name, request) | |
| 156 except grpc.RpcError as g: | |
| 157 if g.code() is not grpc.StatusCode.UNAVAILABLE: | |
| 158 raise | |
| 159 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', | |
| 160 self.name, attempt, MAX_GRPC_ATTEMPTS) | |
| 161 # Save the error in case we need to return it | |
| 162 grpc_error = g | |
| 163 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) | |
| 164 # If we get here, it must be because we got (and saved) an error | |
| 165 assert grpc_error is not None | |
| 166 raise grpc_error | |
| 167 | |
| 168 def get_stream(self, name, request): | |
| 169 """Calls a server-side streaming method, returning an iterator. | |
| 170 | |
| 171 Usage: for resp in proxy.get_stream('MyMethod', proto_input'): | |
| 172 """ | |
| 173 def wrap_grpc_iterator(stream, name): | |
| 174 while True: | |
| 175 # The lambda "next(stream, 1)" will return a protobuf on success, or the | |
| 176 # integer 1 if the stream has ended. This allows us to avoid attempting | |
| 177 # to catch StopIteration, which gets logged by _wrap_grpc_operation. | |
| 178 response = self._wrap_grpc_operation(name + ' pull from stream', | |
| 179 lambda: next(stream, 1)) | |
| 180 if isinstance(response, int): | |
| 181 # Iteration is finished | |
| 182 return | |
| 183 yield response | |
| 184 stream = self.call_no_retries(name, request) | |
| 185 return wrap_grpc_iterator(stream, name) | |
|
M-A Ruel
2017/08/04 02:05:37
There's still no value in wrap_grpc_iterator, you
aludwin
2017/08/04 14:53:10
Oh, right :P Thanks!
| |
| 186 | |
| 187 def call_no_retries(self, name, request): | |
| 188 """Calls a method without any retries. | |
| 189 | |
| 190 Recommended for client-side streaming or nonidempotent unary calls. | |
| 191 """ | |
| 192 method = getattr(self._stub, name) | |
| 193 if method is None: | |
| 194 raise NameError('%s: "%s" is not a valid method name', self.name, name) | |
| 195 return self._wrap_grpc_operation( | |
| 196 name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC)) | |
| 197 | |
| 198 def _wrap_grpc_operation(self, name, fn): | |
| 199 """Wraps a gRPC operation (call or iterator increment) for logging.""" | |
| 200 try: | |
| 201 return fn() | |
| 202 except grpc.RpcError as g: | |
| 203 logging.warning('%s/%s - gRPC error: %s', self.name, name, g) | |
| 204 self._dump_proxy_info() | |
| 205 raise g | |
| 206 except Exception as e: | |
| 207 logging.warning('caught exception %s; none is %s', repr(e), e is None) | |
| 208 logging.warning('%s/%s - generic error: %s', self.name, name, e) | |
| 209 self._dump_proxy_info() | |
| 210 raise e | |
| 211 | |
| 212 def _dump_proxy_info(self): | |
| 213 logging.warning('%s: DETAILED PROXY INFO', self.name) | |
| 214 logging.warning('%s: prefix = %s', self.name, self.prefix) | |
| 215 logging.warning('%s: debug info:\n\t%s', self.name, | |
| 216 '\n\t'.join(self._debug_info)) | |
| 217 | |
| 218 def _create_channel(self): | |
| 219 # Make sure grpc was successfully imported | |
| 220 assert available() | |
| 221 | |
| 222 if not self._secure: | |
| 223 return grpc.insecure_channel(self._host) | |
| 224 | |
| 225 # Authenticate the host. | |
| 226 # | |
| 227 # You're allowed to override the root certs and server if necessary. For | |
| 228 # example, if you're running your proxy on localhost, you'll need to set | |
| 229 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r | |
|
M-A Ruel
2017/08/04 02:05:37
wrap all
aludwin
2017/08/04 14:53:10
Done.
| |
| 230 # the root CA that your localhost server has used to certify itself, and the | |
| 231 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify | |
| 232 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while | |
| 233 # the OVERRIDE env var might be "test_server," if this is what's used by the | |
| 234 # server you're running. | |
| 235 # | |
| 236 # If you're connecting to a real server with real SSL, none of this should b e | |
| 237 # used. | |
| 238 if not certifi: | |
| 239 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL') | |
| 240 root_certs = None | |
| 241 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS') | |
| 242 if roots is not None: | |
|
M-A Ruel
2017/08/04 02:05:37
if roots:
aludwin
2017/08/04 14:53:10
Done.
| |
| 243 self._debug_info.append('Overridden root CA: %s' % roots) | |
| 244 with open(roots) as f: | |
| 245 root_certs = f.read() | |
| 246 else: | |
| 247 self._debug_info.append('Using default root CAs from certifi') | |
| 248 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE') | |
| 249 options = () | |
| 250 if overd is not None: | |
|
M-A Ruel
2017/08/04 02:05:37
if overd:
aludwin
2017/08/04 14:53:10
Done.
| |
| 251 options=(('grpc.ssl_target_name_override', overd),) | |
| 252 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
| 253 | |
| 254 # Authenticate the user. | |
| 255 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
| 256 self._debug_info.append('Scopes are: %s' % repr(scopes)) | |
|
M-A Ruel
2017/08/04 02:05:37
%r instead of repr
aludwin
2017/08/04 14:53:10
Done.
| |
| 257 user_creds, _ = google_auth.default(scopes=scopes) | |
| 258 | |
| 259 # Create the channel. | |
| 260 request = google_auth_transport_requests.Request() | |
| 261 self._debug_info.append('Options are: %s' % repr(options)) | |
| 262 return google_auth_transport_grpc.secure_authorized_channel( | |
| 263 user_creds, request, self._host, ssl_creds, options=options) | |
| OLD | NEW |