Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
index 8990ef17e7af4532c3043e043b71938a57211804..6bc4a62cc99201f344f552ab3f5e4296512e7764 100644 |
--- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
+++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
@@ -10,26 +10,13 @@ import json |
import logging |
import time |
-import grpc |
import google.protobuf.json_format |
from proto_bot import swarming_bot_pb2 |
from remote_client_errors import InternalError |
from remote_client_errors import MintOAuthTokenError |
from remote_client_errors import PollError |
from utils import net |
- |
- |
-# How long to wait for a response from the server. Keeping the same as |
-# the equivalent in remote_client.py for now. |
-NET_CONNECTION_TIMEOUT_SEC = 5*60 |
- |
- |
-# How many times to retry a gRPC call |
-MAX_GRPC_ATTEMPTS = 30 |
- |
- |
-# Longest time to sleep between gRPC calls |
-MAX_GRPC_SLEEP = 10. |
+from utils import grpc_proxy |
class RemoteClientGrpc(object): |
@@ -40,11 +27,13 @@ class RemoteClientGrpc(object): |
might be garbage-collected before the values are used. |
""" |
- def __init__(self, server): |
+ def __init__(self, server, fake_proxy=None): |
logging.info('Communicating with host %s via gRPC', server) |
+ if fake_proxy: |
+ self._proxy = fake_proxy |
+ else: |
+ self._proxy = grpc_proxy.Proxy(server, swarming_bot_pb2.BotServiceStub) |
self._server = server |
- self._channel = grpc.insecure_channel(server) |
- self._stub = swarming_bot_pb2.BotServiceStub(self._channel) |
self._log_is_asleep = False |
def is_grpc(self): |
@@ -84,7 +73,7 @@ class RemoteClientGrpc(object): |
google.protobuf.json_format.ParseDict(params, request) |
# Perform update |
- response = call_grpc(self._stub.TaskUpdate, request) |
+ response = self._proxy.call_unary('TaskUpdate', request) |
logging.debug('post_task_update() = %s', request) |
if response.error: |
raise InternalError(response.error) |
@@ -97,7 +86,7 @@ class RemoteClientGrpc(object): |
request.msg = message |
logging.error('post_task_error() = %s', request) |
- response = call_grpc(self._stub.TaskError, request) |
+ response = self._proxy.call_unary('TaskError', request) |
return response.ok |
def _attributes_json_to_proto(self, json_attr, msg): |
@@ -111,7 +100,7 @@ class RemoteClientGrpc(object): |
def do_handshake(self, attributes): |
request = swarming_bot_pb2.HandshakeRequest() |
self._attributes_json_to_proto(attributes, request.attributes) |
- response = call_grpc(self._stub.Handshake, request) |
+ response = self._proxy.call_unary('Handshake', request) |
resp = { |
'server_version': response.server_version, |
'bot_version': response.bot_version, |
@@ -129,7 +118,7 @@ class RemoteClientGrpc(object): |
request = swarming_bot_pb2.PollRequest() |
self._attributes_json_to_proto(attributes, request.attributes) |
# TODO(aludwin): gRPC-specific exception handling (raise PollError). |
- response = call_grpc(self._stub.Poll, request) |
+ response = self._proxy.call_unary('Poll', request) |
if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: |
return 'update', response.version |
@@ -189,7 +178,7 @@ class RemoteClientGrpc(object): |
logging.info('Updating to version: %s', bot_version) |
request = swarming_bot_pb2.BotUpdateRequest() |
request.bot_version = bot_version |
- response = call_grpc(self._stub.BotUpdate, request) |
+ response = self._proxy.call_unary('BotUpdate', request) |
with open(new_zip_fn, 'wb') as f: |
f.write(response.bot_code) |
@@ -244,18 +233,3 @@ def insert_dict_as_submessage(message, keyname, value): |
google.protobuf.json_format.Parse(json.dumps(value), sub_msg) |
-def call_grpc(method, request): |
- """Retries a command a set number of times""" |
- for attempt in range(1, MAX_GRPC_ATTEMPTS+1): |
- try: |
- return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
- except grpc.RpcError as g: |
- if g.code() is not grpc.StatusCode.UNAVAILABLE: |
- raise |
- logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)', |
- attempt, MAX_GRPC_ATTEMPTS) |
- grpc_error = g |
- time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) |
- # If we get here, it must be because we got (and saved) an error |
- assert grpc_error is not None |
- raise grpc_error |