Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(138)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Response to review Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 # This is a reimplementation of RemoteClientNative but it uses (will use) 5 # This is a reimplementation of RemoteClientNative but it uses (will use)
6 # a gRPC method to communicate with a server instead of REST. 6 # a gRPC method to communicate with a server instead of REST.
7 7
8 import copy 8 import copy
9 import json 9 import json
10 import logging 10 import logging
11 import time 11 import time
12 12
13 import grpc
14 import google.protobuf.json_format 13 import google.protobuf.json_format
15 from proto_bot import swarming_bot_pb2 14 from proto_bot import swarming_bot_pb2
16 from remote_client_errors import InternalError 15 from remote_client_errors import InternalError
17 from remote_client_errors import MintOAuthTokenError 16 from remote_client_errors import MintOAuthTokenError
18 from remote_client_errors import PollError 17 from remote_client_errors import PollError
19 from utils import net 18 from utils import net
20 19 from utils import grpc_proxy
21
22 # How long to wait for a response from the server. Keeping the same as
23 # the equivalent in remote_client.py for now.
24 NET_CONNECTION_TIMEOUT_SEC = 5*60
25
26
27 # How many times to retry a gRPC call
28 MAX_GRPC_ATTEMPTS = 30
29
30
31 # Longest time to sleep between gRPC calls
32 MAX_GRPC_SLEEP = 10.
33 20
34 21
35 class RemoteClientGrpc(object): 22 class RemoteClientGrpc(object):
36 """RemoteClientGrpc knows how to make calls via gRPC. 23 """RemoteClientGrpc knows how to make calls via gRPC.
37 24
38 Any non-scalar value that is returned that references values from the proto 25 Any non-scalar value that is returned that references values from the proto
39 messages should be deepcopy'd since protos make use of weak references and 26 messages should be deepcopy'd since protos make use of weak references and
40 might be garbage-collected before the values are used. 27 might be garbage-collected before the values are used.
41 """ 28 """
42 29
43 def __init__(self, server): 30 def __init__(self, server):
44 logging.info('Communicating with host %s via gRPC', server) 31 logging.info('Communicating with host %s via gRPC', server)
32 self._proxy = grpc_proxy.Proxy(server, swarming_bot_pb2.BotServiceStub)
45 self._server = server 33 self._server = server
46 self._channel = grpc.insecure_channel(server)
47 self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
48 self._log_is_asleep = False 34 self._log_is_asleep = False
49 35
50 def is_grpc(self): 36 def is_grpc(self):
51 return True 37 return True
52 38
53 def initialize(self, quit_bit=None): 39 def initialize(self, quit_bit=None):
54 pass 40 pass
55 41
56 @property 42 @property
57 def uses_auth(self): 43 def uses_auth(self):
(...skipping 19 matching lines...) Expand all
77 request.output_chunk.data = stdout_and_chunk[0] 63 request.output_chunk.data = stdout_and_chunk[0]
78 request.output_chunk.offset = stdout_and_chunk[1] 64 request.output_chunk.offset = stdout_and_chunk[1]
79 if exit_code != None: 65 if exit_code != None:
80 request.exit_status.code = exit_code 66 request.exit_status.code = exit_code
81 67
82 # Insert everything else. Note that the b64-encoded strings in the dict 68 # Insert everything else. Note that the b64-encoded strings in the dict
83 # are automatically decoded by ParseDict. 69 # are automatically decoded by ParseDict.
84 google.protobuf.json_format.ParseDict(params, request) 70 google.protobuf.json_format.ParseDict(params, request)
85 71
86 # Perform update 72 # Perform update
87 response = call_grpc(self._stub.TaskUpdate, request) 73 response = self._proxy.call_unary('TaskUpdate', request)
88 logging.debug('post_task_update() = %s', request) 74 logging.debug('post_task_update() = %s', request)
89 if response.error: 75 if response.error:
90 raise InternalError(response.error) 76 raise InternalError(response.error)
91 return not response.must_stop 77 return not response.must_stop
92 78
93 def post_task_error(self, task_id, bot_id, message): 79 def post_task_error(self, task_id, bot_id, message):
94 request = swarming_bot_pb2.TaskErrorRequest() 80 request = swarming_bot_pb2.TaskErrorRequest()
95 request.bot_id = bot_id 81 request.bot_id = bot_id
96 request.task_id = task_id 82 request.task_id = task_id
97 request.msg = message 83 request.msg = message
98 logging.error('post_task_error() = %s', request) 84 logging.error('post_task_error() = %s', request)
99 85
100 response = call_grpc(self._stub.TaskError, request) 86 response = self._proxy.call_unary('TaskError', request)
101 return response.ok 87 return response.ok
102 88
103 def _attributes_json_to_proto(self, json_attr, msg): 89 def _attributes_json_to_proto(self, json_attr, msg):
104 msg.version = json_attr['version'] 90 msg.version = json_attr['version']
105 for k, values in sorted(json_attr['dimensions'].iteritems()): 91 for k, values in sorted(json_attr['dimensions'].iteritems()):
106 pair = msg.dimensions.add() 92 pair = msg.dimensions.add()
107 pair.name = k 93 pair.name = k
108 pair.values.extend(values) 94 pair.values.extend(values)
109 create_state_proto(json_attr['state'], msg.state) 95 create_state_proto(json_attr['state'], msg.state)
110 96
111 def do_handshake(self, attributes): 97 def do_handshake(self, attributes):
112 request = swarming_bot_pb2.HandshakeRequest() 98 request = swarming_bot_pb2.HandshakeRequest()
113 self._attributes_json_to_proto(attributes, request.attributes) 99 self._attributes_json_to_proto(attributes, request.attributes)
114 response = call_grpc(self._stub.Handshake, request) 100 response = self._proxy.call_unary('Handshake', request)
115 resp = { 101 resp = {
116 'server_version': response.server_version, 102 'server_version': response.server_version,
117 'bot_version': response.bot_version, 103 'bot_version': response.bot_version,
118 'bot_group_cfg_version': response.bot_group_cfg_version, 104 'bot_group_cfg_version': response.bot_group_cfg_version,
119 'bot_group_cfg': { 105 'bot_group_cfg': {
120 'dimensions': { 106 'dimensions': {
121 d.name: d.values for d in response.bot_group_cfg.dimensions 107 d.name: d.values for d in response.bot_group_cfg.dimensions
122 }, 108 },
123 }, 109 },
124 } 110 }
125 logging.info('Completed handshake: %s', resp) 111 logging.info('Completed handshake: %s', resp)
126 return copy.deepcopy(resp) 112 return copy.deepcopy(resp)
127 113
128 def poll(self, attributes): 114 def poll(self, attributes):
129 request = swarming_bot_pb2.PollRequest() 115 request = swarming_bot_pb2.PollRequest()
130 self._attributes_json_to_proto(attributes, request.attributes) 116 self._attributes_json_to_proto(attributes, request.attributes)
131 # TODO(aludwin): gRPC-specific exception handling (raise PollError). 117 # TODO(aludwin): gRPC-specific exception handling (raise PollError).
132 response = call_grpc(self._stub.Poll, request) 118 response = self._proxy.call_unary('Poll', request)
133 119
134 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: 120 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
135 return 'update', response.version 121 return 'update', response.version
136 122
137 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP: 123 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP:
138 if not self._log_is_asleep: 124 if not self._log_is_asleep:
139 logging.info('Going to sleep') 125 logging.info('Going to sleep')
140 self._log_is_asleep = True 126 self._log_is_asleep = True
141 return 'sleep', response.sleep_time 127 return 'sleep', response.sleep_time
142 128
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 self._log_is_asleep = False 168 self._log_is_asleep = False
183 return 'run', copy.deepcopy(manifest) 169 return 'run', copy.deepcopy(manifest)
184 170
185 raise PollError('Unknown command in response: %s' % response) 171 raise PollError('Unknown command in response: %s' % response)
186 172
187 def get_bot_code(self, new_zip_fn, bot_version, _bot_id): 173 def get_bot_code(self, new_zip_fn, bot_version, _bot_id):
188 # TODO(aludwin): exception handling, pass bot_id 174 # TODO(aludwin): exception handling, pass bot_id
189 logging.info('Updating to version: %s', bot_version) 175 logging.info('Updating to version: %s', bot_version)
190 request = swarming_bot_pb2.BotUpdateRequest() 176 request = swarming_bot_pb2.BotUpdateRequest()
191 request.bot_version = bot_version 177 request.bot_version = bot_version
192 response = call_grpc(self._stub.BotUpdate, request) 178 response = self._proxy.call_unary('BotUpdate', request)
193 with open(new_zip_fn, 'wb') as f: 179 with open(new_zip_fn, 'wb') as f:
194 f.write(response.bot_code) 180 f.write(response.bot_code)
195 181
196 def ping(self): 182 def ping(self):
197 pass 183 pass
198 184
199 def mint_oauth_token(self, task_id, bot_id, account_id, scopes): 185 def mint_oauth_token(self, task_id, bot_id, account_id, scopes):
200 # pylint: disable=unused-argument 186 # pylint: disable=unused-argument
201 raise MintOAuthTokenError( 187 raise MintOAuthTokenError(
202 'mint_oauth_token is not supported in grpc protocol') 188 'mint_oauth_token is not supported in grpc protocol')
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
237 223
238 The keyname for the message field is passed in to simplify the creation 224 The keyname for the message field is passed in to simplify the creation
239 of the submessage in the first place - you need to say getattr and not 225 of the submessage in the first place - you need to say getattr and not
240 simply refer to message.keyname since the former actually creates the 226 simply refer to message.keyname since the former actually creates the
241 submessage while the latter does not. 227 submessage while the latter does not.
242 """ 228 """
243 sub_msg = getattr(message, keyname) 229 sub_msg = getattr(message, keyname)
244 google.protobuf.json_format.Parse(json.dumps(value), sub_msg) 230 google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
245 231
246 232
247 def call_grpc(method, request):
248 """Retries a command a set number of times"""
249 for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
250 try:
251 return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
252 except grpc.RpcError as g:
253 if g.code() is not grpc.StatusCode.UNAVAILABLE:
254 raise
255 logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
256 attempt, MAX_GRPC_ATTEMPTS)
257 grpc_error = g
258 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
259 # If we get here, it must be because we got (and saved) an error
260 assert grpc_error is not None
261 raise grpc_error
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698