Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(138)

Side by Side Diff: client/isolate_storage.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Fix pylint errors Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2016 The LUCI Authors. All rights reserved. 2 # Copyright 2016 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """A low-level blob storage/retrieval interface to the Isolate server""" 6 """A low-level blob storage/retrieval interface to the Isolate server"""
7 7
8 import base64 8 import base64
9 import collections 9 import collections
10 import logging 10 import logging
11 import os 11 import os
12 import re 12 import re
13 import sys 13 import sys
14 import threading 14 import threading
15 import time 15 import time
16 import types 16 import types
17 import uuid 17 import uuid
18 18
19 from utils import file_path 19 from utils import file_path
20 from utils import net 20 from utils import net
21 21
22 import isolated_format 22 import isolated_format
23 23
24 # gRPC may not be installed on the worker machine. This is fine, as long as
25 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
26 # Full external requirements are: grpcio, certifi.
27 try: 24 try:
28 import grpc 25 import grpc # for error codes
29 from google import auth as google_auth 26 from utils import grpc_proxy
30 from google.auth.transport import grpc as google_auth_transport_grpc
31 from google.auth.transport import requests as google_auth_transport_requests
32 from proto import bytestream_pb2 27 from proto import bytestream_pb2
33 except ImportError as err: 28 except ImportError as err:
34 grpc = None 29 grpc = None
30 grpc_proxy = None
35 bytestream_pb2 = None 31 bytestream_pb2 = None
36 32
37 # If gRPC is installed, at least give a warning if certifi is not. This is not
38 # actually used anywhere in this module, but if certifi is missing,
39 # google.auth.transport will fail with
40 # https://stackoverflow.com/questions/24973326
41 certifi = None
42 if grpc is not None:
43 try:
44 import certifi
45 except ImportError:
46 # Could not import certifi; gRPC HTTPS connections may fail. This will be
47 # logged in IsolateServerGrpc.__init__, since the logger is not configured
48 # during the import time.
49 pass
50 33
51 # Chunk size to use when reading from network stream. 34 # Chunk size to use when reading from network stream.
52 NET_IO_FILE_CHUNK = 16 * 1024 35 NET_IO_FILE_CHUNK = 16 * 1024
53 36
54 37
55 # Read timeout in seconds for downloads from isolate storage. If there's no 38 # Read timeout in seconds for downloads from isolate storage. If there's no
56 # response from the server within this timeout whole download will be aborted. 39 # response from the server within this timeout whole download will be aborted.
57 DOWNLOAD_READ_TIMEOUT = 60 40 DOWNLOAD_READ_TIMEOUT = 60
58 41
59 42
(...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after
526 class IsolateServerGrpc(StorageApi): 509 class IsolateServerGrpc(StorageApi):
527 """StorageApi implementation that downloads and uploads to a gRPC service. 510 """StorageApi implementation that downloads and uploads to a gRPC service.
528 511
529 Limitations: only works for the default-gzip namespace, and with zero offsets 512 Limitations: only works for the default-gzip namespace, and with zero offsets
530 while fetching. 513 while fetching.
531 """ 514 """
532 515
533 def __init__(self, server, namespace, proxy): 516 def __init__(self, server, namespace, proxy):
534 super(IsolateServerGrpc, self).__init__() 517 super(IsolateServerGrpc, self).__init__()
535 logging.info('Using gRPC for Isolate') 518 logging.info('Using gRPC for Isolate')
536 if not certifi: 519 # Proxies only support the default-gzip namespace for now.
537 logging.warning( 520 # TODO(aludwin): support other namespaces if necessary
538 'Could not import certifi; gRPC HTTPS connections may fail') 521 assert namespace == 'default-gzip'
539 self._server = server 522 self._server = server
540 self._lock = threading.Lock() 523 self._lock = threading.Lock()
541 self._memory_use = 0 524 self._memory_use = 0
542 self._num_pushes = 0 525 self._num_pushes = 0
543 self._already_exists = 0 526 self._already_exists = 0
544 527 self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
545 # Proxies only support the default-gzip namespace for now.
546 # TODO(aludwin): support other namespaces if necessary
547 assert namespace == 'default-gzip'
548 self._namespace = namespace 528 self._namespace = namespace
549 529
550 # Make sure grpc was successfully imported
551 assert grpc
552 assert bytestream_pb2
553
554 roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
555 overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')
556
557 # The "proxy" envvar must be of the form:
558 # http[s]://<server>[:port][/prefix]
559 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
560 if not m:
561 raise ValueError(('gRPC proxy must have the form: '
562 'http[s]://<server>[:port][/prefix] '
563 '(given: %s)') % proxy)
564 transport = m.group(1)
565 host = m.group(2)
566 prefix = m.group(3)
567 if not prefix.endswith('/'):
568 prefix = prefix + '/'
569 logging.info('gRPC proxy: transport %s, host %s, prefix %s',
570 transport, host, prefix)
571 self._prefix = prefix
572
573 if transport == 'http':
574 self._channel = grpc.insecure_channel(host)
575 elif transport == 'https':
576 # Using cloud container builder scopes for testing:
577 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
578 credentials, _ = google_auth.default(scopes=scopes)
579 request = google_auth_transport_requests.Request()
580 options = ()
581 root_certs = None
582 if roots is not None:
583 logging.info('Using root CA %s', roots)
584 with open(roots) as f:
585 root_certs = f.read()
586 if overd is not None:
587 logging.info('Using TLS server override %s', overd)
588 options=(('grpc.ssl_target_name_override', overd),)
589 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
590 self._channel = google_auth_transport_grpc.secure_authorized_channel(
591 credentials, request, host, ssl_creds, options=options)
592 else:
593 raise ValueError('unknown transport %s (should be http[s])' % transport)
594 self._stub = bytestream_pb2.ByteStreamStub(self._channel)
595 530
596 @property 531 @property
597 def location(self): 532 def location(self):
598 return self._server 533 return self._server
599 534
600 @property 535 @property
601 def namespace(self): 536 def namespace(self):
602 return self._namespace 537 return self._namespace
603 538
604 @property 539 @property
605 def internal_compression(self): 540 def internal_compression(self):
606 # gRPC natively compresses all messages before transmission. 541 # gRPC natively compresses all messages before transmission.
607 return True 542 return True
608 543
609 def fetch(self, digest, offset=0): 544 def fetch(self, digest, offset=0):
610 # The gRPC APIs only work with an offset of 0 545 # The gRPC APIs only work with an offset of 0
611 assert offset == 0 546 assert offset == 0
612 request = bytestream_pb2.ReadRequest() 547 request = bytestream_pb2.ReadRequest()
613 #TODO(aludwin): send the expected size of the item 548 #TODO(aludwin): send the expected size of the item
614 request.resource_name = '%sblobs/%s/0' % ( 549 request.resource_name = '%s/blobs/%s/0' % (
615 self._prefix, digest) 550 self._proxy.prefix, digest)
616 try: 551 try:
617 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): 552 for response in self._proxy.get_stream('Read', request):
618 yield response.data 553 yield response.data
619 except grpc.RpcError as g: 554 except grpc.RpcError as g:
620 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) 555 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
621 raise IOError(g) 556 raise IOError(g)
622 557
623 def push(self, item, push_state, content=None): 558 def push(self, item, push_state, content=None):
624 assert isinstance(item, Item) 559 assert isinstance(item, Item)
625 assert item.digest is not None 560 assert item.digest is not None
626 assert item.size is not None 561 assert item.size is not None
627 assert isinstance(push_state, _IsolateServerGrpcPushState) 562 assert isinstance(push_state, _IsolateServerGrpcPushState)
(...skipping 10 matching lines...) Expand all
638 or not isinstance(content, collections.Iterable)): 573 or not isinstance(content, collections.Iterable)):
639 yield content 574 yield content
640 else: 575 else:
641 for chunk in content: 576 for chunk in content:
642 yield chunk 577 yield chunk
643 def slicer(): 578 def slicer():
644 # Ensures every bit of content is under the gRPC max size; yields 579 # Ensures every bit of content is under the gRPC max size; yields
645 # proto messages to send via gRPC. 580 # proto messages to send via gRPC.
646 request = bytestream_pb2.WriteRequest() 581 request = bytestream_pb2.WriteRequest()
647 u = uuid.uuid4() 582 u = uuid.uuid4()
648 request.resource_name = '%suploads/%s/blobs/%s/%d' % ( 583 request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
649 self._prefix, u, item.digest, item.size) 584 self._proxy.prefix, u, item.digest, item.size)
650 request.write_offset = 0 585 request.write_offset = 0
651 for chunk in chunker(): 586 for chunk in chunker():
652 # Make sure we send at least one chunk for zero-length blobs 587 # Make sure we send at least one chunk for zero-length blobs
653 has_sent_anything = False 588 has_sent_anything = False
654 while chunk or not has_sent_anything: 589 while chunk or not has_sent_anything:
655 has_sent_anything = True 590 has_sent_anything = True
656 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) 591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
657 request.data = chunk[:slice_len] 592 request.data = chunk[:slice_len]
658 if request.write_offset + slice_len == item.size: 593 if request.write_offset + slice_len == item.size:
659 request.finish_write = True 594 request.finish_write = True
660 yield request 595 yield request
661 request.write_offset += slice_len 596 request.write_offset += slice_len
662 chunk = chunk[slice_len:] 597 chunk = chunk[slice_len:]
663 598
664 response = None 599 response = None
665 try: 600 try:
666 response = self._stub.Write(slicer()) 601 response = self._proxy.call_no_retries('Write', slicer())
667 except grpc.RpcError as r: 602 except grpc.RpcError as r:
668 if r.code() == grpc.StatusCode.ALREADY_EXISTS: 603 if r.code() == grpc.StatusCode.ALREADY_EXISTS:
669 # This is legit - we didn't check before we pushed so no problem if 604 # This is legit - we didn't check before we pushed so no problem if
670 # it's already there. 605 # it's already there.
671 self._already_exists += 1 606 self._already_exists += 1
672 if self._already_exists % 100 == 0: 607 if self._already_exists % 100 == 0:
673 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( 608 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
674 self._already_exists, self._num_pushes, 609 self._already_exists, self._num_pushes,
675 100.0 * self._already_exists / self._num_pushes)) 610 100.0 * self._already_exists / self._num_pushes))
676 else: 611 else:
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
722 namespace: isolate namespace to operate in, also defines hashing and 657 namespace: isolate namespace to operate in, also defines hashing and
723 compression scheme used, i.e. namespace names that end with '-gzip' 658 compression scheme used, i.e. namespace names that end with '-gzip'
724 store compressed data. 659 store compressed data.
725 660
726 Returns: 661 Returns:
727 Instance of StorageApi subclass. 662 Instance of StorageApi subclass.
728 """ 663 """
729 if _grpc_proxy is not None: 664 if _grpc_proxy is not None:
730 return IsolateServerGrpc(url, namespace, _grpc_proxy) 665 return IsolateServerGrpc(url, namespace, _grpc_proxy)
731 return IsolateServer(url, namespace) 666 return IsolateServer(url, namespace)
OLDNEW
« no previous file with comments | « appengine/swarming/swarming_bot/bot_code/remote_client_grpc_test.py ('k') | client/utils/grpc_proxy.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698