OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
7 | 7 |
8 import base64 | 8 import base64 |
9 import binascii | 9 import binascii |
10 import collections | 10 import collections |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
47 | 47 |
48 # Chunk size to use when reading from network stream. | 48 # Chunk size to use when reading from network stream. |
49 NET_IO_FILE_CHUNK = 16 * 1024 | 49 NET_IO_FILE_CHUNK = 16 * 1024 |
50 | 50 |
51 | 51 |
52 # Read timeout in seconds for downloads from isolate storage. If there's no | 52 # Read timeout in seconds for downloads from isolate storage. If there's no |
53 # response from the server within this timeout whole download will be aborted. | 53 # response from the server within this timeout whole download will be aborted. |
54 DOWNLOAD_READ_TIMEOUT = 60 | 54 DOWNLOAD_READ_TIMEOUT = 60 |
55 | 55 |
56 | 56 |
57 # A class to use to communicate with the server by default. Can be changed by | 57 # Stores the gRPC proxy address. Must be set if the storage API class is |
58 # 'set_storage_api_class'. Default is IsolateServer. | 58 # IsolateServerGrpc (call 'set_grpc_proxy'). |
59 _storage_api_cls = None | 59 _grpc_proxy = None |
60 | 60 |
61 | 61 |
62 class Item(object): | 62 class Item(object): |
63 """An item to push to Storage. | 63 """An item to push to Storage. |
64 | 64 |
65 Its digest and size may be provided in advance, if known. Otherwise they will | 65 Its digest and size may be provided in advance, if known. Otherwise they will |
66 be derived from content(). If digest is provided, it MUST correspond to | 66 be derived from content(). If digest is provided, it MUST correspond to |
67 hash algorithm used by Storage. | 67 hash algorithm used by Storage. |
68 | 68 |
69 When used with Storage, Item starts its life in a main thread, travels | 69 When used with Storage, Item starts its life in a main thread, travels |
(...skipping 450 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 pass | 520 pass |
521 | 521 |
522 | 522 |
523 class IsolateServerGrpc(StorageApi): | 523 class IsolateServerGrpc(StorageApi): |
524 """StorageApi implementation that downloads and uploads to a gRPC service. | 524 """StorageApi implementation that downloads and uploads to a gRPC service. |
525 | 525 |
526 Limitations: only works for the default-gzip namespace, and with zero offsets | 526 Limitations: only works for the default-gzip namespace, and with zero offsets |
527 while fetching. | 527 while fetching. |
528 """ | 528 """ |
529 | 529 |
530 def __init__(self, server, namespace): | 530 def __init__(self, server, namespace, proxy): |
531 super(IsolateServerGrpc, self).__init__() | 531 super(IsolateServerGrpc, self).__init__() |
532 logging.info('Using gRPC for Isolate') | 532 logging.info('Using gRPC for Isolate') |
533 self._server = server | 533 self._server = server |
534 self._lock = threading.Lock() | 534 self._lock = threading.Lock() |
535 self._memory_use = 0 | 535 self._memory_use = 0 |
536 self._num_pushes = 0 | 536 self._num_pushes = 0 |
537 self._already_exists = 0 | 537 self._already_exists = 0 |
538 | 538 |
539 # Proxies only support the default-gzip namespace for now. | 539 # Proxies only support the default-gzip namespace for now. |
540 # TODO(aludwin): support other namespaces if necessary | 540 # TODO(aludwin): support other namespaces if necessary |
541 assert namespace == 'default-gzip' | 541 assert namespace == 'default-gzip' |
542 self._namespace = namespace | 542 self._namespace = namespace |
543 | 543 |
544 # Make sure grpc was successfully imported | 544 # Make sure grpc was successfully imported |
545 assert grpc | 545 assert grpc |
546 assert bytestream_pb2 | 546 assert bytestream_pb2 |
547 | 547 |
548 proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') | 548 roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS') |
549 roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') | 549 overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE') |
550 overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') | |
551 | 550 |
552 # The "proxy" envvar must be of the form: | 551 # The "proxy" envvar must be of the form: |
553 # http[s]://<server>[:port][/prefix] | 552 # http[s]://<server>[:port][/prefix] |
554 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | 553 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) |
555 if not m: | 554 if not m: |
556 raise ValueError(('gRPC proxy must have the form: ' | 555 raise ValueError(('gRPC proxy must have the form: ' |
557 'http[s]://<server>[:port][/prefix] ' | 556 'http[s]://<server>[:port][/prefix] ' |
558 '(given: %s)') % proxy) | 557 '(given: %s)') % proxy) |
559 transport = m.group(1) | 558 transport = m.group(1) |
560 host = m.group(2) | 559 host = m.group(2) |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
693 assert all(i.digest is not None and i.size is not None for i in items) | 692 assert all(i.digest is not None and i.size is not None for i in items) |
694 # Assume all Items are missing, and attach _PushState to them. The gRPC | 693 # Assume all Items are missing, and attach _PushState to them. The gRPC |
695 # implementation doesn't actually have a push state, we just attach empty | 694 # implementation doesn't actually have a push state, we just attach empty |
696 # objects to satisfy the StorageApi interface. | 695 # objects to satisfy the StorageApi interface. |
697 missing_items = {} | 696 missing_items = {} |
698 for item in items: | 697 for item in items: |
699 missing_items[item] = _IsolateServerGrpcPushState() | 698 missing_items[item] = _IsolateServerGrpcPushState() |
700 return missing_items | 699 return missing_items |
701 | 700 |
702 | 701 |
703 def set_storage_api_class(cls): | 702 def set_grpc_proxy(proxy): |
704 """Replaces StorageApi implementation used by default.""" | 703 """Sets the StorageApi to use the specified proxy.""" |
705 global _storage_api_cls | 704 global _grpc_proxy |
706 assert _storage_api_cls is None | 705 assert _grpc_proxy is None |
707 assert issubclass(cls, StorageApi) | 706 _grpc_proxy = proxy |
708 _storage_api_cls = cls | |
709 | 707 |
710 | 708 |
711 def get_storage_api(url, namespace): | 709 def get_storage_api(url, namespace): |
712 """Returns an object that implements low-level StorageApi interface. | 710 """Returns an object that implements low-level StorageApi interface. |
713 | 711 |
714 It is used by Storage to work with single isolate |namespace|. It should | 712 It is used by Storage to work with single isolate |namespace|. It should |
715 rarely be used directly by clients, see 'get_storage' for | 713 rarely be used directly by clients, see 'get_storage' for |
716 a better alternative. | 714 a better alternative. |
717 | 715 |
718 Arguments: | 716 Arguments: |
719 url: URL of isolate service to use shared cloud based storage. | 717 url: URL of isolate service to use shared cloud based storage. |
720 namespace: isolate namespace to operate in, also defines hashing and | 718 namespace: isolate namespace to operate in, also defines hashing and |
721 compression scheme used, i.e. namespace names that end with '-gzip' | 719 compression scheme used, i.e. namespace names that end with '-gzip' |
722 store compressed data. | 720 store compressed data. |
723 | 721 |
724 Returns: | 722 Returns: |
725 Instance of StorageApi subclass. | 723 Instance of StorageApi subclass. |
726 """ | 724 """ |
727 cls = _storage_api_cls or IsolateServer | 725 if _grpc_proxy is not None: |
728 return cls(url, namespace) | 726 return IsolateServerGrpc(url, namespace, _grpc_proxy) |
| 727 return IsolateServer(url, namespace) |
OLD | NEW |