Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Unified Diff: mojo/public/python/mojo_bindings/messaging.py

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/python/mojo_bindings/messaging.py
diff --git a/mojo/public/python/mojo_bindings/messaging.py b/mojo/public/python/mojo_bindings/messaging.py
deleted file mode 100644
index c2742cc2a0e91afa9e8ad2a374a6dc121aa1bd9c..0000000000000000000000000000000000000000
--- a/mojo/public/python/mojo_bindings/messaging.py
+++ /dev/null
@@ -1,409 +0,0 @@
-# Copyright 2014 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Utility classes to handle sending and receiving messages."""
-
-
-import struct
-import sys
-import weakref
-
-import mojo_bindings.serialization as serialization
-
-# pylint: disable=E0611,F0401
-import mojo_system as system
-
-
-# The flag values for a message header.
-NO_FLAG = 0
-MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0
-MESSAGE_IS_RESPONSE_FLAG = 1 << 1
-
-
-class MessagingException(Exception):
- def __init__(self, *args, **kwargs):
- Exception.__init__(self, *args, **kwargs)
- self.__traceback__ = sys.exc_info()[2]
-
-
-class MessageHeader(object):
- """The header of a mojo message."""
-
- _SIMPLE_MESSAGE_VERSION = 0
- _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII")
-
- _REQUEST_ID_STRUCT = struct.Struct("<Q")
- _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size
-
- _MESSAGE_WITH_REQUEST_ID_VERSION = 1
- _MESSAGE_WITH_REQUEST_ID_SIZE = (
- _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size)
-
- def __init__(self, message_type, flags, request_id=0, data=None):
- self._message_type = message_type
- self._flags = flags
- self._request_id = request_id
- self._data = data
-
- @classmethod
- def Deserialize(cls, data):
- buf = buffer(data)
- if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size:
- raise serialization.DeserializationException('Header is too short.')
- (size, version, message_type, flags) = (
- cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf))
- if (version < cls._SIMPLE_MESSAGE_VERSION):
- raise serialization.DeserializationException('Incorrect version.')
- request_id = 0
- if _HasRequestId(flags):
- if version < cls._MESSAGE_WITH_REQUEST_ID_VERSION:
- raise serialization.DeserializationException('Incorrect version.')
- if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or
- len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE):
- raise serialization.DeserializationException('Header is too short.')
- (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from(
- buf, cls._REQUEST_ID_OFFSET)
- return MessageHeader(message_type, flags, request_id, data)
-
- @property
- def message_type(self):
- return self._message_type
-
- # pylint: disable=E0202
- @property
- def request_id(self):
- assert self.has_request_id
- return self._request_id
-
- # pylint: disable=E0202
- @request_id.setter
- def request_id(self, request_id):
- assert self.has_request_id
- self._request_id = request_id
- self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
- request_id)
-
- @property
- def has_request_id(self):
- return _HasRequestId(self._flags)
-
- @property
- def expects_response(self):
- return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG)
-
- @property
- def is_response(self):
- return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG)
-
- @property
- def size(self):
- if self.has_request_id:
- return self._MESSAGE_WITH_REQUEST_ID_SIZE
- return self._SIMPLE_MESSAGE_STRUCT.size
-
- def Serialize(self):
- if not self._data:
- self._data = bytearray(self.size)
- version = self._SIMPLE_MESSAGE_VERSION
- size = self._SIMPLE_MESSAGE_STRUCT.size
- if self.has_request_id:
- version = self._MESSAGE_WITH_REQUEST_ID_VERSION
- size = self._MESSAGE_WITH_REQUEST_ID_SIZE
- self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version,
- self._message_type, self._flags)
- if self.has_request_id:
- self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
- self._request_id)
- return self._data
-
- def _HasFlag(self, flag):
- return self._flags & flag != 0
-
-
-class Message(object):
- """A message for a message pipe. This contains data and handles."""
-
- def __init__(self, data=None, handles=None, header=None):
- self.data = data
- self.handles = handles
- self._header = header
- self._payload = None
-
- @property
- def header(self):
- if self._header is None:
- self._header = MessageHeader.Deserialize(self.data)
- return self._header
-
- @property
- def payload(self):
- if self._payload is None:
- self._payload = Message(self.data[self.header.size:], self.handles)
- return self._payload
-
- def SetRequestId(self, request_id):
- header = self.header
- header.request_id = request_id
- (data, _) = header.Serialize()
- self.data[:header.Size] = data[:header.Size]
-
-
-class MessageReceiver(object):
- """A class which implements this interface can receive Message objects."""
-
- def Accept(self, message):
- """
- Receive a Message. The MessageReceiver is allowed to mutate the message.
-
- Args:
- message: the received message.
-
- Returns:
- True if the message has been handled, False otherwise.
- """
- raise NotImplementedError()
-
-
-class MessageReceiverWithResponder(MessageReceiver):
- """
- A MessageReceiver that can also handle the response message generated from the
- given message.
- """
-
- def AcceptWithResponder(self, message, responder):
- """
- A variant on Accept that registers a MessageReceiver (known as the
- responder) to handle the response message generated from the given message.
- The responder's Accept method may be called as part of the call to
- AcceptWithResponder, or some time after its return.
-
- Args:
- message: the received message.
- responder: the responder that will receive the response.
-
- Returns:
- True if the message has been handled, False otherwise.
- """
- raise NotImplementedError()
-
-
-class ConnectionErrorHandler(object):
- """
- A ConnectionErrorHandler is notified of an error happening while using the
- bindings over message pipes.
- """
-
- def OnError(self, result):
- raise NotImplementedError()
-
-
-class Connector(MessageReceiver):
- """
- A Connector owns a message pipe and will send any received messages to the
- registered MessageReceiver. It also acts as a MessageReceiver and will send
- any message through the handle.
-
- The method Start must be called before the Connector will start listening to
- incoming messages.
- """
-
- def __init__(self, handle):
- MessageReceiver.__init__(self)
- self._handle = handle
- self._cancellable = None
- self._incoming_message_receiver = None
- self._error_handler = None
-
- def __del__(self):
- if self._cancellable:
- self._cancellable()
-
- def SetIncomingMessageReceiver(self, message_receiver):
- """
- Set the MessageReceiver that will receive message from the owned message
- pipe.
- """
- self._incoming_message_receiver = message_receiver
-
- def SetErrorHandler(self, error_handler):
- """
- Set the ConnectionErrorHandler that will be notified of errors on the owned
- message pipe.
- """
- self._error_handler = error_handler
-
- def Start(self):
- assert not self._cancellable
- self._RegisterAsyncWaiterForRead()
-
- def Accept(self, message):
- result = self._handle.WriteMessage(message.data, message.handles)
- return result == system.RESULT_OK
-
- def Close(self):
- if self._cancellable:
- self._cancellable()
- self._cancellable = None
- self._handle.Close()
-
- def PassMessagePipe(self):
- if self._cancellable:
- self._cancellable()
- self._cancellable = None
- result = self._handle
- self._handle = system.Handle()
- return result
-
- def _OnAsyncWaiterResult(self, result):
- self._cancellable = None
- if result == system.RESULT_OK:
- self._ReadOutstandingMessages()
- else:
- self._OnError(result)
-
- def _OnError(self, result):
- assert not self._cancellable
- if self._error_handler:
- self._error_handler.OnError(result)
- self._handle.Close()
-
- def _RegisterAsyncWaiterForRead(self) :
- assert not self._cancellable
- self._cancellable = self._handle.AsyncWait(
- system.HANDLE_SIGNAL_READABLE,
- system.DEADLINE_INDEFINITE,
- _WeakCallback(self._OnAsyncWaiterResult))
-
- def _ReadOutstandingMessages(self):
- result = None
- dispatched = True
- while dispatched:
- result, dispatched = _ReadAndDispatchMessage(
- self._handle, self._incoming_message_receiver)
- if result == system.RESULT_SHOULD_WAIT:
- self._RegisterAsyncWaiterForRead()
- return
- self._OnError(result)
-
-
-class Router(MessageReceiverWithResponder):
- """
- A Router will handle mojo message and forward those to a Connector. It deals
- with parsing of headers and adding of request ids in order to be able to match
- a response to a request.
- """
-
- def __init__(self, handle):
- MessageReceiverWithResponder.__init__(self)
- self._incoming_message_receiver = None
- self._next_request_id = 1
- self._responders = {}
- self._connector = Connector(handle)
- self._connector.SetIncomingMessageReceiver(
- ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage)))
-
- def Start(self):
- self._connector.Start()
-
- def SetIncomingMessageReceiver(self, message_receiver):
- """
- Set the MessageReceiver that will receive message from the owned message
- pipe.
- """
- self._incoming_message_receiver = message_receiver
-
- def SetErrorHandler(self, error_handler):
- """
- Set the ConnectionErrorHandler that will be notified of errors on the owned
- message pipe.
- """
- self._connector.SetErrorHandler(error_handler)
-
- def Accept(self, message):
- # A message without responder is directly forwarded to the connector.
- return self._connector.Accept(message)
-
- def AcceptWithResponder(self, message, responder):
- # The message must have a header.
- header = message.header
- assert header.expects_response
- request_id = self._NextRequestId()
- header.request_id = request_id
- if not self._connector.Accept(message):
- return False
- self._responders[request_id] = responder
- return True
-
- def Close(self):
- self._connector.Close()
-
- def PassMessagePipe(self):
- return self._connector.PassMessagePipe()
-
- def _HandleIncomingMessage(self, message):
- header = message.header
- if header.expects_response:
- if self._incoming_message_receiver:
- return self._incoming_message_receiver.AcceptWithResponder(
- message, self)
- # If we receive a request expecting a response when the client is not
- # listening, then we have no choice but to tear down the pipe.
- self.Close()
- return False
- if header.is_response:
- request_id = header.request_id
- responder = self._responders.pop(request_id, None)
- if responder is None:
- return False
- return responder.Accept(message)
- if self._incoming_message_receiver:
- return self._incoming_message_receiver.Accept(message)
- # Ok to drop the message
- return False
-
- def _NextRequestId(self):
- request_id = self._next_request_id
- while request_id == 0 or request_id in self._responders:
- request_id = (request_id + 1) % (1 << 64)
- self._next_request_id = (request_id + 1) % (1 << 64)
- return request_id
-
-class ForwardingMessageReceiver(MessageReceiver):
- """A MessageReceiver that forward calls to |Accept| to a callable."""
-
- def __init__(self, callback):
- MessageReceiver.__init__(self)
- self._callback = callback
-
- def Accept(self, message):
- return self._callback(message)
-
-
-def _WeakCallback(callback):
- func = callback.im_func
- self = callback.im_self
- if not self:
- return callback
- weak_self = weakref.ref(self)
- def Callback(*args, **kwargs):
- self = weak_self()
- if self:
- return func(self, *args, **kwargs)
- return Callback
-
-
-def _ReadAndDispatchMessage(handle, message_receiver):
- dispatched = False
- (result, _, sizes) = handle.ReadMessage()
- if result == system.RESULT_OK and message_receiver:
- dispatched = message_receiver.Accept(Message(bytearray(), []))
- if result != system.RESULT_RESOURCE_EXHAUSTED:
- return (result, dispatched)
- (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1])
- if result == system.RESULT_OK and message_receiver:
- dispatched = message_receiver.Accept(Message(data[0], data[1]))
- return (result, dispatched)
-
-def _HasRequestId(flags):
- return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0
« no previous file with comments | « mojo/public/python/mojo_bindings/interface_reflection.py ('k') | mojo/public/python/mojo_bindings/promise.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698