Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(342)

Side by Side Diff: client/isolate_storage.py

Issue 2599493002: Rethrow Isolate gRPC proxy exceptions as IOError (Closed)
Patch Set: Created 3 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | client/tests/isolate_storage_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2016 The LUCI Authors. All rights reserved. 2 # Copyright 2016 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """A low-level blob storage/retrieval interface to the Isolate server""" 6 """A low-level blob storage/retrieval interface to the Isolate server"""
7 7
8 import base64 8 import base64
9 import binascii 9 import binascii
10 import collections 10 import collections
(...skipping 524 matching lines...) Expand 10 before | Expand all | Expand 10 after
535 535
536 def fetch(self, digest, offset=0): 536 def fetch(self, digest, offset=0):
537 # The gRPC APIs only work with an offset of 0 537 # The gRPC APIs only work with an offset of 0
538 assert offset == 0 538 assert offset == 0
539 request = isolate_bot_pb2.FetchBlobsRequest() 539 request = isolate_bot_pb2.FetchBlobsRequest()
540 req_digest = request.digest.add() 540 req_digest = request.digest.add()
541 # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte 541 # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte
542 # array (like [0x01, 0x2a, 0xbc]). 542 # array (like [0x01, 0x2a, 0xbc]).
543 req_digest.digest = binascii.unhexlify(digest) 543 req_digest.digest = binascii.unhexlify(digest)
544 expected_offset = 0 544 expected_offset = 0
545 for response in self._stub.FetchBlobs(request, 545 try:
546 timeout=DOWNLOAD_READ_TIMEOUT): 546 for response in self._stub.FetchBlobs(request,
547 if not response.status.succeeded: 547 timeout=DOWNLOAD_READ_TIMEOUT):
548 raise IOError( 548 if not response.status.succeeded:
549 'Error while fetching %s: %s' % (digest, response.status)) 549 raise IOError(
550 if not expected_offset == response.data.offset: 550 'Error while fetching %s: %s' % (digest, response.status))
551 raise IOError( 551 if not expected_offset == response.data.offset:
552 'Error while fetching %s: expected offset %d, got %d' % ( 552 raise IOError(
553 digest, expected_offset, response.data.offset)) 553 'Error while fetching %s: expected offset %d, got %d' % (
554 expected_offset += len(response.data.data) 554 digest, expected_offset, response.data.offset))
555 yield response.data.data 555 expected_offset += len(response.data.data)
556 yield response.data.data
557 except grpc.RpcError as g:
558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
559 raise IOError(g)
556 560
557 def push(self, item, push_state, content=None): 561 def push(self, item, push_state, content=None):
558 assert isinstance(item, Item) 562 assert isinstance(item, Item)
559 assert item.digest is not None 563 assert item.digest is not None
560 assert item.size is not None 564 assert item.size is not None
561 assert isinstance(push_state, _IsolateServerGrpcPushState) 565 assert isinstance(push_state, _IsolateServerGrpcPushState)
562 566
563 # Default to item.content(). 567 # Default to item.content().
564 content = item.content() if content is None else content 568 content = item.content() if content is None else content
565 guard_memory_use(self, content, item.size) 569 guard_memory_use(self, content, item.size)
(...skipping 21 matching lines...) Expand all
587 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
588 request.data.data = chunk[:slice_len] 592 request.data.data = chunk[:slice_len]
589 yield request 593 yield request
590 has_sent_anything = True 594 has_sent_anything = True
591 request.data.offset += slice_len 595 request.data.offset += slice_len
592 # The proxy only expects the first chunk to have the digest 596 # The proxy only expects the first chunk to have the digest
593 request.data.ClearField("digest") 597 request.data.ClearField("digest")
594 chunk = chunk[slice_len:] 598 chunk = chunk[slice_len:]
595 599
596 # TODO(aludwin): batch up several requests to reuse TCP connections 600 # TODO(aludwin): batch up several requests to reuse TCP connections
597 response = self._stub.PushBlobs(slicer()) 601 try:
602 response = self._stub.PushBlobs(slicer())
603 except grpc.RpcError as g:
604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g)
605 raise IOError(g)
606
598 if not response.status.succeeded: 607 if not response.status.succeeded:
599 raise IOError( 608 raise IOError(
600 'Error while uploading %s: %s' % ( 609 'Error while uploading %s: %s' % (
601 item.digest, response.status.error_detail)) 610 item.digest, response.status.error_detail))
602 611
603 finally: 612 finally:
604 with self._lock: 613 with self._lock:
605 self._memory_use -= item.size 614 self._memory_use -= item.size
606 615
607 def contains(self, items): 616 def contains(self, items):
608 """Returns the set of all missing items.""" 617 """Returns the set of all missing items."""
609 # Ensure all items were initialized with 'prepare' call. Storage does that. 618 # Ensure all items were initialized with 'prepare' call. Storage does that.
610 assert all(i.digest is not None and i.size is not None for i in items) 619 assert all(i.digest is not None and i.size is not None for i in items)
611 request = isolate_bot_pb2.ContainsRequest() 620 request = isolate_bot_pb2.ContainsRequest()
612 items_by_digest = {} 621 items_by_digest = {}
613 for item in items: 622 for item in items:
614 cd = request.digest.add() 623 cd = request.digest.add()
615 cd.digest = binascii.unhexlify(item.digest) 624 cd.digest = binascii.unhexlify(item.digest)
616 items_by_digest[cd.digest] = item 625 items_by_digest[cd.digest] = item
617 response = self._stub.Contains(request) 626 try:
627 response = self._stub.Contains(request)
628 except grpc.RpcError as g:
629 logging.error('gRPC error during contains: re-throwing as IOError (%s)'
630 % g)
631 raise IOError(g)
618 632
619 # If everything's present, return the empty set. 633 # If everything's present, return the empty set.
620 if response.status.succeeded: 634 if response.status.succeeded:
621 return {} 635 return {}
622 636
623 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: 637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
624 raise IOError('Unknown response during lookup: %s' % response.status) 638 raise IOError('Unknown response during lookup: %s' % response.status)
625 639
626 # Pick Items that are missing, attach _PushState to them. The gRPC 640 # Pick Items that are missing, attach _PushState to them. The gRPC
627 # implementation doesn't actually have a push state, we just attach 641 # implementation doesn't actually have a push state, we just attach
(...skipping 27 matching lines...) Expand all
655 url: URL of isolate service to use shared cloud based storage. 669 url: URL of isolate service to use shared cloud based storage.
656 namespace: isolate namespace to operate in, also defines hashing and 670 namespace: isolate namespace to operate in, also defines hashing and
657 compression scheme used, i.e. namespace names that end with '-gzip' 671 compression scheme used, i.e. namespace names that end with '-gzip'
658 store compressed data. 672 store compressed data.
659 673
660 Returns: 674 Returns:
661 Instance of StorageApi subclass. 675 Instance of StorageApi subclass.
662 """ 676 """
663 cls = _storage_api_cls or IsolateServer 677 cls = _storage_api_cls or IsolateServer
664 return cls(url, namespace) 678 return cls(url, namespace)
OLDNEW
« no previous file with comments | « no previous file | client/tests/isolate_storage_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698