Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
| index 3ef861d0017d73dc99af579aa4cbc7c56ac65bf7..62ff1f4a238cbffaa0c66e5696d1ea7a51cb52fa 100644 |
| --- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
| +++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
| @@ -7,6 +7,7 @@ |
| import json |
| import logging |
| +import time |
| import grpc |
| import google.protobuf.json_format |
| @@ -19,6 +20,10 @@ from remote_client_errors import InternalError |
| NET_CONNECTION_TIMEOUT_SEC = 5*60 |
| +# How many times to retry a gRPC call |
| +MAX_GRPC_ATTEMPTS = 30 |
| + |
| + |
| class RemoteClientGrpc(object): |
| """RemoteClientGrpc knows how to make calls via gRPC. |
| """ |
| @@ -67,8 +72,7 @@ class RemoteClientGrpc(object): |
| google.protobuf.json_format.ParseDict(params, request) |
| # Perform update |
| - response = self._stub.TaskUpdate(request, |
| - timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + response = call_grpc(self._stub.TaskUpdate, request) |
| logging.debug('post_task_update() = %s', request) |
| if response.error: |
| raise InternalError(response.error) |
| @@ -81,7 +85,7 @@ class RemoteClientGrpc(object): |
| request.msg = message |
| logging.error('post_task_error() = %s', request) |
| - response = self._stub.TaskError(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + response = call_grpc(self._stub.TaskError, request) |
| return response.ok |
| def _attributes_json_to_proto(self, json_attr, msg): |
| @@ -95,7 +99,7 @@ class RemoteClientGrpc(object): |
| def do_handshake(self, attributes): |
| request = swarming_bot_pb2.HandshakeRequest() |
| self._attributes_json_to_proto(attributes, request.attributes) |
| - response = self._stub.Handshake(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + response = call_grpc(self._stub.Handshake, request) |
| resp = { |
| 'server_version': response.server_version, |
| 'bot_version': response.bot_version, |
| @@ -113,7 +117,7 @@ class RemoteClientGrpc(object): |
| request = swarming_bot_pb2.PollRequest() |
| self._attributes_json_to_proto(attributes, request.attributes) |
| # TODO(aludwin): gRPC-specific exception handling |
| - response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + response = call_grpc(self._stub.Poll, request) |
| if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: |
| return 'update', response.version |
| @@ -173,7 +177,7 @@ class RemoteClientGrpc(object): |
| logging.info('Updating to version: %s', bot_version) |
| request = swarming_bot_pb2.BotUpdateRequest() |
| request.bot_version = bot_version |
| - response = self._stub.BotUpdate(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + response = call_grpc(self._stub.BotUpdate, request) |
| with open(new_zip_fn, 'wb') as f: |
| f.write(response.bot_code) |
| @@ -221,3 +225,28 @@ def insert_dict_as_submessage(message, keyname, value): |
| """ |
| sub_msg = getattr(message, keyname) |
| google.protobuf.json_format.Parse(json.dumps(value), sub_msg) |
| + |
| + |
| +def call_grpc(method, request): |
| + """Retries a command a set number of times""" |
| + num_attempts = 0 |
|
M-A Ruel
2016/12/20 15:05:59
for num_attempts in xrange(MAX_GRPC_ATTEMPTS):
aludwin
2016/12/20 19:51:44
Hmm, the reason I made it a for loop in the first
M-A Ruel
2016/12/20 19:55:36
I don't mind much.
|
| + while True: |
| + try: |
| + num_attempts += 1 |
| + response = method(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
| + if num_attempts > 1: |
| + logging.warning('call_grpc succeeded after %d attempts', num_attempts) |
|
M-A Ruel
2016/12/20 15:05:59
IMHO it'd not needed since we can infer from the o
aludwin
2016/12/20 16:42:35
I thought it was nice to see but I agree it's tech
|
| + return response |
| + except grpc.RpcError as rpc_error: |
| + logging.warning('call_grpc - gRPC error: %s', str(rpc_error)) |
|
M-A Ruel
2016/12/20 15:05:59
str() is not needed
aludwin
2016/12/20 16:42:35
Done.
|
| + if (rpc_error.code() is grpc.StatusCode.UNAVAILABLE |
| + and num_attempts < MAX_GRPC_ATTEMPTS): |
| + logging.warning('Swallowing UNAVAILABLE error (attempt %d/%d)', |
| + num_attempts, MAX_GRPC_ATTEMPTS) |
| + time.sleep(1) |
|
M-A Ruel
2016/12/20 15:05:59
You need exponential backoff.
e.g. https://en.wiki
aludwin
2016/12/20 16:42:35
What would you recommend as the initial, maximum a
M-A Ruel
2016/12/20 17:01:45
Here's an idea:
https://github.com/luci/luci-py/bl
|
| + else: |
| + logging.error('Cannot recover from gRPC error; propagating') |
| + raise |
| + except Exception as e: |
|
M-A Ruel
2016/12/20 15:05:59
I don't think this is needed.
aludwin
2016/12/20 16:42:35
Which part? I just want to catch it so I can log i
M-A Ruel
2016/12/20 17:01:45
Whatever catches the unrelated exception will like
|
| + logging.error('call_grpc - non-gRPC error: %s', str(e)) |
| + raise |