Index: mojo/public/python/mojo_bindings/messaging.py |
diff --git a/mojo/public/python/mojo_bindings/messaging.py b/mojo/public/python/mojo_bindings/messaging.py |
deleted file mode 100644 |
index 385a08099b7a338a1194d812c77a0ce7e85179a7..0000000000000000000000000000000000000000 |
--- a/mojo/public/python/mojo_bindings/messaging.py |
+++ /dev/null |
@@ -1,407 +0,0 @@ |
-# Copyright 2014 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Utility classes to handle sending and receiving messages.""" |
- |
- |
-import struct |
-import sys |
-import weakref |
- |
-import mojo_bindings.serialization as serialization |
- |
-# pylint: disable=E0611,F0401 |
-import mojo_system as system |
- |
- |
-# The flag values for a message header. |
-NO_FLAG = 0 |
-MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0 |
-MESSAGE_IS_RESPONSE_FLAG = 1 << 1 |
- |
- |
-class MessagingException(Exception): |
- def __init__(self, *args, **kwargs): |
- Exception.__init__(self, *args, **kwargs) |
- self.__traceback__ = sys.exc_info()[2] |
- |
- |
-class MessageHeader(object): |
- """The header of a mojo message.""" |
- |
- _SIMPLE_MESSAGE_NUM_FIELDS = 2 |
- _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII") |
- |
- _REQUEST_ID_STRUCT = struct.Struct("<Q") |
- _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size |
- |
- _MESSAGE_WITH_REQUEST_ID_NUM_FIELDS = 3 |
- _MESSAGE_WITH_REQUEST_ID_SIZE = ( |
- _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size) |
- |
- def __init__(self, message_type, flags, request_id=0, data=None): |
- self._message_type = message_type |
- self._flags = flags |
- self._request_id = request_id |
- self._data = data |
- |
- @classmethod |
- def Deserialize(cls, data): |
- buf = buffer(data) |
- if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size: |
- raise serialization.DeserializationException('Header is too short.') |
- (size, version, message_type, flags) = ( |
- cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf)) |
- if (version < cls._SIMPLE_MESSAGE_NUM_FIELDS): |
- raise serialization.DeserializationException('Incorrect version.') |
- request_id = 0 |
- if _HasRequestId(flags): |
- if version < cls._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS: |
- raise serialization.DeserializationException('Incorrect version.') |
- if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or |
- len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE): |
- raise serialization.DeserializationException('Header is too short.') |
- (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from( |
- buf, cls._REQUEST_ID_OFFSET) |
- return MessageHeader(message_type, flags, request_id, data) |
- |
- @property |
- def message_type(self): |
- return self._message_type |
- |
- # pylint: disable=E0202 |
- @property |
- def request_id(self): |
- assert self.has_request_id |
- return self._request_id |
- |
- # pylint: disable=E0202 |
- @request_id.setter |
- def request_id(self, request_id): |
- assert self.has_request_id |
- self._request_id = request_id |
- self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, |
- request_id) |
- |
- @property |
- def has_request_id(self): |
- return _HasRequestId(self._flags) |
- |
- @property |
- def expects_response(self): |
- return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG) |
- |
- @property |
- def is_response(self): |
- return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG) |
- |
- @property |
- def size(self): |
- if self.has_request_id: |
- return self._MESSAGE_WITH_REQUEST_ID_SIZE |
- return self._SIMPLE_MESSAGE_STRUCT.size |
- |
- def Serialize(self): |
- if not self._data: |
- self._data = bytearray(self.size) |
- version = self._SIMPLE_MESSAGE_NUM_FIELDS |
- size = self._SIMPLE_MESSAGE_STRUCT.size |
- if self.has_request_id: |
- version = self._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS |
- size = self._MESSAGE_WITH_REQUEST_ID_SIZE |
- self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version, |
- self._message_type, self._flags) |
- if self.has_request_id: |
- self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, |
- self._request_id) |
- return self._data |
- |
- def _HasFlag(self, flag): |
- return self._flags & flag != 0 |
- |
- |
-class Message(object): |
- """A message for a message pipe. This contains data and handles.""" |
- |
- def __init__(self, data=None, handles=None, header=None): |
- self.data = data |
- self.handles = handles |
- self._header = header |
- self._payload = None |
- |
- @property |
- def header(self): |
- if self._header is None: |
- self._header = MessageHeader.Deserialize(self.data) |
- return self._header |
- |
- @property |
- def payload(self): |
- if self._payload is None: |
- self._payload = Message(self.data[self.header.size:], self.handles) |
- return self._payload |
- |
- def SetRequestId(self, request_id): |
- header = self.header |
- header.request_id = request_id |
- (data, _) = header.Serialize() |
- self.data[:header.Size] = data[:header.Size] |
- |
- |
-class MessageReceiver(object): |
- """A class which implements this interface can receive Message objects.""" |
- |
- def Accept(self, message): |
- """ |
- Receive a Message. The MessageReceiver is allowed to mutate the message. |
- |
- Args: |
- message: the received message. |
- |
- Returns: |
- True if the message has been handled, False otherwise. |
- """ |
- raise NotImplementedError() |
- |
- |
-class MessageReceiverWithResponder(MessageReceiver): |
- """ |
- A MessageReceiver that can also handle the response message generated from the |
- given message. |
- """ |
- |
- def AcceptWithResponder(self, message, responder): |
- """ |
- A variant on Accept that registers a MessageReceiver (known as the |
- responder) to handle the response message generated from the given message. |
- The responder's Accept method may be called as part of the call to |
- AcceptWithResponder, or some time after its return. |
- |
- Args: |
- message: the received message. |
- responder: the responder that will receive the response. |
- |
- Returns: |
- True if the message has been handled, False otherwise. |
- """ |
- raise NotImplementedError() |
- |
- |
-class ConnectionErrorHandler(object): |
- """ |
- A ConnectionErrorHandler is notified of an error happening while using the |
- bindings over message pipes. |
- """ |
- |
- def OnError(self, result): |
- raise NotImplementedError() |
- |
- |
-class Connector(MessageReceiver): |
- """ |
- A Connector owns a message pipe and will send any received messages to the |
- registered MessageReceiver. It also acts as a MessageReceiver and will send |
- any message through the handle. |
- |
- The method Start must be called before the Connector will start listening to |
- incoming messages. |
- """ |
- |
- def __init__(self, handle): |
- MessageReceiver.__init__(self) |
- self._handle = handle |
- self._cancellable = None |
- self._incoming_message_receiver = None |
- self._error_handler = None |
- |
- def __del__(self): |
- if self._cancellable: |
- self._cancellable() |
- |
- def SetIncomingMessageReceiver(self, message_receiver): |
- """ |
- Set the MessageReceiver that will receive message from the owned message |
- pipe. |
- """ |
- self._incoming_message_receiver = message_receiver |
- |
- def SetErrorHandler(self, error_handler): |
- """ |
- Set the ConnectionErrorHandler that will be notified of errors on the owned |
- message pipe. |
- """ |
- self._error_handler = error_handler |
- |
- def Start(self): |
- assert not self._cancellable |
- self._RegisterAsyncWaiterForRead() |
- |
- def Accept(self, message): |
- result = self._handle.WriteMessage(message.data, message.handles) |
- return result == system.RESULT_OK |
- |
- def Close(self): |
- if self._cancellable: |
- self._cancellable() |
- self._cancellable = None |
- self._handle.Close() |
- |
- def PassMessagePipe(self): |
- if self._cancellable: |
- self._cancellable() |
- self._cancellable = None |
- result = self._handle |
- self._handle = system.Handle() |
- return result |
- |
- def _OnAsyncWaiterResult(self, result): |
- self._cancellable = None |
- if result == system.RESULT_OK: |
- self._ReadOutstandingMessages() |
- else: |
- self._OnError(result) |
- |
- def _OnError(self, result): |
- assert not self._cancellable |
- if self._error_handler: |
- self._error_handler.OnError(result) |
- self._handle.Close() |
- |
- def _RegisterAsyncWaiterForRead(self) : |
- assert not self._cancellable |
- self._cancellable = self._handle.AsyncWait( |
- system.HANDLE_SIGNAL_READABLE, |
- system.DEADLINE_INDEFINITE, |
- _WeakCallback(self._OnAsyncWaiterResult)) |
- |
- def _ReadOutstandingMessages(self): |
- result = system.RESULT_OK |
- while result == system.RESULT_OK: |
- result = _ReadAndDispatchMessage(self._handle, |
- self._incoming_message_receiver) |
- if result == system.RESULT_SHOULD_WAIT: |
- self._RegisterAsyncWaiterForRead() |
- return |
- self._OnError(result) |
- |
- |
-class Router(MessageReceiverWithResponder): |
- """ |
- A Router will handle mojo message and forward those to a Connector. It deals |
- with parsing of headers and adding of request ids in order to be able to match |
- a response to a request. |
- """ |
- |
- def __init__(self, handle): |
- MessageReceiverWithResponder.__init__(self) |
- self._incoming_message_receiver = None |
- self._next_request_id = 1 |
- self._responders = {} |
- self._connector = Connector(handle) |
- self._connector.SetIncomingMessageReceiver( |
- ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage))) |
- |
- def Start(self): |
- self._connector.Start() |
- |
- def SetIncomingMessageReceiver(self, message_receiver): |
- """ |
- Set the MessageReceiver that will receive message from the owned message |
- pipe. |
- """ |
- self._incoming_message_receiver = message_receiver |
- |
- def SetErrorHandler(self, error_handler): |
- """ |
- Set the ConnectionErrorHandler that will be notified of errors on the owned |
- message pipe. |
- """ |
- self._connector.SetErrorHandler(error_handler) |
- |
- def Accept(self, message): |
- # A message without responder is directly forwarded to the connector. |
- return self._connector.Accept(message) |
- |
- def AcceptWithResponder(self, message, responder): |
- # The message must have a header. |
- header = message.header |
- assert header.expects_response |
- request_id = self._NextRequestId() |
- header.request_id = request_id |
- if not self._connector.Accept(message): |
- return False |
- self._responders[request_id] = responder |
- return True |
- |
- def Close(self): |
- self._connector.Close() |
- |
- def PassMessagePipe(self): |
- return self._connector.PassMessagePipe() |
- |
- def _HandleIncomingMessage(self, message): |
- header = message.header |
- if header.expects_response: |
- if self._incoming_message_receiver: |
- return self._incoming_message_receiver.AcceptWithResponder( |
- message, self) |
- # If we receive a request expecting a response when the client is not |
- # listening, then we have no choice but to tear down the pipe. |
- self.Close() |
- return False |
- if header.is_response: |
- request_id = header.request_id |
- responder = self._responders.pop(request_id, None) |
- if responder is None: |
- return False |
- return responder.Accept(message) |
- if self._incoming_message_receiver: |
- return self._incoming_message_receiver.Accept(message) |
- # Ok to drop the message |
- return False |
- |
- def _NextRequestId(self): |
- request_id = self._next_request_id |
- while request_id == 0 or request_id in self._responders: |
- request_id = (request_id + 1) % (1 << 64) |
- self._next_request_id = (request_id + 1) % (1 << 64) |
- return request_id |
- |
-class ForwardingMessageReceiver(MessageReceiver): |
- """A MessageReceiver that forward calls to |Accept| to a callable.""" |
- |
- def __init__(self, callback): |
- MessageReceiver.__init__(self) |
- self._callback = callback |
- |
- def Accept(self, message): |
- return self._callback(message) |
- |
- |
-def _WeakCallback(callback): |
- func = callback.im_func |
- self = callback.im_self |
- if not self: |
- return callback |
- weak_self = weakref.ref(self) |
- def Callback(*args, **kwargs): |
- self = weak_self() |
- if self: |
- return func(self, *args, **kwargs) |
- return Callback |
- |
- |
-def _ReadAndDispatchMessage(handle, message_receiver): |
- (result, _, sizes) = handle.ReadMessage() |
- if result == system.RESULT_OK and message_receiver: |
- message_receiver.Accept(Message(bytearray(), [])) |
- if result != system.RESULT_RESOURCE_EXHAUSTED: |
- return result |
- (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1]) |
- if result == system.RESULT_OK and message_receiver: |
- message_receiver.Accept(Message(data[0], data[1])) |
- return result |
- |
-def _HasRequestId(flags): |
- return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0 |