Index: tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py |
diff --git a/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..5d7e56492f84fb262b848ddc5ea174fdd277f8a8 |
--- /dev/null |
+++ b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py |
@@ -0,0 +1,412 @@ |
+#!/usr/bin/env python |
+# |
+# Copyright 2010 Google Inc. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+# |
+ |
+"""Transport library for ProtoRPC. |
+ |
+Contains underlying infrastructure used for communicating RPCs over low level |
+transports such as HTTP. |
+ |
+Includes HTTP transport built over urllib2. |
+""" |
+ |
+import six.moves.http_client |
+import logging |
+import os |
+import socket |
+import sys |
+import urlparse |
+ |
+from . import messages |
+from . import protobuf |
+from . import remote |
+from . import util |
+import six |
+ |
+__all__ = [ |
+ 'RpcStateError', |
+ |
+ 'HttpTransport', |
+ 'LocalTransport', |
+ 'Rpc', |
+ 'Transport', |
+] |
+ |
+ |
+class RpcStateError(messages.Error): |
+ """Raised when trying to put RPC in to an invalid state.""" |
+ |
+ |
+class Rpc(object): |
+ """Represents a client side RPC. |
+ |
+ An RPC is created by the transport class and is used with a single RPC. While |
+ an RPC is still in process, the response is set to None. When it is complete |
+ the response will contain the response message. |
+ """ |
+ |
+ def __init__(self, request): |
+ """Constructor. |
+ |
+ Args: |
+ request: Request associated with this RPC. |
+ """ |
+ self.__request = request |
+ self.__response = None |
+ self.__state = remote.RpcState.RUNNING |
+ self.__error_message = None |
+ self.__error_name = None |
+ |
+ @property |
+ def request(self): |
+ """Request associated with RPC.""" |
+ return self.__request |
+ |
+ @property |
+ def response(self): |
+ """Response associated with RPC.""" |
+ self.wait() |
+ self.__check_status() |
+ return self.__response |
+ |
+ @property |
+ def state(self): |
+ """State associated with RPC.""" |
+ return self.__state |
+ |
+ @property |
+ def error_message(self): |
+ """Error, if any, associated with RPC.""" |
+ self.wait() |
+ return self.__error_message |
+ |
+ @property |
+ def error_name(self): |
+ """Error name, if any, associated with RPC.""" |
+ self.wait() |
+ return self.__error_name |
+ |
+ def wait(self): |
+ """Wait for an RPC to finish.""" |
+ if self.__state == remote.RpcState.RUNNING: |
+ self._wait_impl() |
+ |
+ def _wait_impl(self): |
+ """Implementation for wait().""" |
+ raise NotImplementedError() |
+ |
+ def __check_status(self): |
+ error_class = remote.RpcError.from_state(self.__state) |
+ if error_class is not None: |
+ if error_class is remote.ApplicationError: |
+ raise error_class(self.__error_message, self.__error_name) |
+ else: |
+ raise error_class(self.__error_message) |
+ |
+ def __set_state(self, state, error_message=None, error_name=None): |
+ if self.__state != remote.RpcState.RUNNING: |
+ raise RpcStateError( |
+ 'RPC must be in RUNNING state to change to %s' % state) |
+ if state == remote.RpcState.RUNNING: |
+ raise RpcStateError('RPC is already in RUNNING state') |
+ self.__state = state |
+ self.__error_message = error_message |
+ self.__error_name = error_name |
+ |
+ def set_response(self, response): |
+ # TODO: Even more specific type checking. |
+ if not isinstance(response, messages.Message): |
+ raise TypeError('Expected Message type, received %r' % (response)) |
+ |
+ self.__response = response |
+ self.__set_state(remote.RpcState.OK) |
+ |
+ def set_status(self, status): |
+ status.check_initialized() |
+ self.__set_state(status.state, status.error_message, status.error_name) |
+ |
+ |
+class Transport(object): |
+ """Transport base class. |
+ |
+ Provides basic support for implementing a ProtoRPC transport such as one |
+ that can send and receive messages over HTTP. |
+ |
+ Implementations override _start_rpc. This method receives a RemoteInfo |
+ instance and a request Message. The transport is expected to set the rpc |
+ response or raise an exception before termination. |
+ """ |
+ |
+ @util.positional(1) |
+ def __init__(self, protocol=protobuf): |
+ """Constructor. |
+ |
+ Args: |
+ protocol: If string, will look up a protocol from the default Protocols |
+ instance by name. Can also be an instance of remote.ProtocolConfig. |
+ If neither, it must be an object that implements a protocol interface |
+ by implementing encode_message, decode_message and set CONTENT_TYPE. |
+ For example, the modules protobuf and protojson can be used directly. |
+ """ |
+ if isinstance(protocol, six.string_types): |
+ protocols = remote.Protocols.get_default() |
+ try: |
+ protocol = protocols.lookup_by_name(protocol) |
+ except KeyError: |
+ protocol = protocols.lookup_by_content_type(protocol) |
+ if isinstance(protocol, remote.ProtocolConfig): |
+ self.__protocol = protocol.protocol |
+ self.__protocol_config = protocol |
+ else: |
+ self.__protocol = protocol |
+ self.__protocol_config = remote.ProtocolConfig( |
+ protocol, 'default', default_content_type=protocol.CONTENT_TYPE) |
+ |
+ @property |
+ def protocol(self): |
+ """Protocol associated with this transport.""" |
+ return self.__protocol |
+ |
+ @property |
+ def protocol_config(self): |
+ """Protocol associated with this transport.""" |
+ return self.__protocol_config |
+ |
+ def send_rpc(self, remote_info, request): |
+ """Initiate sending an RPC over the transport. |
+ |
+ Args: |
+ remote_info: RemoteInfo instance describing remote method. |
+ request: Request message to send to service. |
+ |
+ Returns: |
+ An Rpc instance intialized with the request.. |
+ """ |
+ request.check_initialized() |
+ |
+ rpc = self._start_rpc(remote_info, request) |
+ |
+ return rpc |
+ |
+ def _start_rpc(self, remote_info, request): |
+ """Start a remote procedure call. |
+ |
+ Args: |
+ remote_info: RemoteInfo instance describing remote method. |
+ request: Request message to send to service. |
+ |
+ Returns: |
+ An Rpc instance initialized with the request. |
+ """ |
+ raise NotImplementedError() |
+ |
+ |
+class HttpTransport(Transport): |
+ """Transport for communicating with HTTP servers.""" |
+ |
+ @util.positional(2) |
+ def __init__(self, |
+ service_url, |
+ protocol=protobuf): |
+ """Constructor. |
+ |
+ Args: |
+ service_url: URL where the service is located. All communication via |
+ the transport will go to this URL. |
+ protocol: The protocol implementation. Must implement encode_message and |
+ decode_message. Can also be an instance of remote.ProtocolConfig. |
+ """ |
+ super(HttpTransport, self).__init__(protocol=protocol) |
+ self.__service_url = service_url |
+ |
+ def __get_rpc_status(self, response, content): |
+ """Get RPC status from HTTP response. |
+ |
+ Args: |
+ response: HTTPResponse object. |
+ content: Content read from HTTP response. |
+ |
+ Returns: |
+ RpcStatus object parsed from response, else an RpcStatus with a generic |
+ HTTP error. |
+ """ |
+ # Status above 400 may have RpcStatus content. |
+ if response.status >= 400: |
+ content_type = response.getheader('content-type') |
+ if content_type == self.protocol_config.default_content_type: |
+ try: |
+ rpc_status = self.protocol.decode_message(remote.RpcStatus, content) |
+ except Exception as decode_err: |
+ logging.warning( |
+ 'An error occurred trying to parse status: %s\n%s', |
+ str(decode_err), content) |
+ else: |
+ if rpc_status.is_initialized(): |
+ return rpc_status |
+ else: |
+ logging.warning( |
+ 'Body does not result in an initialized RpcStatus message:\n%s', |
+ content) |
+ |
+ # If no RpcStatus message present, attempt to forward any content. If empty |
+ # use standard error message. |
+ if not content.strip(): |
+ content = six.moves.http_client.responses.get(response.status, 'Unknown Error') |
+ return remote.RpcStatus(state=remote.RpcState.SERVER_ERROR, |
+ error_message='HTTP Error %s: %s' % ( |
+ response.status, content or 'Unknown Error')) |
+ |
+ def __set_response(self, remote_info, connection, rpc): |
+ """Set response on RPC. |
+ |
+ Sets response or status from HTTP request. Implements the wait method of |
+ Rpc instance. |
+ |
+ Args: |
+ remote_info: Remote info for invoked RPC. |
+ connection: HTTPConnection that is making request. |
+ rpc: Rpc instance. |
+ """ |
+ try: |
+ response = connection.getresponse() |
+ |
+ content = response.read() |
+ |
+ if response.status == six.moves.http_client.OK: |
+ response = self.protocol.decode_message(remote_info.response_type, |
+ content) |
+ rpc.set_response(response) |
+ else: |
+ status = self.__get_rpc_status(response, content) |
+ rpc.set_status(status) |
+ finally: |
+ connection.close() |
+ |
+ def _start_rpc(self, remote_info, request): |
+ """Start a remote procedure call. |
+ |
+ Args: |
+ remote_info: A RemoteInfo instance for this RPC. |
+ request: The request message for this RPC. |
+ |
+ Returns: |
+ An Rpc instance initialized with a Request. |
+ """ |
+ method_url = '%s.%s' % (self.__service_url, remote_info.method.__name__) |
+ encoded_request = self.protocol.encode_message(request) |
+ |
+ url = urlparse.urlparse(method_url) |
+ if url.scheme == 'https': |
+ connection_type = six.moves.http_client.HTTPSConnection |
+ else: |
+ connection_type = six.moves.http_client.HTTPConnection |
+ connection = connection_type(url.hostname, url.port) |
+ try: |
+ self._send_http_request(connection, url.path, encoded_request) |
+ rpc = Rpc(request) |
+ except remote.RpcError: |
+ # Pass through all ProtoRPC errors |
+ connection.close() |
+ raise |
+ except socket.error as err: |
+ connection.close() |
+ raise remote.NetworkError('Socket error: %s %r' % (type(err).__name__, |
+ err.args), |
+ err) |
+ except Exception as err: |
+ connection.close() |
+ raise remote.NetworkError('Error communicating with HTTP server', |
+ err) |
+ else: |
+ wait_impl = lambda: self.__set_response(remote_info, connection, rpc) |
+ rpc._wait_impl = wait_impl |
+ |
+ return rpc |
+ |
+ def _send_http_request(self, connection, http_path, encoded_request): |
+ connection.request( |
+ 'POST', |
+ http_path, |
+ encoded_request, |
+ headers={'Content-type': self.protocol_config.default_content_type, |
+ 'Content-length': len(encoded_request)}) |
+ |
+ |
+class LocalTransport(Transport): |
+ """Local transport that sends messages directly to services. |
+ |
+ Useful in tests or creating code that can work with either local or remote |
+ services. Using LocalTransport is preferrable to simply instantiating a |
+ single instance of a service and reusing it. The entire request process |
+ involves instantiating a new instance of a service, initializing it with |
+ request state and then invoking the remote method for every request. |
+ """ |
+ |
+ def __init__(self, service_factory): |
+ """Constructor. |
+ |
+ Args: |
+ service_factory: Service factory or class. |
+ """ |
+ super(LocalTransport, self).__init__() |
+ self.__service_class = getattr(service_factory, |
+ 'service_class', |
+ service_factory) |
+ self.__service_factory = service_factory |
+ |
+ @property |
+ def service_class(self): |
+ return self.__service_class |
+ |
+ @property |
+ def service_factory(self): |
+ return self.__service_factory |
+ |
+ def _start_rpc(self, remote_info, request): |
+ """Start a remote procedure call. |
+ |
+ Args: |
+ remote_info: RemoteInfo instance describing remote method. |
+ request: Request message to send to service. |
+ |
+ Returns: |
+ An Rpc instance initialized with the request. |
+ """ |
+ rpc = Rpc(request) |
+ def wait_impl(): |
+ instance = self.__service_factory() |
+ try: |
+ initalize_request_state = instance.initialize_request_state |
+ except AttributeError: |
+ pass |
+ else: |
+ host = six.text_type(os.uname()[1]) |
+ initalize_request_state(remote.RequestState(remote_host=host, |
+ remote_address=u'127.0.0.1', |
+ server_host=host, |
+ server_port=-1)) |
+ try: |
+ response = remote_info.method(instance, request) |
+ assert isinstance(response, remote_info.response_type) |
+ except remote.ApplicationError: |
+ raise |
+ except: |
+ exc_type, exc_value, traceback = sys.exc_info() |
+ message = 'Unexpected error %s: %s' % (exc_type.__name__, exc_value) |
+ six.reraise(remote.ServerError, message, traceback) |
+ rpc.set_response(response) |
+ rpc._wait_impl = wait_impl |
+ return rpc |