Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(87)

Side by Side Diff: client/utils/grpc_proxy.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Refactor all gRPC proxy code into a single class. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
M-A Ruel 2017/08/02 21:05:11 This file is not executable.
aludwin 2017/08/04 01:44:27 Done.
2 # Copyright 2017 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file.
5
6 """Common gRPC implementation for Swarming and Isolate"""
7
8 import logging
9 import os
10 import re
11 from utils import net
12
13 # gRPC may not be installed on the worker machine. This is fine, as long as
14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
15 # Full external requirements are: grpcio, certifi.
16 try:
17 import grpc
18 from google import auth as google_auth
19 from google.auth.transport import grpc as google_auth_transport_grpc
20 from google.auth.transport import requests as google_auth_transport_requests
21 except ImportError as err:
22 grpc = None
23
24
25 # If gRPC was successfully imported, try to import certifi as well. This is not
26 # actually used anywhere in this module, but if certifi is missing,
27 # google.auth.transport (https://stackoverflow.com/questions/24973326). So this
M-A Ruel 2017/08/02 21:05:11 the sentence is incomplete
aludwin 2017/08/04 01:44:26 Done.
28 # allows us to print out a somewhat-sane error message.
29 certifi = None
30 if grpc is not None:
31 try:
32 import certifi
33 except ImportError:
34 # Will print out error messages later (ie when we have a logger)
35 pass
36
37
38 # How many times to retry a gRPC call
39 MAX_GRPC_ATTEMPTS = 30
40
41
42 # Longest time to sleep between gRPC calls
43 MAX_GRPC_SLEEP = 10.
44
45
46 # Start the timeout at three minutes.
47 GRPC_TIMEOUT_SEC = 3 * 60
48
49
50 def available():
51 """Returns true if gRPC can be used on this host."""
52 return grpc != None
53
54
55 class Proxy(object):
56 """Represents a gRPC proxy.
57
58 If the proxy begins with 'https', the returned channel will be secure and
59 authorized using default application credentials
60 (https://developers.google.com/identity/protocols/application-default-credenti als).
M-A Ruel 2017/08/02 21:05:11 () are unnecessary
aludwin 2017/08/04 01:44:26 Done.
61 Currently, we're using Cloud Container Builder scopes for testing; this may
62 change in the future to allow different scopes to be passed in for different
63 channels.
64
65 To use the returned channel to call methods directly, say:
66
67 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
68 myapi_pb2.MyApiStub)
69
70 To make a unary call with retries (recommended):
71
72 proto_output = proxy.call_unary('MyMethod', proto_input)
73
74 To make a unary call without retries, or to pass in a client side stream
75 (proto_input can be an iterator here):
76
77 proto_output = proxy.call_no_retries('MyMethod', proto_input)
78
79 You can also call the stub directly (not recommended, since no errors will be
80 caught or logged):
81
82 proto_output = proxy.stub.MyMethod(proto_input)
83
84 To make a call to a server-side streaming call (these are not retried):
85
86 for response in proxy.get_stream('MyStreaminingMethod', proto_input):
87 <process response>
88
89 To retrieve the prefix:
90
91 prefix = proxy.prefix # returns "prefix/for/resource/names"
92
93 All exceptions are logged using "logging.warning."
94 """
95
96 def __init__(self, proxy, stub_class):
97 # `proxy` must be of the form: http[s]://<server>[:port][/prefix]
98 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
M-A Ruel 2017/08/02 21:05:11 I prefer to use urlparse + startswith(('http://',
aludwin 2017/08/04 01:44:26 Done.
99 if not m:
100 raise ValueError(('gRPC proxy must have the form: '
101 'http[s]://<server>[:port][/prefix] '
102 '(given: %s)') % proxy)
103 transport = m.group(1)
104 if transport == 'http':
105 self._secure = False
106 elif transport == 'https':
107 self._secure = True
108 else:
109 raise ValueError('unknown transport %s (should be http[s])' % transport)
110 self._host = m.group(2)
111 self._prefix = m.group(3)
112 self._debug_info = []
113 self._channel = self._create_channel()
114 self._stub = stub_class(self._channel)
115 logging.info('%s: initialized', self.name)
116
117 @property
118 def host(self):
119 return self._host
120
121 @property
122 def prefix(self):
123 return self._prefix
124
125 @property
126 def secure(self):
127 return self._secure
128
129 @property
130 def channel(self):
131 return self._channel
132
133 @property
134 def stub(self):
135 return self._stub
136
137 @property
138 def name(self):
139 security = 'insecure'
140 if self._secure:
141 security = 'secure'
142 return 'gRPC %s proxy %s/%s' % (
143 security, self.host, self._stub.__class__.__name__)
144
145 def call_unary(self, name, request):
146 """Calls a method, waiting if the service is not available.
147
148 Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
149 """
150 grpc_error = None
151 for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
152 try:
153 return self.call_no_retries(name, request)
154 except grpc.RpcError as g:
155 if g.code() is not grpc.StatusCode.UNAVAILABLE:
156 raise
157 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
158 self.name, attempt, MAX_GRPC_ATTEMPTS)
159 # Save the error in case we need to return it
160 grpc_error = g
161 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
162 # If we get here, it must be because we got (and saved) an error
163 assert grpc_error is not None
164 raise grpc_error
165
166 def get_stream(self, name, request):
167 """Calls a server-side streaming method, returning an iterator.
168
169 Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
170 """
171 def stream_proxy(stream, name):
172 while True:
173 # The lambda "next(stream, 1)" will return a protobuf on success, or the
174 # integer 1 if the stream has ended. This allows us to avoid attempting
175 # to catch StopIteration, which gets logged by _wrap_grpc_operation.
176 response = self._wrap_grpc_operation(name + ' pull from stream',
177 lambda: next(stream, 1))
178 if isinstance(response, int):
179 # Iteration is finished
180 return
181 yield response
182 stream = self.call_no_retries(name, request)
183 sp = lambda x: stream_proxy(x, name)
M-A Ruel 2017/08/02 21:05:11 I don't understand why you create a lambda just to
aludwin 2017/08/04 01:44:26 I created a lambda because I forgot how generators
184 return sp(stream)
185
186 def call_no_retries(self, name, request):
187 """Calls a method without any retries.
188
189 Recommended for client-side streaming or nonidempotent unary calls.
190 """
191 method = getattr(self._stub, name)
192 if method is None:
193 raise NameError('%s: "%s" is not a valid method name', self.name, name)
194 return self._wrap_grpc_operation(name, lambda: method(request, timeout=GRPC_ TIMEOUT_SEC))
M-A Ruel 2017/08/02 21:05:11 wrap (Everywhere)
aludwin 2017/08/04 01:44:27 Done.
195
196 def _wrap_grpc_operation(self, name, fn):
197 """Wraps a gRPC operation (call or iterator increment) for logging."""
198 try:
199 return fn()
200 except grpc.RpcError as g:
201 logging.warning('%s/%s - gRPC error: %s', self.name, name, g)
202 self._dump_proxy_info()
203 raise g
204 except Exception as e:
205 logging.warning('caught exception %s; none is %s', repr(e), e is None)
206 logging.warning('%s/%s - generic error: %s', self.name, name, e)
207 self._dump_proxy_info()
208 raise e
209
210 def _dump_proxy_info(self):
211 logging.warning('%s: DETAILED PROXY INFO', self.name)
212 logging.warning('%s: prefix = %s', self.name, self.prefix)
213 logging.warning('%s: debug info:\n\t%s', self.name, '\n\t'.join(self._debug_ info))
214
215 def _create_channel(self):
216 # Make sure grpc was successfully imported
217 assert available()
218
219 if not self._secure:
220 return grpc.insecure_channel(self._host)
221
222 # Authenticate the host.
223 #
224 # You're allowed to override the root certs and server if necessary. For
225 # example, if you're running your proxy on localhost, you'll need to set
226 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r
227 # the root CA that your localhost server has used to certify itself, and the
228 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify
229 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while
230 # the OVERRIDE env var might be "test_server," if this is what's used by the
231 # server you're running.
232 #
233 # If you're connecting to a real server with real SSL, none of this should b e
234 # used.
235 if not certifi:
236 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL')
237 root_certs = None
238 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS')
M-A Ruel 2017/08/02 21:05:11 We'd need to document these under doc/
aludwin 2017/08/04 01:44:26 Done.
239 if roots is not None:
240 self._debug_info.append('Overridden root CA: %s' % roots)
241 with open(roots) as f:
242 root_certs = f.read()
243 else:
244 self._debug_info.append('Using default root CAs from certifi')
245 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE')
246 options = ()
247 if overd is not None:
248 options=(('grpc.ssl_target_name_override', overd),)
249 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
250
251 # Authenticate the user.
252 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
253 self._debug_info.append('Scopes are: %s' % repr(scopes))
254 user_creds, _ = google_auth.default(scopes=scopes)
255
256 # Create the channel.
257 request = google_auth_transport_requests.Request()
258 self._debug_info.append('Options are: %s' % repr(options))
259 return google_auth_transport_grpc.secure_authorized_channel(
260 user_creds, request, self._host, ssl_creds, options=options)
OLDNEW
« appengine/swarming/swarming_bot/bot_code/remote_client.py ('K') | « client/isolate_storage.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698