| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
| 7 | 7 |
| 8 import base64 | 8 import base64 |
| 9 import binascii | 9 import binascii |
| 10 import collections | 10 import collections |
| (...skipping 524 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 535 | 535 |
| 536 def fetch(self, digest, offset=0): | 536 def fetch(self, digest, offset=0): |
| 537 # The gRPC APIs only work with an offset of 0 | 537 # The gRPC APIs only work with an offset of 0 |
| 538 assert offset == 0 | 538 assert offset == 0 |
| 539 request = isolate_bot_pb2.FetchBlobsRequest() | 539 request = isolate_bot_pb2.FetchBlobsRequest() |
| 540 req_digest = request.digest.add() | 540 req_digest = request.digest.add() |
| 541 # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte | 541 # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte |
| 542 # array (like [0x01, 0x2a, 0xbc]). | 542 # array (like [0x01, 0x2a, 0xbc]). |
| 543 req_digest.digest = binascii.unhexlify(digest) | 543 req_digest.digest = binascii.unhexlify(digest) |
| 544 expected_offset = 0 | 544 expected_offset = 0 |
| 545 for response in self._stub.FetchBlobs(request, | 545 try: |
| 546 timeout=DOWNLOAD_READ_TIMEOUT): | 546 for response in self._stub.FetchBlobs(request, |
| 547 if not response.status.succeeded: | 547 timeout=DOWNLOAD_READ_TIMEOUT): |
| 548 raise IOError( | 548 if not response.status.succeeded: |
| 549 'Error while fetching %s: %s' % (digest, response.status)) | 549 raise IOError( |
| 550 if not expected_offset == response.data.offset: | 550 'Error while fetching %s: %s' % (digest, response.status)) |
| 551 raise IOError( | 551 if not expected_offset == response.data.offset: |
| 552 'Error while fetching %s: expected offset %d, got %d' % ( | 552 raise IOError( |
| 553 digest, expected_offset, response.data.offset)) | 553 'Error while fetching %s: expected offset %d, got %d' % ( |
| 554 expected_offset += len(response.data.data) | 554 digest, expected_offset, response.data.offset)) |
| 555 yield response.data.data | 555 expected_offset += len(response.data.data) |
| 556 yield response.data.data |
| 557 except grpc.RpcError as g: |
| 558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
| 559 raise IOError(g) |
| 556 | 560 |
| 557 def push(self, item, push_state, content=None): | 561 def push(self, item, push_state, content=None): |
| 558 assert isinstance(item, Item) | 562 assert isinstance(item, Item) |
| 559 assert item.digest is not None | 563 assert item.digest is not None |
| 560 assert item.size is not None | 564 assert item.size is not None |
| 561 assert isinstance(push_state, _IsolateServerGrpcPushState) | 565 assert isinstance(push_state, _IsolateServerGrpcPushState) |
| 562 | 566 |
| 563 # Default to item.content(). | 567 # Default to item.content(). |
| 564 content = item.content() if content is None else content | 568 content = item.content() if content is None else content |
| 565 guard_memory_use(self, content, item.size) | 569 guard_memory_use(self, content, item.size) |
| (...skipping 21 matching lines...) Expand all Loading... |
| 587 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
| 588 request.data.data = chunk[:slice_len] | 592 request.data.data = chunk[:slice_len] |
| 589 yield request | 593 yield request |
| 590 has_sent_anything = True | 594 has_sent_anything = True |
| 591 request.data.offset += slice_len | 595 request.data.offset += slice_len |
| 592 # The proxy only expects the first chunk to have the digest | 596 # The proxy only expects the first chunk to have the digest |
| 593 request.data.ClearField("digest") | 597 request.data.ClearField("digest") |
| 594 chunk = chunk[slice_len:] | 598 chunk = chunk[slice_len:] |
| 595 | 599 |
| 596 # TODO(aludwin): batch up several requests to reuse TCP connections | 600 # TODO(aludwin): batch up several requests to reuse TCP connections |
| 597 response = self._stub.PushBlobs(slicer()) | 601 try: |
| 602 response = self._stub.PushBlobs(slicer()) |
| 603 except grpc.RpcError as g: |
| 604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) |
| 605 raise IOError(g) |
| 606 |
| 598 if not response.status.succeeded: | 607 if not response.status.succeeded: |
| 599 raise IOError( | 608 raise IOError( |
| 600 'Error while uploading %s: %s' % ( | 609 'Error while uploading %s: %s' % ( |
| 601 item.digest, response.status.error_detail)) | 610 item.digest, response.status.error_detail)) |
| 602 | 611 |
| 603 finally: | 612 finally: |
| 604 with self._lock: | 613 with self._lock: |
| 605 self._memory_use -= item.size | 614 self._memory_use -= item.size |
| 606 | 615 |
| 607 def contains(self, items): | 616 def contains(self, items): |
| 608 """Returns the set of all missing items.""" | 617 """Returns the set of all missing items.""" |
| 609 # Ensure all items were initialized with 'prepare' call. Storage does that. | 618 # Ensure all items were initialized with 'prepare' call. Storage does that. |
| 610 assert all(i.digest is not None and i.size is not None for i in items) | 619 assert all(i.digest is not None and i.size is not None for i in items) |
| 611 request = isolate_bot_pb2.ContainsRequest() | 620 request = isolate_bot_pb2.ContainsRequest() |
| 612 items_by_digest = {} | 621 items_by_digest = {} |
| 613 for item in items: | 622 for item in items: |
| 614 cd = request.digest.add() | 623 cd = request.digest.add() |
| 615 cd.digest = binascii.unhexlify(item.digest) | 624 cd.digest = binascii.unhexlify(item.digest) |
| 616 items_by_digest[cd.digest] = item | 625 items_by_digest[cd.digest] = item |
| 617 response = self._stub.Contains(request) | 626 try: |
| 627 response = self._stub.Contains(request) |
| 628 except grpc.RpcError as g: |
| 629 logging.error('gRPC error during contains: re-throwing as IOError (%s)' |
| 630 % g) |
| 631 raise IOError(g) |
| 618 | 632 |
| 619 # If everything's present, return the empty set. | 633 # If everything's present, return the empty set. |
| 620 if response.status.succeeded: | 634 if response.status.succeeded: |
| 621 return {} | 635 return {} |
| 622 | 636 |
| 623 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: | 637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: |
| 624 raise IOError('Unknown response during lookup: %s' % response.status) | 638 raise IOError('Unknown response during lookup: %s' % response.status) |
| 625 | 639 |
| 626 # Pick Items that are missing, attach _PushState to them. The gRPC | 640 # Pick Items that are missing, attach _PushState to them. The gRPC |
| 627 # implementation doesn't actually have a push state, we just attach | 641 # implementation doesn't actually have a push state, we just attach |
| (...skipping 27 matching lines...) Expand all Loading... |
| 655 url: URL of isolate service to use shared cloud based storage. | 669 url: URL of isolate service to use shared cloud based storage. |
| 656 namespace: isolate namespace to operate in, also defines hashing and | 670 namespace: isolate namespace to operate in, also defines hashing and |
| 657 compression scheme used, i.e. namespace names that end with '-gzip' | 671 compression scheme used, i.e. namespace names that end with '-gzip' |
| 658 store compressed data. | 672 store compressed data. |
| 659 | 673 |
| 660 Returns: | 674 Returns: |
| 661 Instance of StorageApi subclass. | 675 Instance of StorageApi subclass. |
| 662 """ | 676 """ |
| 663 cls = _storage_api_cls or IsolateServer | 677 cls = _storage_api_cls or IsolateServer |
| 664 return cls(url, namespace) | 678 return cls(url, namespace) |
| OLD | NEW |