Index: mojo/public/python/mojo/bindings/reflection.py |
diff --git a/mojo/public/python/mojo/bindings/reflection.py b/mojo/public/python/mojo/bindings/reflection.py |
index e976db300e8a7ace3f9e5b50445631e57559bc68..49c911caa4bd5502822417608a50eb727fb741c1 100644 |
--- a/mojo/public/python/mojo/bindings/reflection.py |
+++ b/mojo/public/python/mojo/bindings/reflection.py |
@@ -5,8 +5,12 @@ |
"""The metaclasses used by the mojo python bindings.""" |
import itertools |
+import logging |
+import sys |
# pylint: disable=F0401 |
+import mojo.bindings.messaging as messaging |
+import mojo.bindings.promise as promise |
import mojo.bindings.serialization as serialization |
@@ -67,7 +71,7 @@ class MojoStructType(type): |
], |
}, |
'fields': [ |
- FieldDescriptor('x', _descriptor.TYPE_INT32, 0), |
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0), |
], |
} |
@@ -108,6 +112,11 @@ class MojoStructType(type): |
return serialization_object.Serialize(self, handle_offset) |
dictionary['Serialize'] = Serialize |
+ # pylint: disable=W0212 |
+ def AsDict(self): |
+ return self._fields |
+ dictionary['AsDict'] = AsDict |
+ |
def Deserialize(cls, data, handles): |
result = cls.__new__(cls) |
fields = {} |
@@ -130,6 +139,227 @@ class MojoStructType(type): |
raise AttributeError, 'can\'t delete attribute' |
+class MojoInterfaceType(type): |
+ """Meta class for interfaces. |
+ |
+ Usage: |
+ class MyInterface(object): |
+ __metaclass__ = MojoInterfaceType |
+ DESCRIPTOR = { |
+ 'client': MyInterfaceClient, |
+ 'methods': [ |
+ { |
+ 'name': 'FireAndForget', |
+ 'ordinal': 0, |
+ 'parameters': [ |
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0), |
+ ] |
+ }, |
+ { |
+ 'name': 'Ping', |
+ 'ordinal': 1, |
+ 'parameters': [ |
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0), |
+ ], |
+ 'responses': [ |
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0), |
+ ], |
+ }, |
+ ], |
+ } |
+ """ |
+ |
+ def __new__(mcs, name, bases, dictionary): |
+ # If one of the base class is already an interface type, do not edit the |
+ # class. |
+ for base in bases: |
+ if isinstance(base, mcs): |
+ return type.__new__(mcs, name, bases, dictionary) |
+ |
+ descriptor = dictionary.pop('DESCRIPTOR', {}) |
+ |
+ methods = [_MethodDescriptor(x) for x in descriptor.get('methods', [])] |
+ for method in methods: |
+ dictionary[method.name] = _NotImplemented |
+ client_class = descriptor.get('client', None) |
+ |
+ interface_manager = InterfaceManager(name, methods, client_class) |
+ dictionary.update({ |
+ 'client': None, |
+ 'manager': None, |
+ '_interface_manager': interface_manager, |
+ }) |
+ |
+ interface_class = type.__new__(mcs, name, bases, dictionary) |
+ interface_manager.interface_class = interface_class |
+ return interface_class |
+ |
+ @property |
+ def manager(mcs): |
+ return mcs._interface_manager |
+ |
+ # Prevent adding new attributes, or mutating constants. |
+ def __setattr__(mcs, key, value): |
+ raise AttributeError, 'can\'t set attribute' |
+ |
+ # Prevent deleting constants. |
+ def __delattr__(mcs, key): |
+ raise AttributeError, 'can\'t delete attribute' |
+ |
+ |
+class InterfaceManager(object): |
+ """ |
+ Manager for an interface class. The manager contains the operation that allows |
+ to bind an implementation to a pipe, or to generate a proxy for an interface |
+ over a pipe. |
+ """ |
+ |
+ def __init__(self, name, methods, client_class): |
+ self.name = name |
+ self.methods = methods |
+ if client_class: |
+ self.client_manager = client_class.manager |
+ else: |
+ self.client_manager = None |
+ self.interface_class = None |
+ self._proxy_class = None |
+ self._stub_class = None |
+ |
+ def Proxy(self, handle): |
+ router = messaging.Router(handle) |
+ error_handler = _ProxyErrorHandler() |
+ router.SetErrorHandler(error_handler) |
+ router.Start() |
+ return self._InternalProxy(router, error_handler) |
+ |
+ # pylint: disable=W0212 |
+ def Bind(self, impl, handle): |
+ router = messaging.Router(handle) |
+ router.SetIncomingMessageReceiver(self._Stub(impl)) |
+ error_handler = _ProxyErrorHandler() |
+ router.SetErrorHandler(error_handler) |
+ |
+ # Retain the router, until an error happen. |
+ retainer = _Retainer(router) |
+ def Cleanup(_): |
+ retainer.release() |
+ error_handler.AddCallback(Cleanup) |
+ |
+ if self.client_manager: |
+ impl.client = self.client_manager._InternalProxy(router, error_handler) |
+ |
+ # Give an instance manager to the implementation to allow it to close |
+ # the connection. |
+ impl.manager = InstanceManager(router) |
+ |
+ router.Start() |
+ |
+ def _InternalProxy(self, router, error_handler): |
+ if not self._proxy_class: |
+ dictionary = { |
+ '__module__': __name__, |
+ '__init__': _ProxyInit, |
+ } |
+ if self.client_manager: |
+ dictionary['client'] = property(_ProxyGetClient, _ProxySetClient) |
+ dictionary['manager'] = None |
+ dictionary['_client_manager'] = self.client_manager |
+ for method in self.methods: |
+ dictionary[method.name] = _ProxyMethodCall(method) |
+ self._proxy_class = type('%sProxy' % self.name, |
+ (self.interface_class,), |
+ dictionary) |
+ |
+ proxy = self._proxy_class(router, error_handler) |
+ # Give an instance manager to the proxy to allow to close the connection. |
+ proxy.manager = InstanceManager(router) |
+ return proxy |
+ |
+ def _Stub(self, impl): |
+ if not self._stub_class: |
+ accept_method = _StubAccept(self.methods) |
+ dictionary = { |
+ '__module__': __name__, |
+ '__init__': _StubInit, |
+ 'Accept': accept_method, |
+ 'AcceptWithResponder': accept_method, |
+ } |
+ self._stub_class = type('%sStub' % self.name, |
+ (messaging.MessageReceiverWithResponder,), |
+ dictionary) |
+ return self._stub_class(impl) |
+ |
+ |
+class InstanceManager(object): |
+ """ |
+ Manager for the implementation of an interface or a proxy. The manager allows |
+ to control the connection over the pipe. |
+ """ |
+ def __init__(self, router): |
+ self.router = router |
+ |
+ def Close(self): |
+ self.router.Close() |
+ |
+ |
+class _MethodDescriptor(object): |
+ def __init__(self, descriptor): |
+ self.name = descriptor['name'] |
+ self.ordinal = descriptor['ordinal'] |
+ self.parameters_struct = _ConstructParameterStruct( |
+ descriptor['parameters'], self.name, "Parameters") |
+ self.response_struct = _ConstructParameterStruct( |
+ descriptor.get('responses'), self.name, "Responses") |
+ |
+ |
+def _ConstructParameterStruct(descriptor, name, suffix): |
+ if descriptor is None: |
+ return None |
+ parameter_dictionary = { |
+ '__metaclass__': MojoStructType, |
+ '__module__': __name__, |
+ 'DESCRIPTOR': descriptor, |
+ } |
+ return MojoStructType( |
+ '%s%s' % (name, suffix), |
+ (object,), |
+ parameter_dictionary) |
+ |
+ |
+class _ProxyErrorHandler(messaging.ConnectionErrorHandler): |
+ def __init__(self): |
+ messaging.ConnectionErrorHandler.__init__(self) |
+ self._callbacks = set() |
+ |
+ def OnError(self, result): |
+ exception = messaging.MessagingException('Mojo error: %d' % result) |
+ for callback in list(self._callbacks): |
+ callback(exception) |
+ self._callbacks = None |
+ |
+ def AddCallback(self, callback): |
+ if self._callbacks is not None: |
+ self._callbacks.add(callback) |
+ |
+ def RemoveCallback(self, callback): |
+ if self._callbacks: |
+ self._callbacks.remove(callback) |
+ |
+ |
+class _Retainer(object): |
+ |
+ # Set to force instances to be retained. |
+ _RETAINED = set() |
+ |
+ def __init__(self, retained): |
+ self._retained = retained |
+ _Retainer._RETAINED.add(self) |
+ |
+ def release(self): |
+ self._retained = None |
+ _Retainer._RETAINED.remove(self) |
+ |
+ |
def _StructInit(fields): |
def _Init(self, *args, **kwargs): |
if len(args) + len(kwargs) > len(fields): |
@@ -165,6 +395,7 @@ def _BuildProperty(field): |
return property(Get, Set) |
+ |
def _StructEq(fields): |
def _Eq(self, other): |
if type(self) is not type(other): |
@@ -177,3 +408,125 @@ def _StructEq(fields): |
def _StructNe(self, other): |
return not self.__eq__(other) |
+ |
+ |
+def _ProxyInit(self, router, error_handler): |
+ self._router = router |
+ self._error_handler = error_handler |
+ self._client = None |
+ |
+ |
+# pylint: disable=W0212 |
+def _ProxyGetClient(self): |
+ return self._client |
+ |
+ |
+# pylint: disable=W0212 |
+def _ProxySetClient(self, client): |
+ self._client = client |
+ stub = self._client_manager._Stub(client) |
+ self._router.SetIncomingMessageReceiver(stub) |
+ |
+ |
+# pylint: disable=W0212 |
+def _ProxyMethodCall(method): |
+ flags = messaging.NO_FLAG |
+ if method.response_struct: |
+ flags = messaging.MESSAGE_EXPECTS_RESPONSE_FLAG |
+ def _Call(self, *args, **kwargs): |
+ def GenerationMethod(resolve, reject): |
+ message = _GetMessage(method, flags, *args, **kwargs) |
+ if method.response_struct: |
+ def Accept(message): |
+ try: |
+ assert message.header.message_type == method.ordinal |
+ payload = message.payload |
+ response = method.response_struct.Deserialize(payload.data, |
+ payload.handles) |
+ as_dict = response.AsDict() |
+ if len(as_dict) == 1: |
+ value = as_dict.values()[0] |
+ if not isinstance(value, dict): |
+ response = value |
+ resolve(response) |
+ return True |
+ except Exception as e: |
+ # Adding traceback similarly to python 3.0 (pep-3134) |
+ e.__traceback__ = sys.exc_info()[2] |
+ reject(e) |
+ return False |
+ finally: |
+ self._error_handler.RemoveCallback(reject) |
+ |
+ self._error_handler.AddCallback(reject) |
+ if not self._router.AcceptWithResponder( |
+ message, messaging.ForwardingMessageReceiver(Accept)): |
+ self._error_handler.RemoveCallback(reject) |
+ reject(messaging.MessagingException("Unable to send message.")) |
+ else: |
+ if (self._router.Accept(message)): |
+ resolve(None) |
+ else: |
+ reject(messaging.MessagingException("Unable to send message.")) |
+ return promise.Promise(GenerationMethod) |
+ return _Call |
+ |
+ |
+def _GetMessage(method, flags, *args, **kwargs): |
+ if flags == messaging.MESSAGE_IS_RESPONSE_FLAG: |
+ struct = method.response_struct(*args, **kwargs) |
+ else: |
+ struct = method.parameters_struct(*args, **kwargs) |
+ header = messaging.MessageHeader(method.ordinal, flags) |
+ data = header.Serialize() |
+ (payload, handles) = struct.Serialize() |
+ data.extend(payload) |
+ return messaging.Message(data, handles, header) |
+ |
+ |
+def _StubInit(self, impl): |
+ self.impl = impl |
+ |
+ |
+def _StubAccept(methods): |
+ methods_by_ordinal = dict((m.ordinal, m) for m in methods) |
+ def Accept(self, message, responder=None): |
+ try: |
+ header = message.header |
+ assert header.expects_response == bool(responder) |
+ assert header.message_type in methods_by_ordinal |
+ method = methods_by_ordinal[header.message_type] |
+ payload = message.payload |
+ parameters = method.parameters_struct.Deserialize( |
+ payload.data, payload.handles).AsDict() |
+ response = getattr(self.impl, method.name)(**parameters) |
+ if header.expects_response: |
+ def SendResponse(response): |
+ if isinstance(response, dict): |
+ response_message = _GetMessage(method, |
+ messaging.MESSAGE_IS_RESPONSE_FLAG, |
+ **response) |
+ else: |
+ response_message = _GetMessage(method, |
+ messaging.MESSAGE_IS_RESPONSE_FLAG, |
+ response) |
+ response_message.header.request_id = header.request_id |
+ responder.Accept(response_message) |
+ p = promise.Promise.Resolve(response).Then(SendResponse) |
+ if self.impl.manager: |
+ # Close the connection in case of error. |
+ p.Catch(lambda _: self.impl.manager.Close()) |
+ return True |
+ # pylint: disable=W0702 |
+ except: |
+ # Close the connection in case of error. |
+ logging.warning( |
+ 'Error occured in accept method. Connection will be closed.') |
+ if self.impl.manager: |
+ self.impl.manager.Close() |
+ return False |
+ return Accept |
+ |
+ |
+def _NotImplemented(*_1, **_2): |
+ raise NotImplementedError() |