| Index: client/isolate_storage.py
|
| diff --git a/client/isolate_storage.py b/client/isolate_storage.py
|
| index a158bbf3a3e5c5dc69e2ce70102abe944760cf6d..ef0d79eae0d7a0899af1ba19600363976f7b0493 100644
|
| --- a/client/isolate_storage.py
|
| +++ b/client/isolate_storage.py
|
| @@ -21,32 +21,15 @@ from utils import net
|
|
|
| import isolated_format
|
|
|
| -# gRPC may not be installed on the worker machine. This is fine, as long as
|
| -# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
|
| -# Full external requirements are: grpcio, certifi.
|
| try:
|
| - import grpc
|
| - from google import auth as google_auth
|
| - from google.auth.transport import grpc as google_auth_transport_grpc
|
| - from google.auth.transport import requests as google_auth_transport_requests
|
| + import grpc # for error codes
|
| + from utils import grpc_proxy
|
| from proto import bytestream_pb2
|
| except ImportError as err:
|
| grpc = None
|
| + grpc_proxy = None
|
| bytestream_pb2 = None
|
|
|
| -# If gRPC is installed, at least give a warning if certifi is not. This is not
|
| -# actually used anywhere in this module, but if certifi is missing,
|
| -# google.auth.transport will fail with
|
| -# https://stackoverflow.com/questions/24973326
|
| -certifi = None
|
| -if grpc is not None:
|
| - try:
|
| - import certifi
|
| - except ImportError:
|
| - # Could not import certifi; gRPC HTTPS connections may fail. This will be
|
| - # logged in IsolateServerGrpc.__init__, since the logger is not configured
|
| - # during the import time.
|
| - pass
|
|
|
| # Chunk size to use when reading from network stream.
|
| NET_IO_FILE_CHUNK = 16 * 1024
|
| @@ -533,65 +516,17 @@ class IsolateServerGrpc(StorageApi):
|
| def __init__(self, server, namespace, proxy):
|
| super(IsolateServerGrpc, self).__init__()
|
| logging.info('Using gRPC for Isolate')
|
| - if not certifi:
|
| - logging.warning(
|
| - 'Could not import certifi; gRPC HTTPS connections may fail')
|
| + # Proxies only support the default-gzip namespace for now.
|
| + # TODO(aludwin): support other namespaces if necessary
|
| + assert namespace == 'default-gzip'
|
| self._server = server
|
| self._lock = threading.Lock()
|
| self._memory_use = 0
|
| self._num_pushes = 0
|
| self._already_exists = 0
|
| -
|
| - # Proxies only support the default-gzip namespace for now.
|
| - # TODO(aludwin): support other namespaces if necessary
|
| - assert namespace == 'default-gzip'
|
| + self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
|
| self._namespace = namespace
|
|
|
| - # Make sure grpc was successfully imported
|
| - assert grpc
|
| - assert bytestream_pb2
|
| -
|
| - roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
|
| - overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')
|
| -
|
| - # The "proxy" envvar must be of the form:
|
| - # http[s]://<server>[:port][/prefix]
|
| - m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
|
| - if not m:
|
| - raise ValueError(('gRPC proxy must have the form: '
|
| - 'http[s]://<server>[:port][/prefix] '
|
| - '(given: %s)') % proxy)
|
| - transport = m.group(1)
|
| - host = m.group(2)
|
| - prefix = m.group(3)
|
| - if not prefix.endswith('/'):
|
| - prefix = prefix + '/'
|
| - logging.info('gRPC proxy: transport %s, host %s, prefix %s',
|
| - transport, host, prefix)
|
| - self._prefix = prefix
|
| -
|
| - if transport == 'http':
|
| - self._channel = grpc.insecure_channel(host)
|
| - elif transport == 'https':
|
| - # Using cloud container builder scopes for testing:
|
| - scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
|
| - credentials, _ = google_auth.default(scopes=scopes)
|
| - request = google_auth_transport_requests.Request()
|
| - options = ()
|
| - root_certs = None
|
| - if roots is not None:
|
| - logging.info('Using root CA %s', roots)
|
| - with open(roots) as f:
|
| - root_certs = f.read()
|
| - if overd is not None:
|
| - logging.info('Using TLS server override %s', overd)
|
| - options=(('grpc.ssl_target_name_override', overd),)
|
| - ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
|
| - self._channel = google_auth_transport_grpc.secure_authorized_channel(
|
| - credentials, request, host, ssl_creds, options=options)
|
| - else:
|
| - raise ValueError('unknown transport %s (should be http[s])' % transport)
|
| - self._stub = bytestream_pb2.ByteStreamStub(self._channel)
|
|
|
| @property
|
| def location(self):
|
| @@ -611,10 +546,10 @@ class IsolateServerGrpc(StorageApi):
|
| assert offset == 0
|
| request = bytestream_pb2.ReadRequest()
|
| #TODO(aludwin): send the expected size of the item
|
| - request.resource_name = '%sblobs/%s/0' % (
|
| - self._prefix, digest)
|
| + request.resource_name = '%s/blobs/%s/0' % (
|
| + self._proxy.prefix, digest)
|
| try:
|
| - for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
|
| + for response in self._proxy.get_stream('Read', request):
|
| yield response.data
|
| except grpc.RpcError as g:
|
| logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
|
| @@ -645,8 +580,8 @@ class IsolateServerGrpc(StorageApi):
|
| # proto messages to send via gRPC.
|
| request = bytestream_pb2.WriteRequest()
|
| u = uuid.uuid4()
|
| - request.resource_name = '%suploads/%s/blobs/%s/%d' % (
|
| - self._prefix, u, item.digest, item.size)
|
| + request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
|
| + self._proxy.prefix, u, item.digest, item.size)
|
| request.write_offset = 0
|
| for chunk in chunker():
|
| # Make sure we send at least one chunk for zero-length blobs
|
| @@ -663,7 +598,7 @@ class IsolateServerGrpc(StorageApi):
|
|
|
| response = None
|
| try:
|
| - response = self._stub.Write(slicer())
|
| + response = self._proxy.call_no_retries('Write', slicer())
|
| except grpc.RpcError as r:
|
| if r.code() == grpc.StatusCode.ALREADY_EXISTS:
|
| # This is legit - we didn't check before we pushed so no problem if
|
|
|