| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
| 7 | 7 |
| 8 import base64 | 8 import base64 |
| 9 import collections | 9 import collections |
| 10 import logging | 10 import logging |
| 11 import os | 11 import os |
| 12 import re | 12 import re |
| 13 import sys | 13 import sys |
| 14 import threading | 14 import threading |
| 15 import time | 15 import time |
| 16 import types | 16 import types |
| 17 import uuid | 17 import uuid |
| 18 | 18 |
| 19 from utils import file_path | 19 from utils import file_path |
| 20 from utils import net | 20 from utils import net |
| 21 | 21 |
| 22 import isolated_format | 22 import isolated_format |
| 23 | 23 |
| 24 # gRPC may not be installed on the worker machine. This is fine, as long as | |
| 25 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
| 26 # Full external requirements are: grpcio, certifi. | |
| 27 try: | 24 try: |
| 28 import grpc | 25 import grpc # for error codes |
| 29 from google import auth as google_auth | 26 from utils import grpc_proxy |
| 30 from google.auth.transport import grpc as google_auth_transport_grpc | |
| 31 from google.auth.transport import requests as google_auth_transport_requests | |
| 32 from proto import bytestream_pb2 | 27 from proto import bytestream_pb2 |
| 33 except ImportError as err: | 28 except ImportError as err: |
| 34 grpc = None | 29 grpc = None |
| | 30 grpc_proxy = None |
| 35 bytestream_pb2 = None | 31 bytestream_pb2 = None |
| 36 | 32 |
| 37 # If gRPC is installed, at least give a warning if certifi is not. This is not | |
| 38 # actually used anywhere in this module, but if certifi is missing, | |
| 39 # google.auth.transport will fail with | |
| 40 # https://stackoverflow.com/questions/24973326 | |
| 41 certifi = None | |
| 42 if grpc is not None: | |
| 43 try: | |
| 44 import certifi | |
| 45 except ImportError: | |
| 46 # Could not import certifi; gRPC HTTPS connections may fail. This will be | |
| 47 # logged in IsolateServerGrpc.__init__, since the logger is not configured | |
| 48 # during the import time. | |
| 49 pass | |
| 50 | 33 |
| 51 # Chunk size to use when reading from network stream. | 34 # Chunk size to use when reading from network stream. |
| 52 NET_IO_FILE_CHUNK = 16 * 1024 | 35 NET_IO_FILE_CHUNK = 16 * 1024 |
| 53 | 36 |
| 54 | 37 |
| 55 # Read timeout in seconds for downloads from isolate storage. If there's no | 38 # Read timeout in seconds for downloads from isolate storage. If there's no |
| 56 # response from the server within this timeout, the whole download will be aborted. | 39 # response from the server within this timeout, the whole download will be aborted. |
| 57 DOWNLOAD_READ_TIMEOUT = 60 | 40 DOWNLOAD_READ_TIMEOUT = 60 |
| 58 | 41 |
| 59 | 42 |
| (...skipping 466 matching lines...) |
| 526 class IsolateServerGrpc(StorageApi): | 509 class IsolateServerGrpc(StorageApi): |
| 527 """StorageApi implementation that downloads and uploads to a gRPC service. | 510 """StorageApi implementation that downloads and uploads to a gRPC service. |
| 528 | 511 |
| 529 Limitations: only works for the default-gzip namespace, and with zero offsets | 512 Limitations: only works for the default-gzip namespace, and with zero offsets |
| 530 while fetching. | 513 while fetching. |
| 531 """ | 514 """ |
| 532 | 515 |
| 533 def __init__(self, server, namespace, proxy): | 516 def __init__(self, server, namespace, proxy): |
| 534 super(IsolateServerGrpc, self).__init__() | 517 super(IsolateServerGrpc, self).__init__() |
| 535 logging.info('Using gRPC for Isolate') | 518 logging.info('Using gRPC for Isolate') |
| 536 if not certifi: | 519 # Proxies only support the default-gzip namespace for now. |
| 537 logging.warning( | 520 # TODO(aludwin): support other namespaces if necessary |
| 538 'Could not import certifi; gRPC HTTPS connections may fail') | 521 assert namespace == 'default-gzip' |
| 539 self._server = server | 522 self._server = server |
| 540 self._lock = threading.Lock() | 523 self._lock = threading.Lock() |
| 541 self._memory_use = 0 | 524 self._memory_use = 0 |
| 542 self._num_pushes = 0 | 525 self._num_pushes = 0 |
| 543 self._already_exists = 0 | 526 self._already_exists = 0 |
| 544 | 527 self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub) |
| 545 # Proxies only support the default-gzip namespace for now. | |
| 546 # TODO(aludwin): support other namespaces if necessary | |
| 547 assert namespace == 'default-gzip' | |
| 548 self._namespace = namespace | 528 self._namespace = namespace |
| 549 | 529 |
| 550 # Make sure grpc was successfully imported | |
| 551 assert grpc | |
| 552 assert bytestream_pb2 | |
| 553 | |
| 554 roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS') | |
| 555 overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE') | |
| 556 | |
| 557 # The "proxy" envvar must be of the form: | |
| 558 # http[s]://<server>[:port][/prefix] | |
| 559 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | |
| 560 if not m: | |
| 561 raise ValueError(('gRPC proxy must have the form: ' | |
| 562 'http[s]://<server>[:port][/prefix] ' | |
| 563 '(given: %s)') % proxy) | |
| 564 transport = m.group(1) | |
| 565 host = m.group(2) | |
| 566 prefix = m.group(3) | |
| 567 if not prefix.endswith('/'): | |
| 568 prefix = prefix + '/' | |
| 569 logging.info('gRPC proxy: transport %s, host %s, prefix %s', | |
| 570 transport, host, prefix) | |
| 571 self._prefix = prefix | |
| 572 | |
| 573 if transport == 'http': | |
| 574 self._channel = grpc.insecure_channel(host) | |
| 575 elif transport == 'https': | |
| 576 # Using cloud container builder scopes for testing: | |
| 577 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
| 578 credentials, _ = google_auth.default(scopes=scopes) | |
| 579 request = google_auth_transport_requests.Request() | |
| 580 options = () | |
| 581 root_certs = None | |
| 582 if roots is not None: | |
| 583 logging.info('Using root CA %s', roots) | |
| 584 with open(roots) as f: | |
| 585 root_certs = f.read() | |
| 586 if overd is not None: | |
| 587 logging.info('Using TLS server override %s', overd) | |
| 588 options=(('grpc.ssl_target_name_override', overd),) | |
| 589 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
| 590 self._channel = google_auth_transport_grpc.secure_authorized_channel( | |
| 591 credentials, request, host, ssl_creds, options=options) | |
| 592 else: | |
| 593 raise ValueError('unknown transport %s (should be http[s])' % transport) | |
| 594 self._stub = bytestream_pb2.ByteStreamStub(self._channel) | |
| 595 | 530 |
| 596 @property | 531 @property |
| 597 def location(self): | 532 def location(self): |
| 598 return self._server | 533 return self._server |
| 599 | 534 |
| 600 @property | 535 @property |
| 601 def namespace(self): | 536 def namespace(self): |
| 602 return self._namespace | 537 return self._namespace |
| 603 | 538 |
| 604 @property | 539 @property |
| 605 def internal_compression(self): | 540 def internal_compression(self): |
| 606 # gRPC natively compresses all messages before transmission. | 541 # gRPC natively compresses all messages before transmission. |
| 607 return True | 542 return True |
| 608 | 543 |
| 609 def fetch(self, digest, offset=0): | 544 def fetch(self, digest, offset=0): |
| 610 # The gRPC APIs only work with an offset of 0 | 545 # The gRPC APIs only work with an offset of 0 |
| 611 assert offset == 0 | 546 assert offset == 0 |
| 612 request = bytestream_pb2.ReadRequest() | 547 request = bytestream_pb2.ReadRequest() |
| 613 #TODO(aludwin): send the expected size of the item | 548 #TODO(aludwin): send the expected size of the item |
| 614 request.resource_name = '%sblobs/%s/0' % ( | 549 request.resource_name = '%s/blobs/%s/0' % ( |
| 615 self._prefix, digest) | 550 self._proxy.prefix, digest) |
| 616 try: | 551 try: |
| 617 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): | 552 for response in self._proxy.get_stream('Read', request): |
| 618 yield response.data | 553 yield response.data |
| 619 except grpc.RpcError as g: | 554 except grpc.RpcError as g: |
| 620 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) | 555 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
| 621 raise IOError(g) | 556 raise IOError(g) |
| 622 | 557 |
| 623 def push(self, item, push_state, content=None): | 558 def push(self, item, push_state, content=None): |
| 624 assert isinstance(item, Item) | 559 assert isinstance(item, Item) |
| 625 assert item.digest is not None | 560 assert item.digest is not None |
| 626 assert item.size is not None | 561 assert item.size is not None |
| 627 assert isinstance(push_state, _IsolateServerGrpcPushState) | 562 assert isinstance(push_state, _IsolateServerGrpcPushState) |
| (...skipping 10 matching lines...) |
| 638 or not isinstance(content, collections.Iterable)): | 573 or not isinstance(content, collections.Iterable)): |
| 639 yield content | 574 yield content |
| 640 else: | 575 else: |
| 641 for chunk in content: | 576 for chunk in content: |
| 642 yield chunk | 577 yield chunk |
| 643 def slicer(): | 578 def slicer(): |
| 644 # Ensures every bit of content is under the gRPC max size; yields | 579 # Ensures every bit of content is under the gRPC max size; yields |
| 645 # proto messages to send via gRPC. | 580 # proto messages to send via gRPC. |
| 646 request = bytestream_pb2.WriteRequest() | 581 request = bytestream_pb2.WriteRequest() |
| 647 u = uuid.uuid4() | 582 u = uuid.uuid4() |
| 648 request.resource_name = '%suploads/%s/blobs/%s/%d' % ( | 583 request.resource_name = '%s/uploads/%s/blobs/%s/%d' % ( |
| 649 self._prefix, u, item.digest, item.size) | 584 self._proxy.prefix, u, item.digest, item.size) |
| 650 request.write_offset = 0 | 585 request.write_offset = 0 |
| 651 for chunk in chunker(): | 586 for chunk in chunker(): |
| 652 # Make sure we send at least one chunk for zero-length blobs | 587 # Make sure we send at least one chunk for zero-length blobs |
| 653 has_sent_anything = False | 588 has_sent_anything = False |
| 654 while chunk or not has_sent_anything: | 589 while chunk or not has_sent_anything: |
| 655 has_sent_anything = True | 590 has_sent_anything = True |
| 656 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
| 657 request.data = chunk[:slice_len] | 592 request.data = chunk[:slice_len] |
| 658 if request.write_offset + slice_len == item.size: | 593 if request.write_offset + slice_len == item.size: |
| 659 request.finish_write = True | 594 request.finish_write = True |
| 660 yield request | 595 yield request |
| 661 request.write_offset += slice_len | 596 request.write_offset += slice_len |
| 662 chunk = chunk[slice_len:] | 597 chunk = chunk[slice_len:] |
| 663 | 598 |
| 664 response = None | 599 response = None |
| 665 try: | 600 try: |
| 666 response = self._stub.Write(slicer()) | 601 response = self._proxy.call_no_retries('Write', slicer()) |
| 667 except grpc.RpcError as r: | 602 except grpc.RpcError as r: |
| 668 if r.code() == grpc.StatusCode.ALREADY_EXISTS: | 603 if r.code() == grpc.StatusCode.ALREADY_EXISTS: |
| 669 # This is legit - we didn't check before we pushed so no problem if | 604 # This is legit - we didn't check before we pushed so no problem if |
| 670 # it's already there. | 605 # it's already there. |
| 671 self._already_exists += 1 | 606 self._already_exists += 1 |
| 672 if self._already_exists % 100 == 0: | 607 if self._already_exists % 100 == 0: |
| 673 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( | 608 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( |
| 674 self._already_exists, self._num_pushes, | 609 self._already_exists, self._num_pushes, |
| 675 100.0 * self._already_exists / self._num_pushes)) | 610 100.0 * self._already_exists / self._num_pushes)) |
| 676 else: | 611 else: |
| (...skipping 45 matching lines...) |
| 722 namespace: isolate namespace to operate in, also defines hashing and | 657 namespace: isolate namespace to operate in, also defines hashing and |
| 723 compression scheme used, i.e. namespace names that end with '-gzip' | 658 compression scheme used, i.e. namespace names that end with '-gzip' |
| 724 store compressed data. | 659 store compressed data. |
| 725 | 660 |
| 726 Returns: | 661 Returns: |
| 727 Instance of StorageApi subclass. | 662 Instance of StorageApi subclass. |
| 728 """ | 663 """ |
| 729 if _grpc_proxy is not None: | 664 if _grpc_proxy is not None: |
| 730 return IsolateServerGrpc(url, namespace, _grpc_proxy) | 665 return IsolateServerGrpc(url, namespace, _grpc_proxy) |
| 731 return IsolateServer(url, namespace) | 666 return IsolateServer(url, namespace) |
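
A minimal usage sketch (not part of this CL) of how the proxy-backed path introduced here might be driven, based only on the calls visible in this diff. The module import name and both URLs are placeholder assumptions; internally the constructor wraps the proxy URL in grpc_proxy.Proxy(..., bytestream_pb2.ByteStreamStub) and fetch() streams chunks via proxy.get_stream('Read', ...):

  import isolate_storage  # assumed import name for the module in this diff

  # Placeholder endpoints; the gRPC proxy path only supports the
  # 'default-gzip' namespace and zero-offset reads, as asserted above.
  server = 'https://isolate.example.appspot.com'
  proxy = 'https://grpc-proxy.example.com/prefix'

  storage = isolate_storage.IsolateServerGrpc(server, 'default-gzip', proxy)

  # fetch() is a generator yielding raw data chunks from the ByteStream Read RPC.
  chunks = []
  for data in storage.fetch('deadbeef' * 5, offset=0):
    chunks.append(data)
  blob = ''.join(chunks)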