Chromium Code Reviews

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py

Issue 2592683002: Retry non-streaming gRPC calls (Closed)
Patch Set: Call calculate_sleep_before_retry and add unit tests Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff |
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 # This is a reimplementation of RemoteClientNative but it uses (will use) 5 # This is a reimplementation of RemoteClientNative but it uses (will use)
6 # a gRPC method to communicate with a server instead of REST. 6 # a gRPC method to communicate with a server instead of REST.
7 7
8 import json 8 import json
9 import logging 9 import logging
10 import math
11 import random
12 import time
10 13
11 import grpc 14 import grpc
12 import google.protobuf.json_format 15 import google.protobuf.json_format
13 from proto_bot import swarming_bot_pb2 16 from proto_bot import swarming_bot_pb2
14 from remote_client_errors import InternalError 17 from remote_client_errors import InternalError
18 from utils import net
15 19
16 20
17 # How long to wait for a response from the server. Keeping the same as 21 # How long to wait for a response from the server. Keeping the same as
18 # the equivalent in remote_client.py for now. 22 # the equivalent in remote_client.py for now.
19 NET_CONNECTION_TIMEOUT_SEC = 5*60 23 NET_CONNECTION_TIMEOUT_SEC = 5*60
20 24
21 25
26 # How many times to retry a gRPC call
27 MAX_GRPC_ATTEMPTS = 30
28
29
30 # Longest time to sleep between gRPC calls
31 MAX_GRPC_SLEEP = 10.
32
33
22 class RemoteClientGrpc(object): 34 class RemoteClientGrpc(object):
23 """RemoteClientGrpc knows how to make calls via gRPC. 35 """RemoteClientGrpc knows how to make calls via gRPC.
24 """ 36 """
25 37
26 def __init__(self, server): 38 def __init__(self, server):
27 logging.info('Communicating with host %s via gRPC', server) 39 logging.info('Communicating with host %s via gRPC', server)
28 self._server = server 40 self._server = server
29 self._channel = grpc.insecure_channel(server) 41 self._channel = grpc.insecure_channel(server)
30 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) 42 self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
31 self._log_is_asleep = False 43 self._log_is_asleep = False
(...skipping 28 matching lines...)
60 request.output_chunk.data = stdout_and_chunk[0] 72 request.output_chunk.data = stdout_and_chunk[0]
61 request.output_chunk.offset = stdout_and_chunk[1] 73 request.output_chunk.offset = stdout_and_chunk[1]
62 if exit_code != None: 74 if exit_code != None:
63 request.exit_status.code = exit_code 75 request.exit_status.code = exit_code
64 76
65 # Insert everything else. Note that the b64-encoded strings in the dict 77 # Insert everything else. Note that the b64-encoded strings in the dict
66 # are automatically decoded by ParseDict. 78 # are automatically decoded by ParseDict.
67 google.protobuf.json_format.ParseDict(params, request) 79 google.protobuf.json_format.ParseDict(params, request)
68 80
69 # Perform update 81 # Perform update
70 response = self._stub.TaskUpdate(request, 82 response = call_grpc(self._stub.TaskUpdate, request)
71 timeout=NET_CONNECTION_TIMEOUT_SEC)
72 logging.debug('post_task_update() = %s', request) 83 logging.debug('post_task_update() = %s', request)
73 if response.error: 84 if response.error:
74 raise InternalError(response.error) 85 raise InternalError(response.error)
75 return not response.must_stop 86 return not response.must_stop
76 87
77 def post_task_error(self, task_id, bot_id, message): 88 def post_task_error(self, task_id, bot_id, message):
78 request = swarming_bot_pb2.TaskErrorRequest() 89 request = swarming_bot_pb2.TaskErrorRequest()
79 request.bot_id = bot_id 90 request.bot_id = bot_id
80 request.task_id = task_id 91 request.task_id = task_id
81 request.msg = message 92 request.msg = message
82 logging.error('post_task_error() = %s', request) 93 logging.error('post_task_error() = %s', request)
83 94
84 response = self._stub.TaskError(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 95 response = call_grpc(self._stub.TaskError, request)
85 return response.ok 96 return response.ok
86 97
87 def _attributes_json_to_proto(self, json_attr, msg): 98 def _attributes_json_to_proto(self, json_attr, msg):
88 msg.version = json_attr['version'] 99 msg.version = json_attr['version']
89 for k, values in sorted(json_attr['dimensions'].iteritems()): 100 for k, values in sorted(json_attr['dimensions'].iteritems()):
90 pair = msg.dimensions.add() 101 pair = msg.dimensions.add()
91 pair.name = k 102 pair.name = k
92 pair.values.extend(values) 103 pair.values.extend(values)
93 create_state_proto(json_attr['state'], msg.state) 104 create_state_proto(json_attr['state'], msg.state)
94 105
95 def do_handshake(self, attributes): 106 def do_handshake(self, attributes):
96 request = swarming_bot_pb2.HandshakeRequest() 107 request = swarming_bot_pb2.HandshakeRequest()
97 self._attributes_json_to_proto(attributes, request.attributes) 108 self._attributes_json_to_proto(attributes, request.attributes)
98 response = self._stub.Handshake(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 109 response = call_grpc(self._stub.Handshake, request)
99 resp = { 110 resp = {
100 'server_version': response.server_version, 111 'server_version': response.server_version,
101 'bot_version': response.bot_version, 112 'bot_version': response.bot_version,
102 'bot_group_cfg_version': response.bot_group_cfg_version, 113 'bot_group_cfg_version': response.bot_group_cfg_version,
103 'bot_group_cfg': { 114 'bot_group_cfg': {
104 'dimensions': { 115 'dimensions': {
105 d.name: d.values for d in response.bot_group_cfg.dimensions 116 d.name: d.values for d in response.bot_group_cfg.dimensions
106 }, 117 },
107 }, 118 },
108 } 119 }
109 logging.info('Completed handshake: %s', resp) 120 logging.info('Completed handshake: %s', resp)
110 return resp 121 return resp
111 122
112 def poll(self, attributes): 123 def poll(self, attributes):
113 request = swarming_bot_pb2.PollRequest() 124 request = swarming_bot_pb2.PollRequest()
114 self._attributes_json_to_proto(attributes, request.attributes) 125 self._attributes_json_to_proto(attributes, request.attributes)
115 # TODO(aludwin): gRPC-specific exception handling 126 # TODO(aludwin): gRPC-specific exception handling
116 response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 127 response = call_grpc(self._stub.Poll, request)
117 128
118 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: 129 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
119 return 'update', response.version 130 return 'update', response.version
120 131
121 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP: 132 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP:
122 if not self._log_is_asleep: 133 if not self._log_is_asleep:
123 logging.info('Going to sleep') 134 logging.info('Going to sleep')
124 self._log_is_asleep = True 135 self._log_is_asleep = True
125 return 'sleep', response.sleep_time 136 return 'sleep', response.sleep_time
126 137
(...skipping 39 matching lines...)
166 self._log_is_asleep = False 177 self._log_is_asleep = False
167 return 'run', manifest 178 return 'run', manifest
168 179
169 raise ValueError('Unknown command in response: %s' % response) 180 raise ValueError('Unknown command in response: %s' % response)
170 181
171 def get_bot_code(self, new_zip_fn, bot_version, _bot_id): 182 def get_bot_code(self, new_zip_fn, bot_version, _bot_id):
172 # TODO(aludwin): exception handling, pass bot_id 183 # TODO(aludwin): exception handling, pass bot_id
173 logging.info('Updating to version: %s', bot_version) 184 logging.info('Updating to version: %s', bot_version)
174 request = swarming_bot_pb2.BotUpdateRequest() 185 request = swarming_bot_pb2.BotUpdateRequest()
175 request.bot_version = bot_version 186 request.bot_version = bot_version
176 response = self._stub.BotUpdate(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 187 response = call_grpc(self._stub.BotUpdate, request)
177 with open(new_zip_fn, 'wb') as f: 188 with open(new_zip_fn, 'wb') as f:
178 f.write(response.bot_code) 189 f.write(response.bot_code)
179 190
180 def ping(self): 191 def ping(self):
181 pass 192 pass
182 193
183 194
184 def create_state_proto(state_dict, message): 195 def create_state_proto(state_dict, message):
185 """ Constructs a State message out of a state dict. 196 """ Constructs a State message out of a state dict.
186 197
(...skipping 27 matching lines...)
214 def insert_dict_as_submessage(message, keyname, value): 225 def insert_dict_as_submessage(message, keyname, value):
215 """Encodes a dict as a Protobuf message. 226 """Encodes a dict as a Protobuf message.
216 227
217 The keyname for the message field is passed in to simplify the creation 228 The keyname for the message field is passed in to simplify the creation
218 of the submessage in the first place - you need to say getattr and not 229 of the submessage in the first place - you need to say getattr and not
219 simply refer to message.keyname since the former actually creates the 230 simply refer to message.keyname since the former actually creates the
220 submessage while the latter does not. 231 submessage while the latter does not.
221 """ 232 """
222 sub_msg = getattr(message, keyname) 233 sub_msg = getattr(message, keyname)
223 google.protobuf.json_format.Parse(json.dumps(value), sub_msg) 234 google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
235
236
237 def call_grpc(method, request):
238 """Retries a command a set number of times"""
239 for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
240 try:
241 return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
242 except grpc.RpcError as g:
243 if g.code() is not grpc.StatusCode.UNAVAILABLE:
244 raise
245 logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
246 attempt, MAX_GRPC_ATTEMPTS)
247 grpc_error = g
248 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
249 # If we get here, it must be because we got (and saved) an error
250 assert grpc_error is not None
251 raise grpc_error
OLDNEW

Powered by Google App Engine