Chromium Code Reviews

Unified Diff: client/isolate_storage.py

Issue 2953253003: Replace custom blob gRPC API with ByteStream (Closed)
Patch Set: Minor cleanups
@@ -1,41 +1,53 @@
 #!/usr/bin/env python
 # Copyright 2016 The LUCI Authors. All rights reserved.
 # Use of this source code is governed under the Apache License, Version 2.0
 # that can be found in the LICENSE file.

 """A low-level blob storage/retrieval interface to the Isolate server"""

 import base64
 import binascii
 import collections
 import logging
+import os
 import re
 import sys
 import threading
 import time
 import types
+import uuid

 from utils import file_path
 from utils import net

 import isolated_format

 # gRPC may not be installed on the worker machine. This is fine, as long as
 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
+# Full external requirements are: grpcio, certifi.
 try:
   import grpc
-  from proto import isolate_bot_pb2
-except ImportError:
+  from google import auth as google_auth
+  from google.auth.transport import grpc as google_auth_transport_grpc
+  from google.auth.transport import requests as google_auth_transport_requests
+  from proto import bytestream_pb2
+except ImportError as err:
   grpc = None
-  isolate_bot_pb2 = None
+  bytestream_pb2 = None

+# If gRPC is installed, at least give a warning if certifi is not.
+if grpc is not None:
+  try:
+    import certifi
Vadim Sh. 2017/06/27 19:12:49 where is this used?
aludwin 2017/06/27 19:23:06 It's conditionally imported by google.auth.transpo
Vadim Sh. 2017/06/27 19:27:09 Yeah, leave a comment.
aludwin 2017/06/27 20:10:17 Done.
+  except ImportError as err:
+    logging.warning('could not import certifi; gRPC HTTPS connections may fail')

 # Chunk size to use when reading from network stream.
 NET_IO_FILE_CHUNK = 16 * 1024


 # Read timeout in seconds for downloads from isolate storage. If there's no
 # response from the server within this timeout whole download will be aborted.
 DOWNLOAD_READ_TIMEOUT = 60


(...skipping 457 matching lines...)
@@ -499,180 +511,203 @@
 class IsolateServerGrpc(StorageApi):
   """StorageApi implementation that downloads and uploads to a gRPC service.

   Limitations: only works for the default-gzip namespace, and with zero offsets
   while fetching.
   """

   def __init__(self, server, namespace):
     super(IsolateServerGrpc, self).__init__()
     logging.info('Using gRPC for Isolate')
+    self._server = server
+    self._lock = threading.Lock()
+    self._memory_use = 0
+    self._num_pushes = 0
+    self._already_exists = 0

     # Make sure grpc was successfully imported
     assert grpc
-    assert isolate_bot_pb2
+    assert bytestream_pb2
+    # Proxies only support the default-gzip namespace for now.
+    # TODO(aludwin): support other namespaces if necessary
+    assert namespace == 'default-gzip'

-    # Proxies only support the default-gzip namespace for now.
-    # TODO(aludwin): support other namespaces
-    assert namespace == 'default-gzip'
-    self._server = server
-    self._channel = grpc.insecure_channel(server)
-    self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
-    self._lock = threading.Lock()
-    self._memory_use = 0
-    logging.info('...gRPC successfully initialized')
+    proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
+    roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
+    overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')
+
+    # The "proxy" envvar must be of the form:
+    # http[s]://<server>[:port][/prefix]
+    m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
+    if not m:
+      raise ValueError(('gRPC proxy must have the form: '
+                        'http[s]://<server>[:port][/prefix] '
+                        '(given: %s)') % proxy)
+    transport = m.group(1)
+    host = m.group(2)
+    prefix = m.group(3)
+    if not prefix.endswith('/'):
+      prefix = prefix + '/'
+    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
+                 transport, host, prefix)
+    self._prefix = prefix
+
+    if transport == 'http':
+      self._channel = grpc.insecure_channel(host)
+    elif transport == 'https':
+      # Using cloud container builder scopes for testing:
+      scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
+      credentials, _ = google_auth.default(scopes=scopes)
Vadim Sh. 2017/06/27 19:12:49 If you are requiring configured Application Defaul
+      request = google_auth_transport_requests.Request()
+      options = ()
+      root_certs = None
+      if roots is not None:
+        logging.info('Using root CA %s', roots)
+        with open(roots) as f:
+          root_certs = f.read()
+      if overd is not None:
+        logging.info('Using TLS server override %s', overd)
+        options=(('grpc.ssl_target_name_override', overd),)
+      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
+      self._channel = google_auth_transport_grpc.secure_authorized_channel(
+          credentials, request, host, ssl_creds, options=options)
+    else:
+      raise ValueError('unknown transport %s (should be http[s])' % transport)
+    self._stub = bytestream_pb2.ByteStreamStub(self._channel)
Vadim Sh. 2017/06/27 19:12:49 is this object thread safe? 'fetch' and 'push' wil
aludwin 2017/06/27 20:10:17 Yup: https://github.com/grpc/grpc/issues/9320
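
A worked sketch of the ISOLATED_GRPC_PROXY parsing that __init__ performs above. The proxy URL is a made-up example, not a value from the patch:

    import re

    proxy = 'https://grpcproxy.example.com:8980/exec/bot'
    m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
    transport, host, prefix = m.group(1), m.group(2), m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    # transport == 'https', host == 'grpcproxy.example.com:8980',
    # prefix == 'exec/bot/'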

   @property
   def location(self):
     return self._server

   @property
   def namespace(self):
     # This is used to determine if the data is compressed, but gRPC proxies
     # don't have concepts of 'namespaces' and natively compress all messages
     # before transmission. So return an unlikely-to-be-used name so that
     # isolateserver doesn't try to compress anything.
     return 'grpc-proxy'

   def fetch(self, digest, offset=0):
     # The gRPC APIs only work with an offset of 0
     assert offset == 0
-    request = isolate_bot_pb2.FetchBlobsRequest()
-    req_digest = request.digest.add()
-    # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte
-    # array (like [0x01, 0x2a, 0xbc]).
-    req_digest.digest = binascii.unhexlify(digest)
-    expected_offset = 0
+    request = bytestream_pb2.ReadRequest()
+    #TODO(aludwin): send the expected size of the item
+    request.resource_name = '%sblobs/%s/0' % (
+        self._prefix, digest)
     try:
-      for response in self._stub.FetchBlobs(request,
-                                            timeout=DOWNLOAD_READ_TIMEOUT):
-        if not response.status.succeeded:
-          raise IOError(
-              'Error while fetching %s: %s' % (digest, response.status))
-        if not expected_offset == response.data.offset:
-          raise IOError(
-              'Error while fetching %s: expected offset %d, got %d' % (
-                  digest, expected_offset, response.data.offset))
-        expected_offset += len(response.data.data)
-        yield response.data.data
+      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
+        yield response.data
     except grpc.RpcError as g:
       logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
       raise IOError(g)

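For reference, the ByteStream read convention fetch() relies on: the resource name is the proxy prefix, the literal 'blobs/', the hex digest, and a read offset (always 0 in this client). A tiny illustration with made-up values:

    prefix = 'exec/bot/'  # parsed from ISOLATED_GRPC_PROXY (example value)
    digest = '012abc'     # hex digest of the blob (example value)
    resource_name = '%sblobs/%s/0' % (prefix, digest)
    assert resource_name == 'exec/bot/blobs/012abc/0'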
   def push(self, item, push_state, content=None):
     assert isinstance(item, Item)
     assert item.digest is not None
     assert item.size is not None
     assert isinstance(push_state, _IsolateServerGrpcPushState)

     # Default to item.content().
     content = item.content() if content is None else content
     guard_memory_use(self, content, item.size)
+    self._num_pushes += 1

     try:
       def chunker():
         # Returns one bit of content at a time
         if (isinstance(content, str)
             or not isinstance(content, collections.Iterable)):
           yield content
         else:
           for chunk in content:
             yield chunk
       def slicer():
         # Ensures every bit of content is under the gRPC max size; yields
         # proto messages to send via gRPC.
-        request = isolate_bot_pb2.PushBlobsRequest()
-        request.data.digest.digest = binascii.unhexlify(item.digest)
-        request.data.digest.size_bytes = item.size
-        request.data.offset = 0
+        request = bytestream_pb2.WriteRequest()
+        u = uuid.uuid4()
+        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
+            self._prefix, u, item.digest, item.size)
+        request.write_offset = 0
         for chunk in chunker():
           # Make sure we send at least one chunk for zero-length blobs
           has_sent_anything = False
           while chunk or not has_sent_anything:
+            has_sent_anything = True
             slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
-            request.data.data = chunk[:slice_len]
+            request.data = chunk[:slice_len]
+            if request.write_offset + slice_len == item.size:
+              request.finish_write = True
             yield request
-            has_sent_anything = True
-            request.data.offset += slice_len
-            # The proxy only expects the first chunk to have the digest
-            request.data.ClearField("digest")
+            request.write_offset += slice_len
             chunk = chunk[slice_len:]

-      # TODO(aludwin): batch up several requests to reuse TCP connections
       try:
-        response = self._stub.PushBlobs(slicer())
-      except grpc.RpcError as g:
-        logging.error('gRPC error during push: re-throwing as IOError (%s)' % g)
-        raise IOError(g)
+        response = self._stub.Write(slicer())
+      except grpc.Call as c:
+        # You might think that errors from gRPC would be rpc.RpcError. You'd
+        # be... right... but it's *also* an instance of grpc.Call, and that's
+        # where the status code actually lives.
+        if c.code() == grpc.StatusCode.ALREADY_EXISTS:
+          # This is legit - we didn't check before we pushed so no problem if
+          # it's already there.
+          self._already_exists += 1
+          if self._already_exists % 100 == 0:
+            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
+                self._already_exists, self._num_pushes,
+                100.0 * self._already_exists / self._num_pushes))
+        else:
+          logging.error('gRPC error during push: throwing as IOError (%s)' % c)
+          raise IOError(c)
+      except Exception as e:
+        logging.error('error during push: throwing as IOError (%s)' % e)
Vadim Sh. 2017/06/27 19:12:49 nit: use 2 spaces for indentation
Vadim Sh. 2017/06/27 21:14:29 Ping. It is 4 spaces now.
aludwin 2017/06/28 13:23:53 Done.
+        raise IOError(e)

-      if not response.status.succeeded:
-        raise IOError(
-            'Error while uploading %s: %s' % (
-                item.digest, response.status.error_detail))
+      if response.committed_size != item.size:
+        raise IOError('%s/%d: incorrect size written (%d)' % (
+            item.digest, item.size, response.committed_size))

     finally:
       with self._lock:
         self._memory_use -= item.size

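A sketch of the request stream slicer() emits, mirroring its chunking logic in plain Python; the 40 KiB blob size is an arbitrary example, and the real WriteRequests also carry resource_name and data:

    NET_IO_FILE_CHUNK = 16 * 1024

    def offsets(size):
      # Yields (write_offset, slice_len, finish_write) the way slicer() does,
      # including the single empty slice sent for a zero-length blob.
      offset = 0
      sent = False
      while offset < size or not sent:
        sent = True
        slice_len = min(size - offset, NET_IO_FILE_CHUNK)
        yield offset, slice_len, offset + slice_len == size
        offset += slice_len

    assert list(offsets(40 * 1024)) == [
        (0, 16384, False), (16384, 16384, False), (32768, 8192, True)]
    assert list(offsets(0)) == [(0, 0, True)]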
   def contains(self, items):
     """Returns the set of all missing items."""
+    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
+    # assume that nothing is present in the cache.
     # Ensure all items were initialized with 'prepare' call. Storage does that.
     assert all(i.digest is not None and i.size is not None for i in items)
-    request = isolate_bot_pb2.ContainsRequest()
-    items_by_digest = {}
+    # Assume all Items are missing, and attach _PushState to them. The gRPC
+    # implementation doesn't actually have a push state, we just attach empty
+    # objects to satisfy the StorageApi interface.
+    missing_items = {}
     for item in items:
-      cd = request.digest.add()
-      cd.digest = binascii.unhexlify(item.digest)
-      items_by_digest[cd.digest] = item
-    try:
-      response = self._stub.Contains(request)
-    except grpc.RpcError as g:
-      logging.error('gRPC error during contains: re-throwing as IOError (%s)'
-                    % g)
-      raise IOError(g)
-
-    # If everything's present, return the empty set.
-    if response.status.succeeded:
-      return {}
-
-    if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
-      raise IOError('Unknown response during lookup: %s' % response.status)
-
-    # Pick Items that are missing, attach _PushState to them. The gRPC
-    # implementation doesn't actually have a push state, we just attach
-    # empty objects to satisfy the StorageApi interface.
-    missing_items = {}
-    for missing in response.status.missing_digest:
-      item = items_by_digest[missing.digest]
       missing_items[item] = _IsolateServerGrpcPushState()
-
-    logging.info('Queried %d files, %d cache hit',
-                 len(items), len(items) - len(missing_items))
     return missing_items

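Because contains() now reports every item as missing, callers re-upload blobs the proxy may already have; the ALREADY_EXISTS branch in push() above is what absorbs those duplicates. A minimal caller-side sketch of the flow, with a hypothetical storage instance and item list:

    missing = storage.contains(items)  # every item, each with an empty push state
    for item, push_state in missing.iteritems():
      storage.push(item, push_state)   # duplicate uploads surface as ALREADY_EXISTS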

 def set_storage_api_class(cls):
   """Replaces StorageApi implementation used by default."""
   global _storage_api_cls
   assert _storage_api_cls is None
   assert issubclass(cls, StorageApi)
   _storage_api_cls = cls


 def get_storage_api(url, namespace):
   """Returns an object that implements low-level StorageApi interface.

   It is used by Storage to work with single isolate |namespace|. It should
   rarely be used directly by clients, see 'get_storage' for
   a better alternative.

   Arguments:
     url: URL of isolate service to use shared cloud based storage.
     namespace: isolate namespace to operate in, also defines hashing and
       compression scheme used, i.e. namespace names that end with '-gzip'
       store compressed data.

   Returns:
     Instance of StorageApi subclass.
   """
   cls = _storage_api_cls or IsolateServer
   return cls(url, namespace)
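
A sketch of wiring this together from a caller's point of view; the proxy address and server URL are placeholders, and it assumes this file is importable as isolate_storage:

    import os
    import isolate_storage

    os.environ['ISOLATED_GRPC_PROXY'] = 'https://grpcproxy.example.com:8980/exec/bot'
    isolate_storage.set_storage_api_class(isolate_storage.IsolateServerGrpc)
    storage_api = isolate_storage.get_storage_api(
        'https://isolate.example.com', 'default-gzip')
    print storage_api.namespace  # 'grpc-proxy'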