Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1292)

Unified Diff: tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py

Issue 1264873003: Add gsutil/third_party to telemetry/third_party/gsutilz/third_party. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove httplib2 Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py
diff --git a/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py
new file mode 100755
index 0000000000000000000000000000000000000000..5d7e56492f84fb262b848ddc5ea174fdd277f8a8
--- /dev/null
+++ b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/transport.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Transport library for ProtoRPC.
+
+Contains underlying infrastructure used for communicating RPCs over low level
+transports such as HTTP.
+
+Includes HTTP transport built over urllib2.
+"""
+
+import six.moves.http_client
+import logging
+import os
+import socket
+import sys
+import urlparse
+
+from . import messages
+from . import protobuf
+from . import remote
+from . import util
+import six
+
+__all__ = [
+ 'RpcStateError',
+
+ 'HttpTransport',
+ 'LocalTransport',
+ 'Rpc',
+ 'Transport',
+]
+
+
+class RpcStateError(messages.Error):
+ """Raised when trying to put RPC in to an invalid state."""
+
+
+class Rpc(object):
+ """Represents a client side RPC.
+
+ An RPC is created by the transport class and is used with a single RPC. While
+ an RPC is still in process, the response is set to None. When it is complete
+ the response will contain the response message.
+ """
+
+ def __init__(self, request):
+ """Constructor.
+
+ Args:
+ request: Request associated with this RPC.
+ """
+ self.__request = request
+ self.__response = None
+ self.__state = remote.RpcState.RUNNING
+ self.__error_message = None
+ self.__error_name = None
+
+ @property
+ def request(self):
+ """Request associated with RPC."""
+ return self.__request
+
+ @property
+ def response(self):
+ """Response associated with RPC."""
+ self.wait()
+ self.__check_status()
+ return self.__response
+
+ @property
+ def state(self):
+ """State associated with RPC."""
+ return self.__state
+
+ @property
+ def error_message(self):
+ """Error, if any, associated with RPC."""
+ self.wait()
+ return self.__error_message
+
+ @property
+ def error_name(self):
+ """Error name, if any, associated with RPC."""
+ self.wait()
+ return self.__error_name
+
+ def wait(self):
+ """Wait for an RPC to finish."""
+ if self.__state == remote.RpcState.RUNNING:
+ self._wait_impl()
+
+ def _wait_impl(self):
+ """Implementation for wait()."""
+ raise NotImplementedError()
+
+ def __check_status(self):
+ error_class = remote.RpcError.from_state(self.__state)
+ if error_class is not None:
+ if error_class is remote.ApplicationError:
+ raise error_class(self.__error_message, self.__error_name)
+ else:
+ raise error_class(self.__error_message)
+
+ def __set_state(self, state, error_message=None, error_name=None):
+ if self.__state != remote.RpcState.RUNNING:
+ raise RpcStateError(
+ 'RPC must be in RUNNING state to change to %s' % state)
+ if state == remote.RpcState.RUNNING:
+ raise RpcStateError('RPC is already in RUNNING state')
+ self.__state = state
+ self.__error_message = error_message
+ self.__error_name = error_name
+
+ def set_response(self, response):
+ # TODO: Even more specific type checking.
+ if not isinstance(response, messages.Message):
+ raise TypeError('Expected Message type, received %r' % (response))
+
+ self.__response = response
+ self.__set_state(remote.RpcState.OK)
+
+ def set_status(self, status):
+ status.check_initialized()
+ self.__set_state(status.state, status.error_message, status.error_name)
+
+
+class Transport(object):
+ """Transport base class.
+
+ Provides basic support for implementing a ProtoRPC transport such as one
+ that can send and receive messages over HTTP.
+
+ Implementations override _start_rpc. This method receives a RemoteInfo
+ instance and a request Message. The transport is expected to set the rpc
+ response or raise an exception before termination.
+ """
+
+ @util.positional(1)
+ def __init__(self, protocol=protobuf):
+ """Constructor.
+
+ Args:
+ protocol: If string, will look up a protocol from the default Protocols
+ instance by name. Can also be an instance of remote.ProtocolConfig.
+ If neither, it must be an object that implements a protocol interface
+ by implementing encode_message, decode_message and set CONTENT_TYPE.
+ For example, the modules protobuf and protojson can be used directly.
+ """
+ if isinstance(protocol, six.string_types):
+ protocols = remote.Protocols.get_default()
+ try:
+ protocol = protocols.lookup_by_name(protocol)
+ except KeyError:
+ protocol = protocols.lookup_by_content_type(protocol)
+ if isinstance(protocol, remote.ProtocolConfig):
+ self.__protocol = protocol.protocol
+ self.__protocol_config = protocol
+ else:
+ self.__protocol = protocol
+ self.__protocol_config = remote.ProtocolConfig(
+ protocol, 'default', default_content_type=protocol.CONTENT_TYPE)
+
+ @property
+ def protocol(self):
+ """Protocol associated with this transport."""
+ return self.__protocol
+
+ @property
+ def protocol_config(self):
+ """Protocol associated with this transport."""
+ return self.__protocol_config
+
+ def send_rpc(self, remote_info, request):
+ """Initiate sending an RPC over the transport.
+
+ Args:
+ remote_info: RemoteInfo instance describing remote method.
+ request: Request message to send to service.
+
+ Returns:
+ An Rpc instance intialized with the request..
+ """
+ request.check_initialized()
+
+ rpc = self._start_rpc(remote_info, request)
+
+ return rpc
+
+ def _start_rpc(self, remote_info, request):
+ """Start a remote procedure call.
+
+ Args:
+ remote_info: RemoteInfo instance describing remote method.
+ request: Request message to send to service.
+
+ Returns:
+ An Rpc instance initialized with the request.
+ """
+ raise NotImplementedError()
+
+
+class HttpTransport(Transport):
+ """Transport for communicating with HTTP servers."""
+
+ @util.positional(2)
+ def __init__(self,
+ service_url,
+ protocol=protobuf):
+ """Constructor.
+
+ Args:
+ service_url: URL where the service is located. All communication via
+ the transport will go to this URL.
+ protocol: The protocol implementation. Must implement encode_message and
+ decode_message. Can also be an instance of remote.ProtocolConfig.
+ """
+ super(HttpTransport, self).__init__(protocol=protocol)
+ self.__service_url = service_url
+
+ def __get_rpc_status(self, response, content):
+ """Get RPC status from HTTP response.
+
+ Args:
+ response: HTTPResponse object.
+ content: Content read from HTTP response.
+
+ Returns:
+ RpcStatus object parsed from response, else an RpcStatus with a generic
+ HTTP error.
+ """
+ # Status above 400 may have RpcStatus content.
+ if response.status >= 400:
+ content_type = response.getheader('content-type')
+ if content_type == self.protocol_config.default_content_type:
+ try:
+ rpc_status = self.protocol.decode_message(remote.RpcStatus, content)
+ except Exception as decode_err:
+ logging.warning(
+ 'An error occurred trying to parse status: %s\n%s',
+ str(decode_err), content)
+ else:
+ if rpc_status.is_initialized():
+ return rpc_status
+ else:
+ logging.warning(
+ 'Body does not result in an initialized RpcStatus message:\n%s',
+ content)
+
+ # If no RpcStatus message present, attempt to forward any content. If empty
+ # use standard error message.
+ if not content.strip():
+ content = six.moves.http_client.responses.get(response.status, 'Unknown Error')
+ return remote.RpcStatus(state=remote.RpcState.SERVER_ERROR,
+ error_message='HTTP Error %s: %s' % (
+ response.status, content or 'Unknown Error'))
+
+ def __set_response(self, remote_info, connection, rpc):
+ """Set response on RPC.
+
+ Sets response or status from HTTP request. Implements the wait method of
+ Rpc instance.
+
+ Args:
+ remote_info: Remote info for invoked RPC.
+ connection: HTTPConnection that is making request.
+ rpc: Rpc instance.
+ """
+ try:
+ response = connection.getresponse()
+
+ content = response.read()
+
+ if response.status == six.moves.http_client.OK:
+ response = self.protocol.decode_message(remote_info.response_type,
+ content)
+ rpc.set_response(response)
+ else:
+ status = self.__get_rpc_status(response, content)
+ rpc.set_status(status)
+ finally:
+ connection.close()
+
+ def _start_rpc(self, remote_info, request):
+ """Start a remote procedure call.
+
+ Args:
+ remote_info: A RemoteInfo instance for this RPC.
+ request: The request message for this RPC.
+
+ Returns:
+ An Rpc instance initialized with a Request.
+ """
+ method_url = '%s.%s' % (self.__service_url, remote_info.method.__name__)
+ encoded_request = self.protocol.encode_message(request)
+
+ url = urlparse.urlparse(method_url)
+ if url.scheme == 'https':
+ connection_type = six.moves.http_client.HTTPSConnection
+ else:
+ connection_type = six.moves.http_client.HTTPConnection
+ connection = connection_type(url.hostname, url.port)
+ try:
+ self._send_http_request(connection, url.path, encoded_request)
+ rpc = Rpc(request)
+ except remote.RpcError:
+ # Pass through all ProtoRPC errors
+ connection.close()
+ raise
+ except socket.error as err:
+ connection.close()
+ raise remote.NetworkError('Socket error: %s %r' % (type(err).__name__,
+ err.args),
+ err)
+ except Exception as err:
+ connection.close()
+ raise remote.NetworkError('Error communicating with HTTP server',
+ err)
+ else:
+ wait_impl = lambda: self.__set_response(remote_info, connection, rpc)
+ rpc._wait_impl = wait_impl
+
+ return rpc
+
+ def _send_http_request(self, connection, http_path, encoded_request):
+ connection.request(
+ 'POST',
+ http_path,
+ encoded_request,
+ headers={'Content-type': self.protocol_config.default_content_type,
+ 'Content-length': len(encoded_request)})
+
+
+class LocalTransport(Transport):
+ """Local transport that sends messages directly to services.
+
+ Useful in tests or creating code that can work with either local or remote
+ services. Using LocalTransport is preferrable to simply instantiating a
+ single instance of a service and reusing it. The entire request process
+ involves instantiating a new instance of a service, initializing it with
+ request state and then invoking the remote method for every request.
+ """
+
+ def __init__(self, service_factory):
+ """Constructor.
+
+ Args:
+ service_factory: Service factory or class.
+ """
+ super(LocalTransport, self).__init__()
+ self.__service_class = getattr(service_factory,
+ 'service_class',
+ service_factory)
+ self.__service_factory = service_factory
+
+ @property
+ def service_class(self):
+ return self.__service_class
+
+ @property
+ def service_factory(self):
+ return self.__service_factory
+
+ def _start_rpc(self, remote_info, request):
+ """Start a remote procedure call.
+
+ Args:
+ remote_info: RemoteInfo instance describing remote method.
+ request: Request message to send to service.
+
+ Returns:
+ An Rpc instance initialized with the request.
+ """
+ rpc = Rpc(request)
+ def wait_impl():
+ instance = self.__service_factory()
+ try:
+ initalize_request_state = instance.initialize_request_state
+ except AttributeError:
+ pass
+ else:
+ host = six.text_type(os.uname()[1])
+ initalize_request_state(remote.RequestState(remote_host=host,
+ remote_address=u'127.0.0.1',
+ server_host=host,
+ server_port=-1))
+ try:
+ response = remote_info.method(instance, request)
+ assert isinstance(response, remote_info.response_type)
+ except remote.ApplicationError:
+ raise
+ except:
+ exc_type, exc_value, traceback = sys.exc_info()
+ message = 'Unexpected error %s: %s' % (exc_type.__name__, exc_value)
+ six.reraise(remote.ServerError, message, traceback)
+ rpc.set_response(response)
+ rpc._wait_impl = wait_impl
+ return rpc

Powered by Google App Engine
This is Rietveld 408576698