| Index: client/isolate_storage.py
|
| diff --git a/client/isolate_storage.py b/client/isolate_storage.py
|
| index 63234b0a1295d85e9e700aa057373ff5516f05be..67314c58411b61587780d25a435d2def15048d23 100644
|
| --- a/client/isolate_storage.py
|
| +++ b/client/isolate_storage.py
|
| @@ -9,11 +9,13 @@ import base64
|
| import binascii
|
| import collections
|
| import logging
|
| +import os
|
| import re
|
| import sys
|
| import threading
|
| import time
|
| import types
|
| +import uuid
|
|
|
| from utils import file_path
|
| from utils import net
|
| @@ -22,13 +24,23 @@ import isolated_format
|
|
|
| # gRPC may not be installed on the worker machine. This is fine, as long as
|
| # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
|
| +# Full external requirements are: grpcio, certifi.
|
| try:
|
| import grpc
|
| - from proto import isolate_bot_pb2
|
| -except ImportError:
|
| + from google import auth as google_auth
|
| + from google.auth.transport import grpc as google_auth_transport_grpc
|
| + from google.auth.transport import requests as google_auth_transport_requests
|
| + from proto import bytestream_pb2
|
| +except ImportError:
|
| grpc = None
|
| - isolate_bot_pb2 = None
|
| + bytestream_pb2 = None
|
|
|
| +# If gRPC is installed, at least give a warning if certifi is not.
|
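| +# (Assumption: certifi supplies the CA bundle that the requests-based
|
| +# google-auth transport relies on when refreshing access tokens over HTTPS.)
|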
| +if grpc is not None:
|
| + try:
|
| + import certifi
|
| + except ImportError:
|
| + logging.warning('could not import certifi; gRPC HTTPS connections may fail')
|
|
|
| # Chunk size to use when reading from network stream.
|
| NET_IO_FILE_CHUNK = 16 * 1024
|
| @@ -506,20 +518,61 @@ class IsolateServerGrpc(StorageApi):
|
| def __init__(self, server, namespace):
|
| super(IsolateServerGrpc, self).__init__()
|
| logging.info('Using gRPC for Isolate')
|
| + self._server = server
|
| + self._lock = threading.Lock()
|
| + self._memory_use = 0
|
| + self._num_pushes = 0
|
| + self._already_exists = 0
|
|
|
| # Make sure grpc was successfully imported
|
| assert grpc
|
| - assert isolate_bot_pb2
|
| -
|
| + assert bytestream_pb2
|
| # Proxies only support the default-gzip namespace for now.
|
| - # TODO(aludwin): support other namespaces
|
| + # TODO(aludwin): support other namespaces if necessary
|
| assert namespace == 'default-gzip'
|
| - self._server = server
|
| - self._channel = grpc.insecure_channel(server)
|
| - self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
|
| - self._lock = threading.Lock()
|
| - self._memory_use = 0
|
| - logging.info('...gRPC successfully initialized')
|
| +
|
| + proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
|
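| + # Optional TLS tweaks for testing: a custom CA roots file, and an
|
| + # override for the server name used during certificate validation.
|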
| + roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
|
| + overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')
|
| +
|
| + # The "proxy" envvar must be of the form:
|
| + # http[s]://<server>[:port][/prefix]
|
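| + # e.g. https://grpc-proxy.example.com:8980/prefix (hypothetical value)
|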
| + m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
|
| + if not m:
|
| + raise ValueError(('gRPC proxy must have the form: '
|
| + 'http[s]://<server>[:port][/prefix] '
|
| + '(given: %s)') % proxy)
|
| + transport = m.group(1)
|
| + host = m.group(2)
|
| + prefix = m.group(3)
|
| + if not prefix.endswith('/'):
|
| + prefix = prefix + '/'
|
| + logging.info('gRPC proxy: transport %s, host %s, prefix %s',
|
| + transport, host, prefix)
|
| + self._prefix = prefix
|
| +
|
| + if transport == 'http':
|
| + self._channel = grpc.insecure_channel(host)
|
| + elif transport == 'https':
|
| + # Using cloud container builder scopes for testing:
|
| + scopes = ['https://www.googleapis.com/auth/cloud-build-service']
|
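| + # google_auth.default() returns Application Default Credentials, e.g.
|
| + # from GOOGLE_APPLICATION_CREDENTIALS or the GCE metadata server.
|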
| + credentials, _ = google_auth.default(scopes=scopes)
|
| + request = google_auth_transport_requests.Request()
|
| + options = ()
|
| + root_certs = None
|
| + if roots is not None:
|
| + logging.info('Using root CA %s', roots)
|
| + with open(roots) as f:
|
| + root_certs = f.read()
|
| + if overd is not None:
|
| + logging.info('Using TLS server override %s', overd)
|
| + options = (('grpc.ssl_target_name_override', overd),)
|
| + ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
|
| + self._channel = google_auth_transport_grpc.secure_authorized_channel(
|
| + credentials, request, host, ssl_creds, options=options)
|
| + else:
|
| + raise ValueError('unknown transport %s (should be http[s])' % transport)
|
| + self._stub = bytestream_pb2.ByteStreamStub(self._channel)
|
|
|
| @property
|
| def location(self):
|
| @@ -536,24 +589,13 @@ class IsolateServerGrpc(StorageApi):
|
| def fetch(self, digest, offset=0):
|
| # The gRPC APIs only work with an offset of 0
|
| assert offset == 0
|
| - request = isolate_bot_pb2.FetchBlobsRequest()
|
| - req_digest = request.digest.add()
|
| - # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte
|
| - # array (like [0x01, 0x2a, 0xbc]).
|
| - req_digest.digest = binascii.unhexlify(digest)
|
| - expected_offset = 0
|
| + request = bytestream_pb2.ReadRequest()
|
| + # TODO(aludwin): send the expected size of the item
|
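| + # ByteStream reads address blobs as '{prefix}blobs/{digest}/{size}';
|
| + # the size isn't known here, so send 0 (assumed to mean "unknown" to
|
| + # the proxy).
|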
| + request.resource_name = '%sblobs/%s/0' % (self._prefix, digest)
|
| try:
|
| - for response in self._stub.FetchBlobs(request,
|
| - timeout=DOWNLOAD_READ_TIMEOUT):
|
| - if not response.status.succeeded:
|
| - raise IOError(
|
| - 'Error while fetching %s: %s' % (digest, response.status))
|
| - if not expected_offset == response.data.offset:
|
| - raise IOError(
|
| - 'Error while fetching %s: expected offset %d, got %d' % (
|
| - digest, expected_offset, response.data.offset))
|
| - expected_offset += len(response.data.data)
|
| - yield response.data.data
|
| + for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
|
| + yield response.data
|
| except grpc.RpcError as g:
|
| logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
|
| raise IOError(g)
|
| @@ -567,6 +609,7 @@ class IsolateServerGrpc(StorageApi):
|
| # Default to item.content().
|
| content = item.content() if content is None else content
|
| guard_memory_use(self, content, item.size)
|
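| + # Count pushes so the ALREADY_EXISTS handler below can report the
|
| + # proxy's cache hit-rate.
|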
| + self._num_pushes += 1
|
|
|
| try:
|
| def chunker():
|
| @@ -580,34 +623,48 @@ class IsolateServerGrpc(StorageApi):
|
| def slicer():
|
| # Ensures every bit of content is under the gRPC max size; yields
|
| # proto messages to send via gRPC.
|
| - request = isolate_bot_pb2.PushBlobsRequest()
|
| - request.data.digest.digest = binascii.unhexlify(item.digest)
|
| - request.data.digest.size_bytes = item.size
|
| - request.data.offset = 0
|
| + request = bytestream_pb2.WriteRequest()
|
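| + # Uploads use a resource name of the form
|
| + # '{prefix}uploads/{uuid}/blobs/{digest}/{size}'; the random uuid
|
| + # distinguishes concurrent uploads of the same blob.
|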
| + u = uuid.uuid4()
|
| + request.resource_name = '%suploads/%s/blobs/%s/%d' % (
|
| + self._prefix, u, item.digest, item.size)
|
| + request.write_offset = 0
|
| for chunk in chunker():
|
| # Make sure we send at least one chunk for zero-length blobs
|
| has_sent_anything = False
|
| while chunk or not has_sent_anything:
|
| + has_sent_anything = True
|
| slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
|
| - request.data.data = chunk[:slice_len]
|
| + request.data = chunk[:slice_len]
|
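| + # Mark the request that carries the final byte so the proxy can
|
| + # commit the write.
|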
| + if request.write_offset + slice_len == item.size:
|
| + request.finish_write = True
|
| yield request
|
| - has_sent_anything = True
|
| - request.data.offset += slice_len
|
| - # The proxy only expects the first chunk to have the digest
|
| - request.data.ClearField("digest")
|
| + request.write_offset += slice_len
|
| chunk = chunk[slice_len:]
|
|
|
| - # TODO(aludwin): batch up several requests to reuse TCP connections
|
| try:
|
| - response = self._stub.PushBlobs(slicer())
|
| - except grpc.RpcError as g:
|
| - logging.error('gRPC error during push: re-throwing as IOError (%s)' % g)
|
| - raise IOError(g)
|
| + response = self._stub.Write(slicer())
|
| + except grpc.Call as c:
|
| + # You might think that errors from gRPC would be grpc.RpcError. You'd
|
| + # be... right... but it's *also* an instance of grpc.Call, and that's
|
| + # where the status code actually lives.
|
| + if c.code() == grpc.StatusCode.ALREADY_EXISTS:
|
| + # This is legit - we didn't check before we pushed so no problem if
|
| + # it's already there.
|
| + self._already_exists += 1
|
| + if self._already_exists % 100 == 0:
|
| + logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
|
| + self._already_exists, self._num_pushes,
|
| + 100.0 * self._already_exists / self._num_pushes))
|
| + # No response was assigned, so return early rather than fall through
|
| + # to the committed_size check below (which would raise NameError).
|
| + return
|
| + else:
|
| + logging.error('gRPC error during push: throwing as IOError (%s)' % c)
|
| + raise IOError(c)
|
| + except Exception as e:
|
| + logging.error('error during push: throwing as IOError (%s)' % e)
|
| + raise IOError(e)
|
|
|
| - if not response.status.succeeded:
|
| - raise IOError(
|
| - 'Error while uploading %s: %s' % (
|
| - item.digest, response.status.error_detail))
|
| + if response.committed_size != item.size:
|
| + raise IOError('%s/%d: incorrect size written (%d)' % (
|
| + item.digest, item.size, response.committed_size))
|
|
|
| finally:
|
| with self._lock:
|
| @@ -615,38 +672,16 @@ class IsolateServerGrpc(StorageApi):
|
|
|
| def contains(self, items):
|
| """Returns the set of all missing items."""
|
| + # TODO(aludwin): this isn't supported directly in Bytestream, so for now
|
| + # assume that nothing is present in the cache.
|
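| + # Everything therefore gets re-pushed; the proxy answers ALREADY_EXISTS
|
| + # for blobs it already has (see push() above), so this costs bandwidth,
|
| + # not correctness.
|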
| # Ensure all items were initialized with 'prepare' call. Storage does that.
|
| assert all(i.digest is not None and i.size is not None for i in items)
|
| - request = isolate_bot_pb2.ContainsRequest()
|
| - items_by_digest = {}
|
| - for item in items:
|
| - cd = request.digest.add()
|
| - cd.digest = binascii.unhexlify(item.digest)
|
| - items_by_digest[cd.digest] = item
|
| - try:
|
| - response = self._stub.Contains(request)
|
| - except grpc.RpcError as g:
|
| - logging.error('gRPC error during contains: re-throwing as IOError (%s)'
|
| - % g)
|
| - raise IOError(g)
|
| -
|
| - # If everything's present, return the empty set.
|
| - if response.status.succeeded:
|
| - return {}
|
| -
|
| - if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
|
| - raise IOError('Unknown response during lookup: %s' % response.status)
|
| -
|
| - # Pick Items that are missing, attach _PushState to them. The gRPC
|
| - # implementation doesn't actually have a push state, we just attach
|
| - # empty objects to satisfy the StorageApi interface.
|
| + # Assume all Items are missing, and attach _PushState to them. The gRPC
|
| + # implementation doesn't actually have a push state; we just attach empty
|
| + # objects to satisfy the StorageApi interface.
|
| missing_items = {}
|
| - for missing in response.status.missing_digest:
|
| - item = items_by_digest[missing.digest]
|
| + for item in items:
|
| missing_items[item] = _IsolateServerGrpcPushState()
|
| -
|
| - logging.info('Queried %d files, %d cache hit',
|
| - len(items), len(items) - len(missing_items))
|
| return missing_items
|
|
|
|
|
|
|