Index: client/isolate_storage.py
diff --git a/client/isolate_storage.py b/client/isolate_storage.py
index a158bbf3a3e5c5dc69e2ce70102abe944760cf6d..ef0d79eae0d7a0899af1ba19600363976f7b0493 100644
--- a/client/isolate_storage.py
+++ b/client/isolate_storage.py
@@ -21,32 +21,15 @@ from utils import net
 import isolated_format
-# gRPC may not be installed on the worker machine. This is fine, as long as
-# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
-# Full external requirements are: grpcio, certifi.
 try:
-  import grpc
-  from google import auth as google_auth
-  from google.auth.transport import grpc as google_auth_transport_grpc
-  from google.auth.transport import requests as google_auth_transport_requests
+  import grpc # for error codes
+  from utils import grpc_proxy
   from proto import bytestream_pb2
 except ImportError as err:
   grpc = None
+  grpc_proxy = None
   bytestream_pb2 = None
-# If gRPC is installed, at least give a warning if certifi is not. This is not
-# actually used anywhere in this module, but if certifi is missing,
-# google.auth.transport will fail with
-# https://stackoverflow.com/questions/24973326
-certifi = None
-if grpc is not None:
-  try:
-    import certifi
-  except ImportError:
-    # Could not import certifi; gRPC HTTPS connections may fail. This will be
-    # logged in IsolateServerGrpc.__init__, since the logger is not configured
-    # during the import time.
-    pass
 # Chunk size to use when reading from network stream.
 NET_IO_FILE_CHUNK = 16 * 1024
@@ -533,65 +516,17 @@ class IsolateServerGrpc(StorageApi):
   def __init__(self, server, namespace, proxy):
     super(IsolateServerGrpc, self).__init__()
     logging.info('Using gRPC for Isolate')
-    if not certifi:
-      logging.warning(
-          'Could not import certifi; gRPC HTTPS connections may fail')
+    # Proxies only support the default-gzip namespace for now.
+    # TODO(aludwin): support other namespaces if necessary
+    assert namespace == 'default-gzip'
     self._server = server
     self._lock = threading.Lock()
     self._memory_use = 0
     self._num_pushes = 0
     self._already_exists = 0
-
-    # Proxies only support the default-gzip namespace for now.
-    # TODO(aludwin): support other namespaces if necessary
-    assert namespace == 'default-gzip'
+    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
     self._namespace = namespace
-    # Make sure grpc was successfully imported
-    assert grpc
-    assert bytestream_pb2
-
-    roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
-    overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')
-
-    # The "proxy" envvar must be of the form:
-    # http[s]://<server>[:port][/prefix]
-    m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
-    if not m:
-      raise ValueError(('gRPC proxy must have the form: '
-                        'http[s]://<server>[:port][/prefix] '
-                        '(given: %s)') % proxy)
-    transport = m.group(1)
-    host = m.group(2)
-    prefix = m.group(3)
-    if not prefix.endswith('/'):
-      prefix = prefix + '/'
-    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
-                 transport, host, prefix)
-    self._prefix = prefix
-
-    if transport == 'http':
-      self._channel = grpc.insecure_channel(host)
-    elif transport == 'https':
-      # Using cloud container builder scopes for testing:
-      scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
-      credentials, _ = google_auth.default(scopes=scopes)
-      request = google_auth_transport_requests.Request()
-      options = ()
-      root_certs = None
-      if roots is not None:
-        logging.info('Using root CA %s', roots)
-        with open(roots) as f:
-          root_certs = f.read()
-      if overd is not None:
-        logging.info('Using TLS server override %s', overd)
-        options=(('grpc.ssl_target_name_override', overd),)
-      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
-      self._channel = google_auth_transport_grpc.secure_authorized_channel(
-          credentials, request, host, ssl_creds, options=options)
-    else:
-      raise ValueError('unknown transport %s (should be http[s])' % transport)
-    self._stub = bytestream_pb2.ByteStreamStub(self._channel)
   @property
   def location(self):
@@ -611,10 +546,10 @@ class IsolateServerGrpc(StorageApi):
     assert offset == 0
     request = bytestream_pb2.ReadRequest()
     #TODO(aludwin): send the expected size of the item
-    request.resource_name = '%sblobs/%s/0' % (
-        self._prefix, digest)
+    request.resource_name = '%s/blobs/%s/0' % (
+        self._proxy.prefix, digest)
     try:
-      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
+      for response in self._proxy.get_stream('Read', request):
         yield response.data
     except grpc.RpcError as g:
       logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
@@ -645,8 +580,8 @@ class IsolateServerGrpc(StorageApi):
       # proto messages to send via gRPC.
      request = bytestream_pb2.WriteRequest()
      u = uuid.uuid4()
-      request.resource_name = '%suploads/%s/blobs/%s/%d' % (
-          self._prefix, u, item.digest, item.size)
+      request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
+          self._proxy.prefix, u, item.digest, item.size)
      request.write_offset = 0
      for chunk in chunker():
        # Make sure we send at least one chunk for zero-length blobs
@@ -663,7 +598,7 @@ class IsolateServerGrpc(StorageApi):
     response = None
     try:
-      response = self._stub.Write(slicer())
+      response = self._proxy.call_no_retries('Write', slicer())
     except grpc.RpcError as r:
       if r.code() == grpc.StatusCode.ALREADY_EXISTS:
         # This is legit - we didn't check before we pushed so no problem if