| OLD | NEW | 
|    1 #!/usr/bin/env python |    1 #!/usr/bin/env python | 
|    2 # Copyright 2016 The LUCI Authors. All rights reserved. |    2 # Copyright 2016 The LUCI Authors. All rights reserved. | 
|    3 # Use of this source code is governed under the Apache License, Version 2.0 |    3 # Use of this source code is governed under the Apache License, Version 2.0 | 
|    4 # that can be found in the LICENSE file. |    4 # that can be found in the LICENSE file. | 
|    5  |    5  | 
|    6 """A low-level blob storage/retrieval interface to the Isolate server""" |    6 """A low-level blob storage/retrieval interface to the Isolate server""" | 
|    7  |    7  | 
|    8 import base64 |    8 import base64 | 
|    9 import binascii |    9 import binascii | 
|   10 import collections |   10 import collections | 
| (...skipping 524 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  535  |  535  | 
|  536   def fetch(self, digest, offset=0): |  536   def fetch(self, digest, offset=0): | 
|  537     # The gRPC APIs only work with an offset of 0 |  537     # The gRPC APIs only work with an offset of 0 | 
|  538     assert offset == 0 |  538     assert offset == 0 | 
|  539     request = isolate_bot_pb2.FetchBlobsRequest() |  539     request = isolate_bot_pb2.FetchBlobsRequest() | 
|  540     req_digest = request.digest.add() |  540     req_digest = request.digest.add() | 
|  541     # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte |  541     # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte | 
|  542     # array (like [0x01, 0x2a, 0xbc]). |  542     # array (like [0x01, 0x2a, 0xbc]). | 
|  543     req_digest.digest = binascii.unhexlify(digest) |  543     req_digest.digest = binascii.unhexlify(digest) | 
|  544     expected_offset = 0 |  544     expected_offset = 0 | 
|  545     for response in self._stub.FetchBlobs(request, |  545     try: | 
|  546                                           timeout=DOWNLOAD_READ_TIMEOUT): |  546       for response in self._stub.FetchBlobs(request, | 
|  547       if not response.status.succeeded: |  547                                             timeout=DOWNLOAD_READ_TIMEOUT): | 
|  548         raise IOError( |  548         if not response.status.succeeded: | 
|  549             'Error while fetching %s: %s' % (digest, response.status)) |  549           raise IOError( | 
|  550       if not expected_offset == response.data.offset: |  550               'Error while fetching %s: %s' % (digest, response.status)) | 
|  551         raise IOError( |  551         if not expected_offset == response.data.offset: | 
|  552             'Error while fetching %s: expected offset %d, got %d' % ( |  552           raise IOError( | 
|  553                 digest, expected_offset, response.data.offset)) |  553               'Error while fetching %s: expected offset %d, got %d' % ( | 
|  554       expected_offset += len(response.data.data) |  554                   digest, expected_offset, response.data.offset)) | 
|  555       yield response.data.data |  555         expected_offset += len(response.data.data) | 
 |  556         yield response.data.data | 
 |  557     except grpc.RpcError as g: | 
 |  558       logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) | 
 |  559       raise IOError(g) | 
|  556  |  560  | 
|  557   def push(self, item, push_state, content=None): |  561   def push(self, item, push_state, content=None): | 
|  558     assert isinstance(item, Item) |  562     assert isinstance(item, Item) | 
|  559     assert item.digest is not None |  563     assert item.digest is not None | 
|  560     assert item.size is not None |  564     assert item.size is not None | 
|  561     assert isinstance(push_state, _IsolateServerGrpcPushState) |  565     assert isinstance(push_state, _IsolateServerGrpcPushState) | 
|  562  |  566  | 
|  563     # Default to item.content(). |  567     # Default to item.content(). | 
|  564     content = item.content() if content is None else content |  568     content = item.content() if content is None else content | 
|  565     guard_memory_use(self, content, item.size) |  569     guard_memory_use(self, content, item.size) | 
| (...skipping 21 matching lines...) Expand all  Loading... | 
|  587             slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |  591             slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 
|  588             request.data.data = chunk[:slice_len] |  592             request.data.data = chunk[:slice_len] | 
|  589             yield request |  593             yield request | 
|  590             has_sent_anything = True |  594             has_sent_anything = True | 
|  591             request.data.offset += slice_len |  595             request.data.offset += slice_len | 
|  592             # The proxy only expects the first chunk to have the digest |  596             # The proxy only expects the first chunk to have the digest | 
|  593             request.data.ClearField("digest") |  597             request.data.ClearField("digest") | 
|  594             chunk = chunk[slice_len:] |  598             chunk = chunk[slice_len:] | 
|  595  |  599  | 
|  596       # TODO(aludwin): batch up several requests to reuse TCP connections |  600       # TODO(aludwin): batch up several requests to reuse TCP connections | 
|  597       response = self._stub.PushBlobs(slicer()) |  601       try: | 
 |  602         response = self._stub.PushBlobs(slicer()) | 
 |  603       except grpc.RpcError as g: | 
 |  604         logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) | 
 |  605         raise IOError(g) | 
 |  606  | 
