| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
| 7 | 7 |
| 8 import base64 | 8 import base64 |
| 9 import binascii | 9 import binascii |
| 10 import collections | 10 import collections |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 47 | 47 |
| 48 # Chunk size to use when reading from network stream. | 48 # Chunk size to use when reading from network stream. |
| 49 NET_IO_FILE_CHUNK = 16 * 1024 | 49 NET_IO_FILE_CHUNK = 16 * 1024 |
| 50 | 50 |
| 51 | 51 |
| 52 # Read timeout in seconds for downloads from isolate storage. If there's no | 52 # Read timeout in seconds for downloads from isolate storage. If there's no |
| 53 # response from the server within this timeout whole download will be aborted. | 53 # response from the server within this timeout whole download will be aborted. |
| 54 DOWNLOAD_READ_TIMEOUT = 60 | 54 DOWNLOAD_READ_TIMEOUT = 60 |
| 55 | 55 |
| 56 | 56 |
| 57 # A class to use to communicate with the server by default. Can be changed by | 57 # Stores the gRPC proxy address. Must be set if the storage API class is |
| 58 # 'set_storage_api_class'. Default is IsolateServer. | 58 # IsolateServerGrpc (call 'set_grpc_proxy'). |
| 59 _storage_api_cls = None | 59 _grpc_proxy = None |
| 60 | 60 |
| 61 | 61 |
| 62 class Item(object): | 62 class Item(object): |
| 63 """An item to push to Storage. | 63 """An item to push to Storage. |
| 64 | 64 |
| 65 Its digest and size may be provided in advance, if known. Otherwise they will | 65 Its digest and size may be provided in advance, if known. Otherwise they will |
| 66 be derived from content(). If digest is provided, it MUST correspond to | 66 be derived from content(). If digest is provided, it MUST correspond to |
| 67 hash algorithm used by Storage. | 67 hash algorithm used by Storage. |
| 68 | 68 |
| 69 When used with Storage, Item starts its life in a main thread, travels | 69 When used with Storage, Item starts its life in a main thread, travels |
| (...skipping 450 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 520 pass | 520 pass |
| 521 | 521 |
| 522 | 522 |
| 523 class IsolateServerGrpc(StorageApi): | 523 class IsolateServerGrpc(StorageApi): |
| 524 """StorageApi implementation that downloads and uploads to a gRPC service. | 524 """StorageApi implementation that downloads and uploads to a gRPC service. |
| 525 | 525 |
| 526 Limitations: only works for the default-gzip namespace, and with zero offsets | 526 Limitations: only works for the default-gzip namespace, and with zero offsets |
| 527 while fetching. | 527 while fetching. |
| 528 """ | 528 """ |
| 529 | 529 |
| 530 def __init__(self, server, namespace): | 530 def __init__(self, server, namespace, proxy): |
| 531 super(IsolateServerGrpc, self).__init__() | 531 super(IsolateServerGrpc, self).__init__() |
| 532 logging.info('Using gRPC for Isolate') | 532 logging.info('Using gRPC for Isolate') |
| 533 self._server = server | 533 self._server = server |
| 534 self._lock = threading.Lock() | 534 self._lock = threading.Lock() |
| 535 self._memory_use = 0 | 535 self._memory_use = 0 |
| 536 self._num_pushes = 0 | 536 self._num_pushes = 0 |
| 537 self._already_exists = 0 | 537 self._already_exists = 0 |
| 538 | 538 |
| 539 # Proxies only support the default-gzip namespace for now. | 539 # Proxies only support the default-gzip namespace for now. |
| 540 # TODO(aludwin): support other namespaces if necessary | 540 # TODO(aludwin): support other namespaces if necessary |
| 541 assert namespace == 'default-gzip' | 541 assert namespace == 'default-gzip' |
| 542 self._namespace = namespace | 542 self._namespace = namespace |
| 543 | 543 |
| 544 # Make sure grpc was successfully imported | 544 # Make sure grpc was successfully imported |
| 545 assert grpc | 545 assert grpc |
| 546 assert bytestream_pb2 | 546 assert bytestream_pb2 |
| 547 | 547 |
| 548 proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') | 548 roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS') |
| 549 roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') | 549 overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE') |
| 550 overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') | |
| 551 | 550 |
| 552 # The "proxy" envvar must be of the form: | 551 # The "proxy" envvar must be of the form: |
| 553 # http[s]://<server>[:port][/prefix] | 552 # http[s]://<server>[:port][/prefix] |
| 554 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | 553 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) |
| 555 if not m: | 554 if not m: |
| 556 raise ValueError(('gRPC proxy must have the form: ' | 555 raise ValueError(('gRPC proxy must have the form: ' |
| 557 'http[s]://<server>[:port][/prefix] ' | 556 'http[s]://<server>[:port][/prefix] ' |
| 558 '(given: %s)') % proxy) | 557 '(given: %s)') % proxy) |
| 559 transport = m.group(1) | 558 transport = m.group(1) |
| 560 host = m.group(2) | 559 host = m.group(2) |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 693 assert all(i.digest is not None and i.size is not None for i in items) | 692 assert all(i.digest is not None and i.size is not None for i in items) |
| 694 # Assume all Items are missing, and attach _PushState to them. The gRPC | 693 # Assume all Items are missing, and attach _PushState to them. The gRPC |
| 695 # implementation doesn't actually have a push state, we just attach empty | 694 # implementation doesn't actually have a push state, we just attach empty |
| 696 # objects to satisfy the StorageApi interface. | 695 # objects to satisfy the StorageApi interface. |
| 697 missing_items = {} | 696 missing_items = {} |
| 698 for item in items: | 697 for item in items: |
| 699 missing_items[item] = _IsolateServerGrpcPushState() | 698 missing_items[item] = _IsolateServerGrpcPushState() |
| 700 return missing_items | 699 return missing_items |
| 701 | 700 |
| 702 | 701 |
| 703 def set_storage_api_class(cls): | 702 def set_grpc_proxy(proxy): |
| 704 """Replaces StorageApi implementation used by default.""" | 703 """Sets the StorageApi to use the specified proxy.""" |
| 705 global _storage_api_cls | 704 global _grpc_proxy |
| 706 assert _storage_api_cls is None | 705 assert _grpc_proxy is None |
| 707 assert issubclass(cls, StorageApi) | 706 _grpc_proxy = proxy |
| 708 _storage_api_cls = cls | |
| 709 | 707 |
| 710 | 708 |
| 711 def get_storage_api(url, namespace): | 709 def get_storage_api(url, namespace): |
| 712 """Returns an object that implements low-level StorageApi interface. | 710 """Returns an object that implements low-level StorageApi interface. |
| 713 | 711 |
| 714 It is used by Storage to work with single isolate |namespace|. It should | 712 It is used by Storage to work with single isolate |namespace|. It should |
| 715 rarely be used directly by clients, see 'get_storage' for | 713 rarely be used directly by clients, see 'get_storage' for |
| 716 a better alternative. | 714 a better alternative. |
| 717 | 715 |
| 718 Arguments: | 716 Arguments: |
| 719 url: URL of isolate service to use shared cloud based storage. | 717 url: URL of isolate service to use shared cloud based storage. |
| 720 namespace: isolate namespace to operate in, also defines hashing and | 718 namespace: isolate namespace to operate in, also defines hashing and |
| 721 compression scheme used, i.e. namespace names that end with '-gzip' | 719 compression scheme used, i.e. namespace names that end with '-gzip' |
| 722 store compressed data. | 720 store compressed data. |
| 723 | 721 |
| 724 Returns: | 722 Returns: |
| 725 Instance of StorageApi subclass. | 723 Instance of StorageApi subclass. |
| 726 """ | 724 """ |
| 727 cls = _storage_api_cls or IsolateServer | 725 if _grpc_proxy is not None: |
| 728 return cls(url, namespace) | 726 return IsolateServerGrpc(url, namespace, _grpc_proxy) |
| 727 return IsolateServer(url, namespace) |
| OLD | NEW |