Chromium Code Reviews
| Index: client/isolate_storage.py |
| diff --git a/client/isolate_storage.py b/client/isolate_storage.py |
| index 63234b0a1295d85e9e700aa057373ff5516f05be..a550bfe2dff155a85356e0891743b3db1004b3f7 100644 |
| --- a/client/isolate_storage.py |
| +++ b/client/isolate_storage.py |
| @@ -9,11 +9,13 @@ import base64 |
| import binascii |
| import collections |
| import logging |
| +import os |
| import re |
| import sys |
| import threading |
| import time |
| import types |
| +import uuid |
| from utils import file_path |
| from utils import net |
| @@ -22,13 +24,23 @@ import isolated_format |
| # gRPC may not be installed on the worker machine. This is fine, as long as |
| # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
| +# Full external requirements are: grpcio, certifi. |
| try: |
| import grpc |
| - from proto import isolate_bot_pb2 |
| -except ImportError: |
| + from google import auth as google_auth |
| + from google.auth.transport import grpc as google_auth_transport_grpc |
| + from google.auth.transport import requests as google_auth_transport_requests |
| + from proto import bytestream_pb2 |
| +except ImportError as err: |
| grpc = None |
| - isolate_bot_pb2 = None |
| + bytestream_pb2 = None |
| +# If gRPC is installed, at least give a warning if certifi is not. |
| +if grpc is not None: |
| + try: |
| + import certifi |
|
Vadim Sh.
2017/06/27 19:12:49
where is this used?
aludwin
2017/06/27 19:23:06
It's conditionally imported by google.auth.transport.
Vadim Sh.
2017/06/27 19:27:09
Yeah, leave a comment.
aludwin
2017/06/27 20:10:17
Done.
|
| + except ImportError as err: |
| + logging.warning('could not import certifi; gRPC HTTPS connections may fail') |
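
A quick way to see which CA bundle those HTTPS connections would end up using
is to ask certifi directly; a minimal sketch, assuming certifi is installed:

  import certifi

  # certifi.where() returns the path to the Mozilla CA bundle (cacert.pem)
  # that requests-based transports typically rely on when no system bundle
  # is configured.
  print certifi.where()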
| # Chunk size to use when reading from network stream. |
| NET_IO_FILE_CHUNK = 16 * 1024 |
| @@ -506,20 +518,61 @@ class IsolateServerGrpc(StorageApi): |
| def __init__(self, server, namespace): |
| super(IsolateServerGrpc, self).__init__() |
| logging.info('Using gRPC for Isolate') |
| + self._server = server |
| + self._lock = threading.Lock() |
| + self._memory_use = 0 |
| + self._num_pushes = 0 |
| + self._already_exists = 0 |
| # Make sure grpc was successfully imported |
| assert grpc |
| - assert isolate_bot_pb2 |
| - |
| + assert bytestream_pb2 |
| # Proxies only support the default-gzip namespace for now. |
| - # TODO(aludwin): support other namespaces |
| + # TODO(aludwin): support other namespaces if necessary |
| assert namespace == 'default-gzip' |
| - self._server = server |
| - self._channel = grpc.insecure_channel(server) |
| - self._stub = isolate_bot_pb2.FileServiceStub(self._channel) |
| - self._lock = threading.Lock() |
| - self._memory_use = 0 |
| - logging.info('...gRPC successfully initialized') |
| + |
| + proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') |
| + roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') |
| + overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') |
| + |
| + # The "proxy" envvar must be of the form: |
| + # http[s]://<server>[:port][/prefix] |
| + m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy) |
| + if not m: |
| + raise ValueError(('gRPC proxy must have the form: ' |
| + 'http[s]://<server>[:port][/prefix] ' |
| + '(given: %s)') % proxy) |
| + transport = m.group(1) |
| + host = m.group(2) |
| + prefix = m.group(3) |
| + if not prefix.endswith('/'): |
| + prefix = prefix + '/' |
| + logging.info('gRPC proxy: transport %s, host %s, prefix %s', |
| + transport, host, prefix) |
| + self._prefix = prefix |
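
For reference, a minimal sketch of how the three pieces fall out of that
pattern for a hypothetical proxy value:

  import re

  proxy = 'https://myproxy.example.com:9090/isolate'  # hypothetical
  m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
  transport = m.group(1)  # 'https'
  host = m.group(2)       # 'myproxy.example.com:9090'
  prefix = m.group(3)     # 'isolate'
  if not prefix.endswith('/'):
    prefix += '/'         # 'isolate/'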
| + |
| + if transport == 'http': |
| + self._channel = grpc.insecure_channel(host) |
| + elif transport == 'https': |
| + # Using cloud container builder scopes for testing: |
| + scopes = ('https://www.googleapis.com/auth/cloud-build-service',) |
| + credentials, _ = google_auth.default(scopes=scopes) |
|
Vadim Sh.
2017/06/27 19:12:49
If you are requiring configured Application Default Credentials...
|
| + request = google_auth_transport_requests.Request() |
| + options = () |
| + root_certs = None |
| + if roots is not None: |
| + logging.info('Using root CA %s', roots) |
| + with open(roots) as f: |
| + root_certs = f.read() |
| + if overd is not None: |
| + logging.info('Using TLS server override %s', overd) |
| + options = (('grpc.ssl_target_name_override', overd),) |
| + ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
| + self._channel = google_auth_transport_grpc.secure_authorized_channel( |
| + credentials, request, host, ssl_creds, options=options) |
| + else: |
| + raise ValueError('unknown transport %s (should be http[s])' % transport) |
| + self._stub = bytestream_pb2.ByteStreamStub(self._channel) |
|
Vadim Sh.
2017/06/27 19:12:49
Is this object thread safe? 'fetch' and 'push' will...
aludwin
2017/06/27 20:10:17
Yup: https://github.com/grpc/grpc/issues/9320
|
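
Per the linked issue, gRPC channels and stubs are thread-safe, so a single
ByteStreamStub can serve concurrent calls; a minimal sketch, with a
hypothetical proxy address and resource names:

  import threading

  import grpc
  from proto import bytestream_pb2

  channel = grpc.insecure_channel('localhost:9090')  # hypothetical proxy
  stub = bytestream_pb2.ByteStreamStub(channel)      # shared, not per-thread

  def read_blob(name):
    # Each call gets its own request/response state; only the stub is shared.
    request = bytestream_pb2.ReadRequest()
    request.resource_name = name
    return ''.join(r.data for r in stub.Read(request))

  threads = [threading.Thread(target=read_blob, args=('blobs/%040d/0' % i,))
             for i in range(4)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()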
| @property |
| def location(self): |
| @@ -536,24 +589,13 @@ class IsolateServerGrpc(StorageApi): |
| def fetch(self, digest, offset=0): |
| # The gRPC APIs only work with an offset of 0 |
| assert offset == 0 |
| - request = isolate_bot_pb2.FetchBlobsRequest() |
| - req_digest = request.digest.add() |
| - # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte |
| - # array (like [0x01, 0x2a, 0xbc]). |
| - req_digest.digest = binascii.unhexlify(digest) |
| - expected_offset = 0 |
| + request = bytestream_pb2.ReadRequest() |
| + # TODO(aludwin): send the expected size of the item |
| + request.resource_name = '%sblobs/%s/0' % ( |
| + self._prefix, digest) |
| try: |
| - for response in self._stub.FetchBlobs(request, |
| - timeout=DOWNLOAD_READ_TIMEOUT): |
| - if not response.status.succeeded: |
| - raise IOError( |
| - 'Error while fetching %s: %s' % (digest, response.status)) |
| - if not expected_offset == response.data.offset: |
| - raise IOError( |
| - 'Error while fetching %s: expected offset %d, got %d' % ( |
| - digest, expected_offset, response.data.offset)) |
| - expected_offset += len(response.data.data) |
| - yield response.data.data |
| + for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): |
| + yield response.data |
| except grpc.RpcError as g: |
| logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
| raise IOError(g) |
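
The resource name above follows the ByteStream read layout
<prefix>blobs/<digest>/<size>, with 0 standing in for the still-unsent size;
a minimal sketch with a hypothetical digest, and how a caller drains the
generator:

  prefix = 'isolate/'        # from ISOLATED_GRPC_PROXY's path component
  digest = '0123abcd' * 5    # hypothetical 40-hex-char SHA-1
  resource_name = '%sblobs/%s/0' % (prefix, digest)
  # -> 'isolate/blobs/0123abcd...abcd/0'

  # fetch() is a generator of chunks; reassemble the blob by joining them.
  data = ''.join(storage.fetch(digest))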
| @@ -567,6 +609,7 @@ class IsolateServerGrpc(StorageApi): |
| # Default to item.content(). |
| content = item.content() if content is None else content |
| guard_memory_use(self, content, item.size) |
| + self._num_pushes += 1 |
| try: |
| def chunker(): |
| @@ -580,34 +623,48 @@ class IsolateServerGrpc(StorageApi): |
| def slicer(): |
| # Ensures every bit of content is under the gRPC max size; yields |
| # proto messages to send via gRPC. |
| - request = isolate_bot_pb2.PushBlobsRequest() |
| - request.data.digest.digest = binascii.unhexlify(item.digest) |
| - request.data.digest.size_bytes = item.size |
| - request.data.offset = 0 |
| + request = bytestream_pb2.WriteRequest() |
| + u = uuid.uuid4() |
| + request.resource_name = '%suploads/%s/blobs/%s/%d' % ( |
| + self._prefix, u, item.digest, item.size) |
| + request.write_offset = 0 |
| for chunk in chunker(): |
| # Make sure we send at least one chunk for zero-length blobs |
| has_sent_anything = False |
| while chunk or not has_sent_anything: |
| + has_sent_anything = True |
| slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
| - request.data.data = chunk[:slice_len] |
| + request.data = chunk[:slice_len] |
| + if request.write_offset + slice_len == item.size: |
| + request.finish_write = True |
| yield request |
| - has_sent_anything = True |
| - request.data.offset += slice_len |
| - # The proxy only expects the first chunk to have the digest |
| - request.data.ClearField("digest") |
| + request.write_offset += slice_len |
| chunk = chunk[slice_len:] |
| - # TODO(aludwin): batch up several requests to reuse TCP connections |
| try: |
| - response = self._stub.PushBlobs(slicer()) |
| - except grpc.RpcError as g: |
| - logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) |
| - raise IOError(g) |
| + response = self._stub.Write(slicer()) |
| + except grpc.Call as c: |
| + # You might think that errors from gRPC would be grpc.RpcError. You'd |
| + # be... right... but it's *also* an instance of grpc.Call, and that's |
| + # where the status code actually lives. |
| + if c.code() == grpc.StatusCode.ALREADY_EXISTS: |
| + # This is legit - we didn't check before we pushed so no problem if |
| + # it's already there. |
| + self._already_exists += 1 |
| + if self._already_exists % 100 == 0: |
| + logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( |
| + self._already_exists, self._num_pushes, |
| + 100.0 * self._already_exists / self._num_pushes)) |
| + else: |
| + logging.error('gRPC error during push: throwing as IOError (%s)' % c) |
| + raise IOError(c) |
| + except Exception as e: |
| + logging.error('error during push: throwing as IOError (%s)' % e) |
|
Vadim Sh.
2017/06/27 19:12:49
nit: use 2 spaces for indentation
Vadim Sh.
2017/06/27 21:14:29
Ping. It is 4 spaces now.
aludwin
2017/06/28 13:23:53
Done.
|
| + raise IOError(e) |
| - if not response.status.succeeded: |
| - raise IOError( |
| - 'Error while uploading %s: %s' % ( |
| - item.digest, response.status.error_detail)) |
| + if response.committed_size != item.size: |
| + raise IOError('%s/%d: incorrect size written (%d)' % ( |
| + item.digest, item.size, response.committed_size)) |
| finally: |
| with self._lock: |
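
The write side names each upload <prefix>uploads/<uuid>/blobs/<digest>/<size>
and advances write_offset by one slice per request; a minimal sketch of the
bookkeeping, with hypothetical values:

  import uuid

  prefix = 'isolate/'
  digest = '0123abcd' * 5    # hypothetical 40-hex-char SHA-1
  size = 40000               # hypothetical blob size in bytes
  resource_name = '%suploads/%s/blobs/%s/%d' % (
      prefix, uuid.uuid4(), digest, size)

  # With NET_IO_FILE_CHUNK = 16 * 1024, the slices start at offsets
  # 0, 16384, and 32768; the last slice is 40000 - 32768 = 7232 bytes,
  # and finish_write is set on it because 32768 + 7232 == size.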
| @@ -615,38 +672,16 @@ class IsolateServerGrpc(StorageApi): |
| def contains(self, items): |
| """Returns the set of all missing items.""" |
| + # TODO(aludwin): this isn't supported directly in Bytestream, so for now |
| + # assume that nothing is present in the cache. |
| # Ensure all items were initialized with 'prepare' call. Storage does that. |
| assert all(i.digest is not None and i.size is not None for i in items) |
| - request = isolate_bot_pb2.ContainsRequest() |
| - items_by_digest = {} |
| - for item in items: |
| - cd = request.digest.add() |
| - cd.digest = binascii.unhexlify(item.digest) |
| - items_by_digest[cd.digest] = item |
| - try: |
| - response = self._stub.Contains(request) |
| - except grpc.RpcError as g: |
| - logging.error('gRPC error during contains: re-throwing as IOError (%s)' |
| - % g) |
| - raise IOError(g) |
| - |
| - # If everything's present, return the empty set. |
| - if response.status.succeeded: |
| - return {} |
| - |
| - if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: |
| - raise IOError('Unknown response during lookup: %s' % response.status) |
| - |
| - # Pick Items that are missing, attach _PushState to them. The gRPC |
| - # implementation doesn't actually have a push state, we just attach |
| - # empty objects to satisfy the StorageApi interface. |
| + # Assume all Items are missing, and attach _PushState to them. The gRPC |
| + # implementation doesn't actually have a push state, we just attach empty |
| + # objects to satisfy the StorageApi interface. |
| missing_items = {} |
| - for missing in response.status.missing_digest: |
| - item = items_by_digest[missing.digest] |
| + for item in items: |
| missing_items[item] = _IsolateServerGrpcPushState() |
| - |
| - logging.info('Queried %d files, %d cache hit', |
| - len(items), len(items) - len(missing_items)) |
| return missing_items |
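
Until a batch existence check lands, the caller-visible contract is that
contains() reports every item as missing and the proxy answers ALREADY_EXISTS
on redundant pushes; a minimal usage sketch, assuming a constructed storage
and prepared items:

  missing = storage.contains(items)
  # {item: _IsolateServerGrpcPushState(), ...} - one entry per input item
  assert set(missing) == set(items)
  for item, push_state in missing.iteritems():
    storage.push(item, push_state)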