OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
7 | 7 |
8 import base64 | 8 import base64 |
9 import binascii | 9 import binascii |
10 import collections | 10 import collections |
(...skipping 524 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
535 | 535 |
536 def fetch(self, digest, offset=0): | 536 def fetch(self, digest, offset=0): |
537 # The gRPC APIs only work with an offset of 0 | 537 # The gRPC APIs only work with an offset of 0 |
538 assert offset == 0 | 538 assert offset == 0 |
539 request = isolate_bot_pb2.FetchBlobsRequest() | 539 request = isolate_bot_pb2.FetchBlobsRequest() |
540 req_digest = request.digest.add() | 540 req_digest = request.digest.add() |
541 # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte | 541 # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte |
542 # array (like [0x01, 0x2a, 0xbc]). | 542 # array (like [0x01, 0x2a, 0xbc]). |
543 req_digest.digest = binascii.unhexlify(digest) | 543 req_digest.digest = binascii.unhexlify(digest) |
544 expected_offset = 0 | 544 expected_offset = 0 |
545 for response in self._stub.FetchBlobs(request, | 545 try: |
546 timeout=DOWNLOAD_READ_TIMEOUT): | 546 for response in self._stub.FetchBlobs(request, |
547 if not response.status.succeeded: | 547 timeout=DOWNLOAD_READ_TIMEOUT): |
548 raise IOError( | 548 if not response.status.succeeded: |
549 'Error while fetching %s: %s' % (digest, response.status)) | 549 raise IOError( |
550 if not expected_offset == response.data.offset: | 550 'Error while fetching %s: %s' % (digest, response.status)) |
551 raise IOError( | 551 if not expected_offset == response.data.offset: |
552 'Error while fetching %s: expected offset %d, got %d' % ( | 552 raise IOError( |
553 digest, expected_offset, response.data.offset)) | 553 'Error while fetching %s: expected offset %d, got %d' % ( |
554 expected_offset += len(response.data.data) | 554 digest, expected_offset, response.data.offset)) |
555 yield response.data.data | 555 expected_offset += len(response.data.data) |
| 556 yield response.data.data |
| 557 except grpc.RpcError as g: |
| 558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
| 559 raise IOError(g) |
556 | 560 |
557 def push(self, item, push_state, content=None): | 561 def push(self, item, push_state, content=None): |
558 assert isinstance(item, Item) | 562 assert isinstance(item, Item) |
559 assert item.digest is not None | 563 assert item.digest is not None |
560 assert item.size is not None | 564 assert item.size is not None |
561 assert isinstance(push_state, _IsolateServerGrpcPushState) | 565 assert isinstance(push_state, _IsolateServerGrpcPushState) |
562 | 566 |
563 # Default to item.content(). | 567 # Default to item.content(). |
564 content = item.content() if content is None else content | 568 content = item.content() if content is None else content |
565 guard_memory_use(self, content, item.size) | 569 guard_memory_use(self, content, item.size) |
(...skipping 21 matching lines...) Expand all Loading... |
587 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
588 request.data.data = chunk[:slice_len] | 592 request.data.data = chunk[:slice_len] |
589 yield request | 593 yield request |
590 has_sent_anything = True | 594 has_sent_anything = True |
591 request.data.offset += slice_len | 595 request.data.offset += slice_len |
592 # The proxy only expects the first chunk to have the digest | 596 # The proxy only expects the first chunk to have the digest |
593 request.data.ClearField("digest") | 597 request.data.ClearField("digest") |
594 chunk = chunk[slice_len:] | 598 chunk = chunk[slice_len:] |
595 | 599 |
596 # TODO(aludwin): batch up several requests to reuse TCP connections | 600 # TODO(aludwin): batch up several requests to reuse TCP connections |
597 response = self._stub.PushBlobs(slicer()) | 601 try: |
| 602 response = self._stub.PushBlobs(slicer()) |
| 603 except grpc.RpcError as g: |
| 604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) |
| 605 raise IOError(g) |
| 606 |
598 if not response.status.succeeded: | 607 if not response.status.succeeded: |
599 raise IOError( | 608 raise IOError( |
600 'Error while uploading %s: %s' % ( | 609 'Error while uploading %s: %s' % ( |
601 item.digest, response.status.error_detail)) | 610 item.digest, response.status.error_detail)) |
602 | 611 |
603 finally: | 612 finally: |
604 with self._lock: | 613 with self._lock: |
605 self._memory_use -= item.size | 614 self._memory_use -= item.size |
606 | 615 |
607 def contains(self, items): | 616 def contains(self, items): |
608 """Returns the set of all missing items.""" | 617 """Returns the set of all missing items.""" |
609 # Ensure all items were initialized with 'prepare' call. Storage does that. | 618 # Ensure all items were initialized with 'prepare' call. Storage does that. |
610 assert all(i.digest is not None and i.size is not None for i in items) | 619 assert all(i.digest is not None and i.size is not None for i in items) |
611 request = isolate_bot_pb2.ContainsRequest() | 620 request = isolate_bot_pb2.ContainsRequest() |
612 items_by_digest = {} | 621 items_by_digest = {} |
613 for item in items: | 622 for item in items: |
614 cd = request.digest.add() | 623 cd = request.digest.add() |
615 cd.digest = binascii.unhexlify(item.digest) | 624 cd.digest = binascii.unhexlify(item.digest) |
616 items_by_digest[cd.digest] = item | 625 items_by_digest[cd.digest] = item |
617 response = self._stub.Contains(request) | 626 try: |
| 627 response = self._stub.Contains(request) |
| 628 except grpc.RpcError as g: |
| 629 logging.error('gRPC error during contains: re-throwing as IOError (%s)' |
| 630 % g) |
| 631 raise IOError(g) |
618 | 632 |
619 # If everything's present, return the empty set. | 633 # If everything's present, return the empty set. |
620 if response.status.succeeded: | 634 if response.status.succeeded: |
621 return {} | 635 return {} |
622 | 636 |
623 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: | 637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: |
624 raise IOError('Unknown response during lookup: %s' % response.status) | 638 raise IOError('Unknown response during lookup: %s' % response.status) |
625 | 639 |
626 # Pick Items that are missing, attach _PushState to them. The gRPC | 640 # Pick Items that are missing, attach _PushState to them. The gRPC |
627 # implementation doesn't actually have a push state, we just attach | 641 # implementation doesn't actually have a push state, we just attach |
(...skipping 27 matching lines...) Expand all Loading... |
655 url: URL of isolate service to use shared cloud based storage. | 669 url: URL of isolate service to use shared cloud based storage. |
656 namespace: isolate namespace to operate in, also defines hashing and | 670 namespace: isolate namespace to operate in, also defines hashing and |
657 compression scheme used, i.e. namespace names that end with '-gzip' | 671 compression scheme used, i.e. namespace names that end with '-gzip' |
658 store compressed data. | 672 store compressed data. |
659 | 673 |
660 Returns: | 674 Returns: |
661 Instance of StorageApi subclass. | 675 Instance of StorageApi subclass. |
662 """ | 676 """ |
663 cls = _storage_api_cls or IsolateServer | 677 cls = _storage_api_cls or IsolateServer |
664 return cls(url, namespace) | 678 return cls(url, namespace) |
OLD | NEW |