OLD | NEW |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
7 | 7 |
8 import base64 | 8 import base64 |
9 import binascii | 9 import binascii |
10 import collections | 10 import collections |
11 import logging | 11 import logging |
12 import os | |
12 import re | 13 import re |
13 import sys | 14 import sys |
14 import threading | 15 import threading |
15 import time | 16 import time |
16 import types | 17 import types |
18 import uuid | |
17 | 19 |
18 from utils import file_path | 20 from utils import file_path |
19 from utils import net | 21 from utils import net |
20 | 22 |
21 import isolated_format | 23 import isolated_format |
22 | 24 |
23 # gRPC may not be installed on the worker machine. This is fine, as long as | 25 # gRPC may not be installed on the worker machine. This is fine, as long as |
24 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | 26 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
27 # Full external requirements are: grpcio, certifi. | |
25 try: | 28 try: |
26 import grpc | 29 import grpc |
27 from proto import isolate_bot_pb2 | 30 from google import auth as google_auth |
28 except ImportError: | 31 from google.auth.transport import grpc as google_auth_transport_grpc |
32 from google.auth.transport import requests as google_auth_transport_requests | |
33 from proto import bytestream_pb2 | |
34 except ImportError as err: | |
29 grpc = None | 35 grpc = None |
30 isolate_bot_pb2 = None | 36 bytestream_pb2 = None |
31 | 37 |
38 # If gRPC is installed, at least give a warning if certifi is not. | |
39 if grpc is not None: | |
40 try: | |
41 import certifi | |
Vadim Sh.
2017/06/27 19:12:49
where is this used?
aludwin
2017/06/27 19:23:06
It's conditionally imported by google.auth.transport
Vadim Sh.
2017/06/27 19:27:09
Yeah, leave a comment.
aludwin
2017/06/27 20:10:17
Done.
| |
42 except ImportError as err: | |
43 logging.warning('could not import certifi; gRPC HTTPS connections may fail') | |
32 | 44 |
33 # Chunk size to use when reading from network stream. | 45 # Chunk size to use when reading from network stream. |
34 NET_IO_FILE_CHUNK = 16 * 1024 | 46 NET_IO_FILE_CHUNK = 16 * 1024 |
35 | 47 |
36 | 48 |
37 # Read timeout in seconds for downloads from isolate storage. If there's no | 49 # Read timeout in seconds for downloads from isolate storage. If there's no |
38 # response from the server within this timeout whole download will be aborted. | 50 # response from the server within this timeout whole download will be aborted. |
39 DOWNLOAD_READ_TIMEOUT = 60 | 51 DOWNLOAD_READ_TIMEOUT = 60 |
40 | 52 |
41 | 53 |
(...skipping 457 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
499 class IsolateServerGrpc(StorageApi): | 511 class IsolateServerGrpc(StorageApi): |
500 """StorageApi implementation that downloads and uploads to a gRPC service. | 512 """StorageApi implementation that downloads and uploads to a gRPC service. |
501 | 513 |
502 Limitations: only works for the default-gzip namespace, and with zero offsets | 514 Limitations: only works for the default-gzip namespace, and with zero offsets |
503 while fetching. | 515 while fetching. |
504 """ | 516 """ |
505 | 517 |
506 def __init__(self, server, namespace): | 518 def __init__(self, server, namespace): |
507 super(IsolateServerGrpc, self).__init__() | 519 super(IsolateServerGrpc, self).__init__() |
508 logging.info('Using gRPC for Isolate') | 520 logging.info('Using gRPC for Isolate') |
521 self._server = server | |
522 self._lock = threading.Lock() | |
523 self._memory_use = 0 | |
524 self._num_pushes = 0 | |
525 self._already_exists = 0 | |
509 | 526 |
510 # Make sure grpc was successfully imported | 527 # Make sure grpc was successfully imported |
511 assert grpc | 528 assert grpc |
512 assert isolate_bot_pb2 | 529 assert bytestream_pb2 |
530 # Proxies only support the default-gzip namespace for now. | |
531 # TODO(aludwin): support other namespaces if necessary | |
532 assert namespace == 'default-gzip' | |
513 | 533 |
514 # Proxies only support the default-gzip namespace for now. | 534 proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') |
515 # TODO(aludwin): support other namespaces | 535 roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') |
516 assert namespace == 'default-gzip' | 536 overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') |
517 self._server = server | 537 |
518 self._channel = grpc.insecure_channel(server) | 538 # The "proxy" envvar must be of the form: |
519 self._stub = isolate_bot_pb2.FileServiceStub(self._channel) | 539 # http[s]://<server>[:port][/prefix] |
520 self._lock = threading.Lock() | 540 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) |
521 self._memory_use = 0 | 541 if not m: |
522 logging.info('...gRPC successfully initialized') | 542 raise ValueError(('gRPC proxy must have the form: ' |
543 'http[s]://<server>[:port][/prefix] ' | |
544 '(given: %s)') % proxy) | |
545 transport = m.group(1) | |
546 host = m.group(2) | |
547 prefix = m.group(3) | |
548 if not prefix.endswith('/'): | |
549 prefix = prefix + '/' | |
550 logging.info('gRPC proxy: transport %s, host %s, prefix %s', | |
551 transport, host, prefix) | |
552 self._prefix = prefix | |
553 | |
554 if transport == 'http': | |
555 self._channel = grpc.insecure_channel(host) | |
556 elif transport == 'https': | |
557 # Using cloud container builder scopes for testing: | |
558 scopes = ('https://www.googleapis.com/auth/cloud-build-service',) | |
559 credentials, _ = google_auth.default(scopes=scopes) | |
Vadim Sh.
2017/06/27 19:12:49
If you are requiring configured Application Defaul
| |
560 request = google_auth_transport_requests.Request() | |
561 options = () | |
562 root_certs = None | |
563 if roots is not None: | |
564 logging.info('Using root CA %s', roots) | |
565 with open(roots) as f: | |
566 root_certs = f.read() | |
567 if overd is not None: | |
568 logging.info('Using TLS server override %s', overd) | |
569 options=(('grpc.ssl_target_name_override', overd),) | |
570 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
571 self._channel = google_auth_transport_grpc.secure_authorized_channel( | |
572 credentials, request, host, ssl_creds, options=options) | |
573 else: | |
574 raise ValueError('unknown transport %s (should be http[s])' % transport) | |
575 self._stub = bytestream_pb2.ByteStreamStub(self._channel) | |
Vadim Sh.
2017/06/27 19:12:49
is this object thread safe? 'fetch' and 'push' will
aludwin
2017/06/27 20:10:17
Yup: https://github.com/grpc/grpc/issues/9320
| |
523 | 576 |
524 @property | 577 @property |
525 def location(self): | 578 def location(self): |
526 return self._server | 579 return self._server |
527 | 580 |
528 @property | 581 @property |
529 def namespace(self): | 582 def namespace(self): |
530 # This is used to determine if the data is compressed, but gRPC proxies | 583 # This is used to determine if the data is compressed, but gRPC proxies |
531 # don't have concepts of 'namespaces' and natively compress all messages | 584 # don't have concepts of 'namespaces' and natively compress all messages |
532 # before transmission. So return an unlikely-to-be-used name so that | 585 # before transmission. So return an unlikely-to-be-used name so that |
533 # isolateserver doesn't try to compress anything. | 586 # isolateserver doesn't try to compress anything. |
534 return 'grpc-proxy' | 587 return 'grpc-proxy' |
535 | 588 |
536 def fetch(self, digest, offset=0): | 589 def fetch(self, digest, offset=0): |
537 # The gRPC APIs only work with an offset of 0 | 590 # The gRPC APIs only work with an offset of 0 |
538 assert offset == 0 | 591 assert offset == 0 |
539 request = isolate_bot_pb2.FetchBlobsRequest() | 592 request = bytestream_pb2.ReadRequest() |
540 req_digest = request.digest.add() | 593 #TODO(aludwin): send the expected size of the item |
541 # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte |
542 # array (like [0x01, 0x2a, 0xbc]). | 595 self._prefix, digest) |
543 req_digest.digest = binascii.unhexlify(digest) | |
544 expected_offset = 0 | |
545 try: | 596 try: |
546 for response in self._stub.FetchBlobs(request, | 597 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): |
547 timeout=DOWNLOAD_READ_TIMEOUT): | 598 yield response.data |
548 if not response.status.succeeded: | |
549 raise IOError( | |
550 'Error while fetching %s: %s' % (digest, response.status)) | |
551 if not expected_offset == response.data.offset: | |
552 raise IOError( | |
553 'Error while fetching %s: expected offset %d, got %d' % ( | |
554 digest, expected_offset, response.data.offset)) | |
555 expected_offset += len(response.data.data) | |
556 yield response.data.data | |
557 except grpc.RpcError as g: | 599 except grpc.RpcError as g: |
558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) | 600 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
559 raise IOError(g) | 601 raise IOError(g) |
560 | 602 |
561 def push(self, item, push_state, content=None): | 603 def push(self, item, push_state, content=None): |
562 assert isinstance(item, Item) | 604 assert isinstance(item, Item) |
563 assert item.digest is not None | 605 assert item.digest is not None |
564 assert item.size is not None | 606 assert item.size is not None |
565 assert isinstance(push_state, _IsolateServerGrpcPushState) | 607 assert isinstance(push_state, _IsolateServerGrpcPushState) |
566 | 608 |
567 # Default to item.content(). | 609 # Default to item.content(). |
568 content = item.content() if content is None else content | 610 content = item.content() if content is None else content |
569 guard_memory_use(self, content, item.size) | 611 guard_memory_use(self, content, item.size) |
612 self._num_pushes += 1 | |
570 | 613 |
571 try: | 614 try: |
572 def chunker(): | 615 def chunker(): |
573 # Returns one bit of content at a time | 616 # Returns one bit of content at a time |
574 if (isinstance(content, str) | 617 if (isinstance(content, str) |
575 or not isinstance(content, collections.Iterable)): | 618 or not isinstance(content, collections.Iterable)): |
576 yield content | 619 yield content |
577 else: | 620 else: |
578 for chunk in content: | 621 for chunk in content: |
579 yield chunk | 622 yield chunk |
580 def slicer(): | 623 def slicer(): |
581 # Ensures every bit of content is under the gRPC max size; yields | 624 # Ensures every bit of content is under the gRPC max size; yields |
582 # proto messages to send via gRPC. | 625 # proto messages to send via gRPC. |
583 request = isolate_bot_pb2.PushBlobsRequest() | 626 request = bytestream_pb2.WriteRequest() |
584 request.data.digest.digest = binascii.unhexlify(item.digest) | 627 u = uuid.uuid4() |
585 request.data.digest.size_bytes = item.size | 628 request.resource_name = '%suploads/%s/blobs/%s/%d' % ( |
586 request.data.offset = 0 | 629 self._prefix, u, item.digest, item.size) |
630 request.write_offset = 0 | |
587 for chunk in chunker(): | 631 for chunk in chunker(): |
588 # Make sure we send at least one chunk for zero-length blobs | 632 # Make sure we send at least one chunk for zero-length blobs |
589 has_sent_anything = False | 633 has_sent_anything = False |
590 while chunk or not has_sent_anything: | 634 while chunk or not has_sent_anything: |
635 has_sent_anything = True | |
591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 636 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
592 request.data.data = chunk[:slice_len] | 637 request.data = chunk[:slice_len] |
638 if request.write_offset + slice_len == item.size: | |
639 request.finish_write = True | |
593 yield request | 640 yield request |
594 has_sent_anything = True | 641 request.write_offset += slice_len |
595 request.data.offset += slice_len | |
596 # The proxy only expects the first chunk to have the digest | |
597 request.data.ClearField("digest") | |
598 chunk = chunk[slice_len:] | 642 chunk = chunk[slice_len:] |
599 | 643 |
600 # TODO(aludwin): batch up several requests to reuse TCP connections | |
601 try: | 644 try: |
602 response = self._stub.PushBlobs(slicer()) | 645 response = self._stub.Write(slicer()) |
603 except grpc.RpcError as g: | 646 except grpc.Call as c: |
604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) | 647 # You might think that errors from gRPC would be rpc.RpcError. You'd |
605 raise IOError(g) | 648 # be... right... but it's *also* an instance of grpc.Call, and that's |
649 # where the status code actually lives. | |
650 if c.code() == grpc.StatusCode.ALREADY_EXISTS: | |
651 # This is legit - we didn't check before we pushed so no problem if | |
652 # it's already there. | |
653 self._already_exists += 1 | |
654 if self._already_exists % 100 == 0: | |
655 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( | |
656 self._already_exists, self._num_pushes, | |
657 100.0 * self._already_exists / self._num_pushes)) | |
658 else: | |
659 logging.error('gRPC error during push: throwing as IOError (%s)' % c) | |
660 raise IOError(c) | |
661 except Exception as e: | |
662 logging.error('error during push: throwing as IOError (%s)' % e) | |
Vadim Sh.
2017/06/27 19:12:49
nit: use 2 spaces for indentation
Vadim Sh.
2017/06/27 21:14:29
Ping. It is 4 spaces now.
aludwin
2017/06/28 13:23:53
Done.
| |
663 raise IOError(e) | |
606 | 664 |
607 if not response.status.succeeded: | 665 if response.committed_size != item.size: |
608 raise IOError( | 666 raise IOError('%s/%d: incorrect size written (%d)' % ( |
609 'Error while uploading %s: %s' % ( | 667 item.digest, item.size, response.committed_size)) |
610 item.digest, response.status.error_detail)) | |
611 | 668 |
612 finally: | 669 finally: |
613 with self._lock: | 670 with self._lock: |
614 self._memory_use -= item.size | 671 self._memory_use -= item.size |
615 | 672 |
616 def contains(self, items): | 673 def contains(self, items): |
617 """Returns the set of all missing items.""" | 674 """Returns the set of all missing items.""" |
675 # TODO(aludwin): this isn't supported directly in Bytestream, so for now | |
676 # assume that nothing is present in the cache. | |
618 # Ensure all items were initialized with 'prepare' call. Storage does that. | 677 # Ensure all items were initialized with 'prepare' call. Storage does that. |
619 assert all(i.digest is not None and i.size is not None for i in items) | 678 assert all(i.digest is not None and i.size is not None for i in items) |
620 request = isolate_bot_pb2.ContainsRequest() | 679 # Assume all Items are missing, and attach _PushState to them. The gRPC |
621 items_by_digest = {} | 680 # implementation doesn't actually have a push state, we just attach empty |
681 # objects to satisfy the StorageApi interface. | |
682 missing_items = {} | |
622 for item in items: | 683 for item in items: |
623 cd = request.digest.add() | |
624 cd.digest = binascii.unhexlify(item.digest) | |
625 items_by_digest[cd.digest] = item | |
626 try: | |
627 response = self._stub.Contains(request) | |
628 except grpc.RpcError as g: | |
629 logging.error('gRPC error during contains: re-throwing as IOError (%s)' | |
630 % g) | |
631 raise IOError(g) | |
632 | |
633 # If everything's present, return the empty set. | |
634 if response.status.succeeded: | |
635 return {} | |
636 | |
637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: | |
638 raise IOError('Unknown response during lookup: %s' % response.status) | |
639 | |
640 # Pick Items that are missing, attach _PushState to them. The gRPC | |
641 # implementation doesn't actually have a push state, we just attach | |
642 # empty objects to satisfy the StorageApi interface. | |
643 missing_items = {} | |
644 for missing in response.status.missing_digest: | |
645 item = items_by_digest[missing.digest] | |
646 missing_items[item] = _IsolateServerGrpcPushState() | 684 missing_items[item] = _IsolateServerGrpcPushState() |
647 | |
648 logging.info('Queried %d files, %d cache hit', | |
649 len(items), len(items) - len(missing_items)) | |
650 return missing_items | 685 return missing_items |
651 | 686 |
652 | 687 |
653 def set_storage_api_class(cls): | 688 def set_storage_api_class(cls): |
654 """Replaces StorageApi implementation used by default.""" | 689 """Replaces StorageApi implementation used by default.""" |
655 global _storage_api_cls | 690 global _storage_api_cls |
656 assert _storage_api_cls is None | 691 assert _storage_api_cls is None |
657 assert issubclass(cls, StorageApi) | 692 assert issubclass(cls, StorageApi) |
658 _storage_api_cls = cls | 693 _storage_api_cls = cls |
659 | 694 |
660 | 695 |
661 def get_storage_api(url, namespace): | 696 def get_storage_api(url, namespace): |
662 """Returns an object that implements low-level StorageApi interface. | 697 """Returns an object that implements low-level StorageApi interface. |
663 | 698 |
664 It is used by Storage to work with single isolate |namespace|. It should | 699 It is used by Storage to work with single isolate |namespace|. It should |
665 rarely be used directly by clients, see 'get_storage' for | 700 rarely be used directly by clients, see 'get_storage' for |
666 a better alternative. | 701 a better alternative. |
667 | 702 |
668 Arguments: | 703 Arguments: |
669 url: URL of isolate service to use shared cloud based storage. | 704 url: URL of isolate service to use shared cloud based storage. |
670 namespace: isolate namespace to operate in, also defines hashing and | 705 namespace: isolate namespace to operate in, also defines hashing and |
671 compression scheme used, i.e. namespace names that end with '-gzip' | 706 compression scheme used, i.e. namespace names that end with '-gzip' |
672 store compressed data. | 707 store compressed data. |
673 | 708 |
674 Returns: | 709 Returns: |
675 Instance of StorageApi subclass. | 710 Instance of StorageApi subclass. |
676 """ | 711 """ |
677 cls = _storage_api_cls or IsolateServer | 712 cls = _storage_api_cls or IsolateServer |
678 return cls(url, namespace) | 713 return cls(url, namespace) |
OLD | NEW |