|  598       if not response.status.succeeded: |  607       if not response.status.succeeded: | 
|  599         raise IOError( |  608         raise IOError( | 
|  600             'Error while uploading %s: %s' % ( |  609             'Error while uploading %s: %s' % ( | 
|  601                 item.digest, response.status.error_detail)) |  610                 item.digest, response.status.error_detail)) | 
|  602  |  611  | 
|  603     finally: |  612     finally: | 
|  604       with self._lock: |  613       with self._lock: | 
|  605         self._memory_use -= item.size |  614         self._memory_use -= item.size | 
|  606  |  615  | 
|  607   def contains(self, items): |  616   def contains(self, items): | 
|  608     """Returns the set of all missing items.""" |  617     """Returns the set of all missing items.""" | 
|  609     # Ensure all items were initialized with 'prepare' call. Storage does that. |  618     # Ensure all items were initialized with 'prepare' call. Storage does that. | 
|  610     assert all(i.digest is not None and i.size is not None for i in items) |  619     assert all(i.digest is not None and i.size is not None for i in items) | 
|  611     request = isolate_bot_pb2.ContainsRequest() |  620     request = isolate_bot_pb2.ContainsRequest() | 
|  612     items_by_digest = {} |  621     items_by_digest = {} | 
|  613     for item in items: |  622     for item in items: | 
|  614       cd = request.digest.add() |  623       cd = request.digest.add() | 
|  615       cd.digest = binascii.unhexlify(item.digest) |  624       cd.digest = binascii.unhexlify(item.digest) | 
|  616       items_by_digest[cd.digest] = item |  625       items_by_digest[cd.digest] = item | 
|  617     response = self._stub.Contains(request) |  626     try: | 
 |  627       response = self._stub.Contains(request) | 
 |  628     except grpc.RpcError as g: | 
 |  629       logging.error('gRPC error during contains: re-throwing as IOError (%s)' | 
 |  630                     % g) | 
 |  631       raise IOError(g) | 
|  618  |  632  | 
|  619     # If everything's present, return the empty set. |  633     # If everything's present, return the empty set. | 
|  620     if response.status.succeeded: |  634     if response.status.succeeded: | 
|  621       return {} |  635       return {} | 
|  622  |  636  | 
|  623     if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: |  637     if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: | 
|  624       raise IOError('Unknown response during lookup: %s' % response.status) |  638       raise IOError('Unknown response during lookup: %s' % response.status) | 
|  625  |  639  | 
|  626     # Pick Items that are missing, attach _PushState to them. The gRPC |  640     # Pick Items that are missing, attach _PushState to them. The gRPC | 
|  627     # implementation doesn't actually have a push state, we just attach |  641     # implementation doesn't actually have a push state, we just attach | 
| (...skipping 27 matching lines...) Expand all  Loading... | 
|  655     url: URL of isolate service to use shared cloud based storage. |  669     url: URL of isolate service to use shared cloud based storage. | 
|  656     namespace: isolate namespace to operate in, also defines hashing and |  670     namespace: isolate namespace to operate in, also defines hashing and | 
|  657         compression scheme used, i.e. namespace names that end with '-gzip' |  671         compression scheme used, i.e. namespace names that end with '-gzip' | 
|  658         store compressed data. |  672         store compressed data. | 
|  659  |  673  | 
|  660   Returns: |  674   Returns: | 
|  661     Instance of StorageApi subclass. |  675     Instance of StorageApi subclass. | 
|  662   """ |  676   """ | 
|  663   cls = _storage_api_cls or IsolateServer |  677   cls = _storage_api_cls or IsolateServer | 
|  664   return cls(url, namespace) |  678   return cls(url, namespace) | 
| OLD | NEW |