Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 #!/usr/bin/env python | |
|
M-A Ruel
2017/08/02 21:05:11
This file is not executable.
aludwin
2017/08/04 01:44:27
Done.
| |
| 2 # Copyright 2017 The LUCI Authors. All rights reserved. | |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 # that can be found in the LICENSE file. | |
| 5 | |
| 6 """Common gRPC implementation for Swarming and Isolate""" | |
| 7 | |
| 8 import logging | |
| 9 import os | |
| 10 import re | |
| 11 from utils import net | |
| 12 | |
| 13 # gRPC may not be installed on the worker machine. This is fine, as long as | |
| 14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
| 15 # Full external requirements are: grpcio, certifi. | |
| 16 try: | |
| 17 import grpc | |
| 18 from google import auth as google_auth | |
| 19 from google.auth.transport import grpc as google_auth_transport_grpc | |
| 20 from google.auth.transport import requests as google_auth_transport_requests | |
| 21 except ImportError as err: | |
| 22 grpc = None | |
| 23 | |
| 24 | |
| 25 # If gRPC was successfully imported, try to import certifi as well. This is not | |
| 26 # actually used anywhere in this module, but if certifi is missing, | |
| 27 # google.auth.transport (https://stackoverflow.com/questions/24973326). So this | |
|
M-A Ruel
2017/08/02 21:05:11
the sentence is incomplete
aludwin
2017/08/04 01:44:26
Done.
| |
| 28 # allows us to print out a somewhat-sane error message. | |
| 29 certifi = None | |
| 30 if grpc is not None: | |
| 31 try: | |
| 32 import certifi | |
| 33 except ImportError: | |
| 34 # Will print out error messages later (ie when we have a logger) | |
| 35 pass | |
| 36 | |
| 37 | |
| 38 # How many times to retry a gRPC call | |
| 39 MAX_GRPC_ATTEMPTS = 30 | |
| 40 | |
| 41 | |
| 42 # Longest time to sleep between gRPC calls | |
| 43 MAX_GRPC_SLEEP = 10. | |
| 44 | |
| 45 | |
| 46 # Start the timeout at three minutes. | |
| 47 GRPC_TIMEOUT_SEC = 3 * 60 | |
| 48 | |
| 49 | |
| 50 def available(): | |
| 51 """Returns true if gRPC can be used on this host.""" | |
| 52 return grpc != None | |
| 53 | |
| 54 | |
| 55 class Proxy(object): | |
| 56 """Represents a gRPC proxy. | |
| 57 | |
| 58 If the proxy begins with 'https', the returned channel will be secure and | |
| 59 authorized using default application credentials | |
| 60 (https://developers.google.com/identity/protocols/application-default-credenti als). | |
|
M-A Ruel
2017/08/02 21:05:11
() are unnecessary
aludwin
2017/08/04 01:44:26
Done.
| |
| 61 Currently, we're using Cloud Container Builder scopes for testing; this may | |
| 62 change in the future to allow different scopes to be passed in for different | |
| 63 channels. | |
| 64 | |
| 65 To use the returned channel to call methods directly, say: | |
| 66 | |
| 67 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', | |
| 68 myapi_pb2.MyApiStub) | |
| 69 | |
| 70 To make a unary call with retries (recommended): | |
| 71 | |
| 72 proto_output = proxy.call_unary('MyMethod', proto_input) | |
| 73 | |
| 74 To make a unary call without retries, or to pass in a client side stream | |
| 75 (proto_input can be an iterator here): | |
| 76 | |
| 77 proto_output = proxy.call_no_retries('MyMethod', proto_input) | |
| 78 | |
| 79 You can also call the stub directly (not recommended, since no errors will be | |
| 80 caught or logged): | |
| 81 | |
| 82 proto_output = proxy.stub.MyMethod(proto_input) | |
| 83 | |
| 84 To make a call to a server-side streaming call (these are not retried): | |
| 85 | |
| 86 for response in proxy.get_stream('MyStreaminingMethod', proto_input): | |
| 87 <process response> | |
| 88 | |
| 89 To retrieve the prefix: | |
| 90 | |
| 91 prefix = proxy.prefix # returns "prefix/for/resource/names" | |
| 92 | |
| 93 All exceptions are logged using "logging.warning." | |
| 94 """ | |
| 95 | |
| 96 def __init__(self, proxy, stub_class): | |
| 97 # `proxy` must be of the form: http[s]://<server>[:port][/prefix] | |
| 98 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | |
|
M-A Ruel
2017/08/02 21:05:11
I prefer to use urlparse + startswith(('http://',
aludwin
2017/08/04 01:44:26
Done.
| |
| 99 if not m: | |
| 100 raise ValueError(('gRPC proxy must have the form: ' | |
| 101 'http[s]://<server>[:port][/prefix] ' | |
| 102 '(given: %s)') % proxy) | |
| 103 transport = m.group(1) | |
| 104 if transport == 'http': | |
| 105 self._secure = False | |
| 106 elif transport == 'https': | |
| 107 self._secure = True | |
| 108 else: | |
| 109 raise ValueError('unknown transport %s (should be http[s])' % transport) | |
| 110 self._host = m.group(2) | |
| 111 self._prefix = m.group(3) | |
| 112 self._debug_info = [] | |
| 113 self._channel = self._create_channel() | |
| 114 self._stub = stub_class(self._channel) | |
| 115 logging.info('%s: initialized', self.name) | |
| 116 | |
| 117 @property | |
| 118 def host(self): | |
| 119 return self._host | |
| 120 | |
| 121 @property | |
| 122 def prefix(self): | |
| 123 return self._prefix | |
| 124 | |
| 125 @property | |
| 126 def secure(self): | |
| 127 return self._secure | |
| 128 | |
| 129 @property | |
| 130 def channel(self): | |
| 131 return self._channel | |
| 132 | |
| 133 @property | |
| 134 def stub(self): | |
| 135 return self._stub | |
| 136 | |
| 137 @property | |
| 138 def name(self): | |
| 139 security = 'insecure' | |
| 140 if self._secure: | |
| 141 security = 'secure' | |
| 142 return 'gRPC %s proxy %s/%s' % ( | |
| 143 security, self.host, self._stub.__class__.__name__) | |
| 144 | |
| 145 def call_unary(self, name, request): | |
| 146 """Calls a method, waiting if the service is not available. | |
| 147 | |
| 148 Usage: proto_output = proxy.call_unary('MyMethod', proto_input) | |
| 149 """ | |
| 150 grpc_error = None | |
| 151 for attempt in range(1, MAX_GRPC_ATTEMPTS+1): | |
| 152 try: | |
| 153 return self.call_no_retries(name, request) | |
| 154 except grpc.RpcError as g: | |
| 155 if g.code() is not grpc.StatusCode.UNAVAILABLE: | |
| 156 raise | |
| 157 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', | |
| 158 self.name, attempt, MAX_GRPC_ATTEMPTS) | |
| 159 # Save the error in case we need to return it | |
| 160 grpc_error = g | |
| 161 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) | |
| 162 # If we get here, it must be because we got (and saved) an error | |
| 163 assert grpc_error is not None | |
| 164 raise grpc_error | |
| 165 | |
| 166 def get_stream(self, name, request): | |
| 167 """Calls a server-side streaming method, returning an iterator. | |
| 168 | |
| 169 Usage: for resp in proxy.get_stream('MyMethod', proto_input'): | |
| 170 """ | |
| 171 def stream_proxy(stream, name): | |
| 172 while True: | |
| 173 # The lambda "next(stream, 1)" will return a protobuf on success, or the | |
| 174 # integer 1 if the stream has ended. This allows us to avoid attempting | |
| 175 # to catch StopIteration, which gets logged by _wrap_grpc_operation. | |
| 176 response = self._wrap_grpc_operation(name + ' pull from stream', | |
| 177 lambda: next(stream, 1)) | |
| 178 if isinstance(response, int): | |
| 179 # Iteration is finished | |
| 180 return | |
| 181 yield response | |
| 182 stream = self.call_no_retries(name, request) | |
| 183 sp = lambda x: stream_proxy(x, name) | |
|
M-A Ruel
2017/08/02 21:05:11
I don't understand why you create a lambda just to
aludwin
2017/08/04 01:44:26
I created a lambda because I forgot how generators
| |
| 184 return sp(stream) | |
| 185 | |
| 186 def call_no_retries(self, name, request): | |
| 187 """Calls a method without any retries. | |
| 188 | |
| 189 Recommended for client-side streaming or nonidempotent unary calls. | |
| 190 """ | |
| 191 method = getattr(self._stub, name) | |
| 192 if method is None: | |
| 193 raise NameError('%s: "%s" is not a valid method name', self.name, name) | |
| 194 return self._wrap_grpc_operation(name, lambda: method(request, timeout=GRPC_ TIMEOUT_SEC)) | |
|
M-A Ruel
2017/08/02 21:05:11
wrap (Everywhere)
aludwin
2017/08/04 01:44:27
Done.
| |
| 195 | |
| 196 def _wrap_grpc_operation(self, name, fn): | |
| 197 """Wraps a gRPC operation (call or iterator increment) for logging.""" | |
| 198 try: | |
| 199 return fn() | |
| 200 except grpc.RpcError as g: | |
| 201 logging.warning('%s/%s - gRPC error: %s', self.name, name, g) | |
| 202 self._dump_proxy_info() | |
| 203 raise g | |
| 204 except Exception as e: | |
| 205 logging.warning('caught exception %s; none is %s', repr(e), e is None) | |
| 206 logging.warning('%s/%s - generic error: %s', self.name, name, e) | |
| 207 self._dump_proxy_info() | |
| 208 raise e | |
| 209 | |
| 210 def _dump_proxy_info(self): | |
| 211 logging.warning('%s: DETAILED PROXY INFO', self.name) | |
| 212 logging.warning('%s: prefix = %s', self.name, self.prefix) | |
| 213 logging.warning('%s: debug info:\n\t%s', self.name, '\n\t'.join(self._debug_ info)) | |
| 214 | |
| 215 def _create_channel(self): | |
| 216 # Make sure grpc was successfully imported | |
| 217 assert available() | |
| 218 | |
| 219 if not self._secure: | |
| 220 return grpc.insecure_channel(self._host) | |
| 221 | |
| 222 # Authenticate the host. | |
| 223 # | |
| 224 # You're allowed to override the root certs and server if necessary. For | |
| 225 # example, if you're running your proxy on localhost, you'll need to set | |
| 226 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r | |
| 227 # the root CA that your localhost server has used to certify itself, and the | |
| 228 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify | |
| 229 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while | |
| 230 # the OVERRIDE env var might be "test_server," if this is what's used by the | |
| 231 # server you're running. | |
| 232 # | |
| 233 # If you're connecting to a real server with real SSL, none of this should b e | |
| 234 # used. | |
| 235 if not certifi: | |
| 236 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL') | |
| 237 root_certs = None | |
| 238 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS') | |
|
M-A Ruel
2017/08/02 21:05:11
We'd need to document these under doc/
aludwin
2017/08/04 01:44:26
Done.
| |
| 239 if roots is not None: | |
| 240 self._debug_info.append('Overridden root CA: %s' % roots) | |
| 241 with open(roots) as f: | |
| 242 root_certs = f.read() | |
| 243 else: | |
| 244 self._debug_info.append('Using default root CAs from certifi') | |
| 245 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE') | |
| 246 options = () | |
| 247 if overd is not None: | |
| 248 options=(('grpc.ssl_target_name_override', overd),) | |
| 249 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
| 250 | |
| 251 # Authenticate the user. | |
| 252 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
| 253 self._debug_info.append('Scopes are: %s' % repr(scopes)) | |
| 254 user_creds, _ = google_auth.default(scopes=scopes) | |
| 255 | |
| 256 # Create the channel. | |
| 257 request = google_auth_transport_requests.Request() | |
| 258 self._debug_info.append('Options are: %s' % repr(options)) | |
| 259 return google_auth_transport_grpc.secure_authorized_channel( | |
| 260 user_creds, request, self._host, ssl_creds, options=options) | |
| OLD | NEW |