OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
7 | 7 |
8 import base64 | 8 import base64 |
9 import collections | 9 import collections |
10 import logging | 10 import logging |
11 import os | 11 import os |
12 import re | 12 import re |
13 import sys | 13 import sys |
14 import threading | 14 import threading |
15 import time | 15 import time |
16 import types | 16 import types |
17 import uuid | 17 import uuid |
18 | 18 |
19 from utils import file_path | 19 from utils import file_path |
20 from utils import net | 20 from utils import net |
21 | 21 |
22 import isolated_format | 22 import isolated_format |
23 | 23 |
24 # gRPC may not be installed on the worker machine. This is fine, as long as | |
25 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
26 # Full external requirements are: grpcio, certifi. | |
27 try: | 24 try: |
28 import grpc | 25 import grpc # for error codes |
29 from google import auth as google_auth | 26 from utils import grpc_proxy |
30 from google.auth.transport import grpc as google_auth_transport_grpc | |
31 from google.auth.transport import requests as google_auth_transport_requests | |
32 from proto import bytestream_pb2 | 27 from proto import bytestream_pb2 |
33 except ImportError as err: | 28 except ImportError as err: |
34 grpc = None | 29 grpc = None |
| 30 grpc_proxy = None |
35 bytestream_pb2 = None | 31 bytestream_pb2 = None |
36 | 32 |
37 # If gRPC is installed, at least give a warning if certifi is not. This is not | |
38 # actually used anywhere in this module, but if certifi is missing, | |
39 # google.auth.transport will fail with | |
40 # https://stackoverflow.com/questions/24973326 | |
41 certifi = None | |
42 if grpc is not None: | |
43 try: | |
44 import certifi | |
45 except ImportError: | |
46 # Could not import certifi; gRPC HTTPS connections may fail. This will be | |
47 # logged in IsolateServerGrpc.__init__, since the logger is not configured | |
48 # during the import time. | |
49 pass | |
50 | 33 |
51 # Chunk size to use when reading from network stream. | 34 # Chunk size to use when reading from network stream. |
52 NET_IO_FILE_CHUNK = 16 * 1024 | 35 NET_IO_FILE_CHUNK = 16 * 1024 |
53 | 36 |
54 | 37 |
55 # Read timeout in seconds for downloads from isolate storage. If there's no | 38 # Read timeout in seconds for downloads from isolate storage. If there's no |
56 # response from the server within this timeout whole download will be aborted. | 39 # response from the server within this timeout whole download will be aborted. |
57 DOWNLOAD_READ_TIMEOUT = 60 | 40 DOWNLOAD_READ_TIMEOUT = 60 |
58 | 41 |
59 | 42 |
(...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
526 class IsolateServerGrpc(StorageApi): | 509 class IsolateServerGrpc(StorageApi): |
527 """StorageApi implementation that downloads and uploads to a gRPC service. | 510 """StorageApi implementation that downloads and uploads to a gRPC service. |
528 | 511 |
529 Limitations: only works for the default-gzip namespace, and with zero offsets | 512 Limitations: only works for the default-gzip namespace, and with zero offsets |
530 while fetching. | 513 while fetching. |
531 """ | 514 """ |
532 | 515 |
533 def __init__(self, server, namespace, proxy): | 516 def __init__(self, server, namespace, proxy): |
534 super(IsolateServerGrpc, self).__init__() | 517 super(IsolateServerGrpc, self).__init__() |
535 logging.info('Using gRPC for Isolate') | 518 logging.info('Using gRPC for Isolate') |
536 if not certifi: | 519 # Proxies only support the default-gzip namespace for now. |
537 logging.warning( | 520 # TODO(aludwin): support other namespaces if necessary |
538 'Could not import certifi; gRPC HTTPS connections may fail') | 521 assert namespace == 'default-gzip' |
539 self._server = server | 522 self._server = server |
540 self._lock = threading.Lock() | 523 self._lock = threading.Lock() |
541 self._memory_use = 0 | 524 self._memory_use = 0 |
542 self._num_pushes = 0 | 525 self._num_pushes = 0 |
543 self._already_exists = 0 | 526 self._already_exists = 0 |
544 | 527 self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub) |
545 # Proxies only support the default-gzip namespace for now. | |
546 # TODO(aludwin): support other namespaces if necessary | |
547 assert namespace == 'default-gzip' | |
548 self._namespace = namespace | 528 self._namespace = namespace |
549 | 529 |
550 # Make sure grpc was successfully imported | |
551 assert grpc | |
552 assert bytestream_pb2 | |
553 | |
554 roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS') | |
555 overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE') | |
556 | |
557 # The "proxy" envvar must be of the form: | |
558 # http[s]://<server>[:port][/prefix] | |
559 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | |
560 if not m: | |
561 raise ValueError(('gRPC proxy must have the form: ' | |
562 'http[s]://<server>[:port][/prefix] ' | |
563 '(given: %s)') % proxy) | |
564 transport = m.group(1) | |
565 host = m.group(2) | |
566 prefix = m.group(3) | |
567 if not prefix.endswith('/'): | |
568 prefix = prefix + '/' | |
569 logging.info('gRPC proxy: transport %s, host %s, prefix %s', | |
570 transport, host, prefix) | |
571 self._prefix = prefix | |
572 | |
573 if transport == 'http': | |
574 self._channel = grpc.insecure_channel(host) | |
575 elif transport == 'https': | |
576 # Using cloud container builder scopes for testing: | |
577 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
578 credentials, _ = google_auth.default(scopes=scopes) | |
579 request = google_auth_transport_requests.Request() | |
580 options = () | |
581 root_certs = None | |
582 if roots is not None: | |
583 logging.info('Using root CA %s', roots) | |
584 with open(roots) as f: | |
585 root_certs = f.read() | |
586 if overd is not None: | |
587 logging.info('Using TLS server override %s', overd) | |
588 options=(('grpc.ssl_target_name_override', overd),) | |
589 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
590 self._channel = google_auth_transport_grpc.secure_authorized_channel( | |
591 credentials, request, host, ssl_creds, options=options) | |
592 else: | |
593 raise ValueError('unknown transport %s (should be http[s])' % transport) | |
594 self._stub = bytestream_pb2.ByteStreamStub(self._channel) | |
595 | 530 |
596 @property | 531 @property |
597 def location(self): | 532 def location(self): |
598 return self._server | 533 return self._server |
599 | 534 |
600 @property | 535 @property |
601 def namespace(self): | 536 def namespace(self): |
602 return self._namespace | 537 return self._namespace |
603 | 538 |
604 @property | 539 @property |
605 def internal_compression(self): | 540 def internal_compression(self): |
606 # gRPC natively compresses all messages before transmission. | 541 # gRPC natively compresses all messages before transmission. |
607 return True | 542 return True |
608 | 543 |
609 def fetch(self, digest, offset=0): | 544 def fetch(self, digest, offset=0): |
610 # The gRPC APIs only work with an offset of 0 | 545 # The gRPC APIs only work with an offset of 0 |
611 assert offset == 0 | 546 assert offset == 0 |
612 request = bytestream_pb2.ReadRequest() | 547 request = bytestream_pb2.ReadRequest() |
613 #TODO(aludwin): send the expected size of the item | 548 #TODO(aludwin): send the expected size of the item |
614 request.resource_name = '%sblobs/%s/0' % ( | 549 request.resource_name = '%s/blobs/%s/0' % ( |
615 self._prefix, digest) | 550 self._proxy.prefix, digest) |
616 try: | 551 try: |
617 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): | 552 for response in self._proxy.get_stream('Read', request): |
618 yield response.data | 553 yield response.data |
619 except grpc.RpcError as g: | 554 except grpc.RpcError as g: |
620 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) | 555 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
621 raise IOError(g) | 556 raise IOError(g) |
622 | 557 |
623 def push(self, item, push_state, content=None): | 558 def push(self, item, push_state, content=None): |
624 assert isinstance(item, Item) | 559 assert isinstance(item, Item) |
625 assert item.digest is not None | 560 assert item.digest is not None |
626 assert item.size is not None | 561 assert item.size is not None |
627 assert isinstance(push_state, _IsolateServerGrpcPushState) | 562 assert isinstance(push_state, _IsolateServerGrpcPushState) |
(...skipping 10 matching lines...) Expand all Loading... |
638 or not isinstance(content, collections.Iterable)): | 573 or not isinstance(content, collections.Iterable)): |
639 yield content | 574 yield content |
640 else: | 575 else: |
641 for chunk in content: | 576 for chunk in content: |
642 yield chunk | 577 yield chunk |
643 def slicer(): | 578 def slicer(): |
644 # Ensures every bit of content is under the gRPC max size; yields | 579 # Ensures every bit of content is under the gRPC max size; yields |
645 # proto messages to send via gRPC. | 580 # proto messages to send via gRPC. |
646 request = bytestream_pb2.WriteRequest() | 581 request = bytestream_pb2.WriteRequest() |
647 u = uuid.uuid4() | 582 u = uuid.uuid4() |
648 request.resource_name = '%suploads/%s/blobs/%s/%d' % ( | 583 request.resource_name = '%s/uploads/%s/blobs/%s/%d' % ( |
649 self._prefix, u, item.digest, item.size) | 584 self._proxy.prefix, u, item.digest, item.size) |
650 request.write_offset = 0 | 585 request.write_offset = 0 |
651 for chunk in chunker(): | 586 for chunk in chunker(): |
652 # Make sure we send at least one chunk for zero-length blobs | 587 # Make sure we send at least one chunk for zero-length blobs |
653 has_sent_anything = False | 588 has_sent_anything = False |
654 while chunk or not has_sent_anything: | 589 while chunk or not has_sent_anything: |
655 has_sent_anything = True | 590 has_sent_anything = True |
656 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
657 request.data = chunk[:slice_len] | 592 request.data = chunk[:slice_len] |
658 if request.write_offset + slice_len == item.size: | 593 if request.write_offset + slice_len == item.size: |
659 request.finish_write = True | 594 request.finish_write = True |
660 yield request | 595 yield request |
661 request.write_offset += slice_len | 596 request.write_offset += slice_len |
662 chunk = chunk[slice_len:] | 597 chunk = chunk[slice_len:] |
663 | 598 |
664 response = None | 599 response = None |
665 try: | 600 try: |
666 response = self._stub.Write(slicer()) | 601 response = self._proxy.call_no_retries('Write', slicer()) |
667 except grpc.RpcError as r: | 602 except grpc.RpcError as r: |
668 if r.code() == grpc.StatusCode.ALREADY_EXISTS: | 603 if r.code() == grpc.StatusCode.ALREADY_EXISTS: |
669 # This is legit - we didn't check before we pushed so no problem if | 604 # This is legit - we didn't check before we pushed so no problem if |
670 # it's already there. | 605 # it's already there. |
671 self._already_exists += 1 | 606 self._already_exists += 1 |
672 if self._already_exists % 100 == 0: | 607 if self._already_exists % 100 == 0: |
673 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( | 608 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( |
674 self._already_exists, self._num_pushes, | 609 self._already_exists, self._num_pushes, |
675 100.0 * self._already_exists / self._num_pushes)) | 610 100.0 * self._already_exists / self._num_pushes)) |
676 else: | 611 else: |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
722 namespace: isolate namespace to operate in, also defines hashing and | 657 namespace: isolate namespace to operate in, also defines hashing and |
723 compression scheme used, i.e. namespace names that end with '-gzip' | 658 compression scheme used, i.e. namespace names that end with '-gzip' |
724 store compressed data. | 659 store compressed data. |
725 | 660 |
726 Returns: | 661 Returns: |
727 Instance of StorageApi subclass. | 662 Instance of StorageApi subclass. |
728 """ | 663 """ |
729 if _grpc_proxy is not None: | 664 if _grpc_proxy is not None: |
730 return IsolateServerGrpc(url, namespace, _grpc_proxy) | 665 return IsolateServerGrpc(url, namespace, _grpc_proxy) |
731 return IsolateServer(url, namespace) | 666 return IsolateServer(url, namespace) |
OLD | NEW |