| Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
|
| diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
|
| index 8990ef17e7af4532c3043e043b71938a57211804..d2639eb05f9c976cfd006ac459f50ee954887d1d 100644
|
| --- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
|
| +++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
|
| @@ -10,26 +10,13 @@ import json
|
| import logging
|
| import time
|
|
|
| -import grpc
|
| import google.protobuf.json_format
|
| from proto_bot import swarming_bot_pb2
|
| from remote_client_errors import InternalError
|
| from remote_client_errors import MintOAuthTokenError
|
| from remote_client_errors import PollError
|
| from utils import net
|
| -
|
| -
|
| -# How long to wait for a response from the server. Keeping the same as
|
| -# the equivalent in remote_client.py for now.
|
| -NET_CONNECTION_TIMEOUT_SEC = 5*60
|
| -
|
| -
|
| -# How many times to retry a gRPC call
|
| -MAX_GRPC_ATTEMPTS = 30
|
| -
|
| -
|
| -# Longest time to sleep between gRPC calls
|
| -MAX_GRPC_SLEEP = 10.
|
| +from utils import grpc_proxy
|
|
|
|
|
| class RemoteClientGrpc(object):
|
| @@ -42,9 +29,8 @@ class RemoteClientGrpc(object):
|
|
|
| def __init__(self, server):
|
| logging.info('Communicating with host %s via gRPC', server)
|
| + self._proxy = grpc_proxy.Proxy(server, swarming_bot_pb2.BotServiceStub)
|
| self._server = server
|
| - self._channel = grpc.insecure_channel(server)
|
| - self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
|
| self._log_is_asleep = False
|
|
|
| def is_grpc(self):
|
| @@ -84,7 +70,7 @@ class RemoteClientGrpc(object):
|
| google.protobuf.json_format.ParseDict(params, request)
|
|
|
| # Perform update
|
| - response = call_grpc(self._stub.TaskUpdate, request)
|
| + response = self._proxy.call_unary('TaskUpdate', request)
|
| logging.debug('post_task_update() = %s', request)
|
| if response.error:
|
| raise InternalError(response.error)
|
| @@ -97,7 +83,7 @@ class RemoteClientGrpc(object):
|
| request.msg = message
|
| logging.error('post_task_error() = %s', request)
|
|
|
| - response = call_grpc(self._stub.TaskError, request)
|
| + response = self._proxy.call_unary('TaskError', request)
|
| return response.ok
|
|
|
| def _attributes_json_to_proto(self, json_attr, msg):
|
| @@ -111,7 +97,7 @@ class RemoteClientGrpc(object):
|
| def do_handshake(self, attributes):
|
| request = swarming_bot_pb2.HandshakeRequest()
|
| self._attributes_json_to_proto(attributes, request.attributes)
|
| - response = call_grpc(self._stub.Handshake, request)
|
| + response = self._proxy.call_unary('Handshake', request)
|
| resp = {
|
| 'server_version': response.server_version,
|
| 'bot_version': response.bot_version,
|
| @@ -129,7 +115,7 @@ class RemoteClientGrpc(object):
|
| request = swarming_bot_pb2.PollRequest()
|
| self._attributes_json_to_proto(attributes, request.attributes)
|
| # TODO(aludwin): gRPC-specific exception handling (raise PollError).
|
| - response = call_grpc(self._stub.Poll, request)
|
| + response = self._proxy.call_unary('Poll', request)
|
|
|
| if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
|
| return 'update', response.version
|
| @@ -189,7 +175,7 @@ class RemoteClientGrpc(object):
|
| logging.info('Updating to version: %s', bot_version)
|
| request = swarming_bot_pb2.BotUpdateRequest()
|
| request.bot_version = bot_version
|
| - response = call_grpc(self._stub.BotUpdate, request)
|
| + response = self._proxy.call_unary('BotUpdate', request)
|
| with open(new_zip_fn, 'wb') as f:
|
| f.write(response.bot_code)
|
|
|
| @@ -244,18 +230,3 @@ def insert_dict_as_submessage(message, keyname, value):
|
| google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
|
|
|
|
|
| -def call_grpc(method, request):
|
| - """Retries a command a set number of times"""
|
| - for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
|
| - try:
|
| - return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
|
| - except grpc.RpcError as g:
|
| - if g.code() is not grpc.StatusCode.UNAVAILABLE:
|
| - raise
|
| - logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
|
| - attempt, MAX_GRPC_ATTEMPTS)
|
| - grpc_error = g
|
| - time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
|
| - # If we get here, it must be because we got (and saved) an error
|
| - assert grpc_error is not None
|
| - raise grpc_error
|
|
|