Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(810)

Side by Side Diff: mojo/public/python/mojo_bindings/messaging.py

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Utility classes to handle sending and receiving messages."""
6
7
8 import struct
9 import sys
10 import weakref
11
12 import mojo_bindings.serialization as serialization
13
14 # pylint: disable=E0611,F0401
15 import mojo_system as system
16
17
18 # The flag values for a message header.
19 NO_FLAG = 0
20 MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0
21 MESSAGE_IS_RESPONSE_FLAG = 1 << 1
22
23
24 class MessagingException(Exception):
25 def __init__(self, *args, **kwargs):
26 Exception.__init__(self, *args, **kwargs)
27 self.__traceback__ = sys.exc_info()[2]
28
29
30 class MessageHeader(object):
31 """The header of a mojo message."""
32
33 _SIMPLE_MESSAGE_VERSION = 0
34 _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII")
35
36 _REQUEST_ID_STRUCT = struct.Struct("<Q")
37 _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size
38
39 _MESSAGE_WITH_REQUEST_ID_VERSION = 1
40 _MESSAGE_WITH_REQUEST_ID_SIZE = (
41 _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size)
42
43 def __init__(self, message_type, flags, request_id=0, data=None):
44 self._message_type = message_type
45 self._flags = flags
46 self._request_id = request_id
47 self._data = data
48
49 @classmethod
50 def Deserialize(cls, data):
51 buf = buffer(data)
52 if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size:
53 raise serialization.DeserializationException('Header is too short.')
54 (size, version, message_type, flags) = (
55 cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf))
56 if (version < cls._SIMPLE_MESSAGE_VERSION):
57 raise serialization.DeserializationException('Incorrect version.')
58 request_id = 0
59 if _HasRequestId(flags):
60 if version < cls._MESSAGE_WITH_REQUEST_ID_VERSION:
61 raise serialization.DeserializationException('Incorrect version.')
62 if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or
63 len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE):
64 raise serialization.DeserializationException('Header is too short.')
65 (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from(
66 buf, cls._REQUEST_ID_OFFSET)
67 return MessageHeader(message_type, flags, request_id, data)
68
69 @property
70 def message_type(self):
71 return self._message_type
72
73 # pylint: disable=E0202
74 @property
75 def request_id(self):
76 assert self.has_request_id
77 return self._request_id
78
79 # pylint: disable=E0202
80 @request_id.setter
81 def request_id(self, request_id):
82 assert self.has_request_id
83 self._request_id = request_id
84 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
85 request_id)
86
87 @property
88 def has_request_id(self):
89 return _HasRequestId(self._flags)
90
91 @property
92 def expects_response(self):
93 return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG)
94
95 @property
96 def is_response(self):
97 return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG)
98
99 @property
100 def size(self):
101 if self.has_request_id:
102 return self._MESSAGE_WITH_REQUEST_ID_SIZE
103 return self._SIMPLE_MESSAGE_STRUCT.size
104
105 def Serialize(self):
106 if not self._data:
107 self._data = bytearray(self.size)
108 version = self._SIMPLE_MESSAGE_VERSION
109 size = self._SIMPLE_MESSAGE_STRUCT.size
110 if self.has_request_id:
111 version = self._MESSAGE_WITH_REQUEST_ID_VERSION
112 size = self._MESSAGE_WITH_REQUEST_ID_SIZE
113 self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version,
114 self._message_type, self._flags)
115 if self.has_request_id:
116 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
117 self._request_id)
118 return self._data
119
120 def _HasFlag(self, flag):
121 return self._flags & flag != 0
122
123
124 class Message(object):
125 """A message for a message pipe. This contains data and handles."""
126
127 def __init__(self, data=None, handles=None, header=None):
128 self.data = data
129 self.handles = handles
130 self._header = header
131 self._payload = None
132
133 @property
134 def header(self):
135 if self._header is None:
136 self._header = MessageHeader.Deserialize(self.data)
137 return self._header
138
139 @property
140 def payload(self):
141 if self._payload is None:
142 self._payload = Message(self.data[self.header.size:], self.handles)
143 return self._payload
144
145 def SetRequestId(self, request_id):
146 header = self.header
147 header.request_id = request_id
148 (data, _) = header.Serialize()
149 self.data[:header.Size] = data[:header.Size]
150
151
152 class MessageReceiver(object):
153 """A class which implements this interface can receive Message objects."""
154
155 def Accept(self, message):
156 """
157 Receive a Message. The MessageReceiver is allowed to mutate the message.
158
159 Args:
160 message: the received message.
161
162 Returns:
163 True if the message has been handled, False otherwise.
164 """
165 raise NotImplementedError()
166
167
168 class MessageReceiverWithResponder(MessageReceiver):
169 """
170 A MessageReceiver that can also handle the response message generated from the
171 given message.
172 """
173
174 def AcceptWithResponder(self, message, responder):
175 """
176 A variant on Accept that registers a MessageReceiver (known as the
177 responder) to handle the response message generated from the given message.
178 The responder's Accept method may be called as part of the call to
179 AcceptWithResponder, or some time after its return.
180
181 Args:
182 message: the received message.
183 responder: the responder that will receive the response.
184
185 Returns:
186 True if the message has been handled, False otherwise.
187 """
188 raise NotImplementedError()
189
190
191 class ConnectionErrorHandler(object):
192 """
193 A ConnectionErrorHandler is notified of an error happening while using the
194 bindings over message pipes.
195 """
196
197 def OnError(self, result):
198 raise NotImplementedError()
199
200
201 class Connector(MessageReceiver):
202 """
203 A Connector owns a message pipe and will send any received messages to the
204 registered MessageReceiver. It also acts as a MessageReceiver and will send
205 any message through the handle.
206
207 The method Start must be called before the Connector will start listening to
208 incoming messages.
209 """
210
211 def __init__(self, handle):
212 MessageReceiver.__init__(self)
213 self._handle = handle
214 self._cancellable = None
215 self._incoming_message_receiver = None
216 self._error_handler = None
217
218 def __del__(self):
219 if self._cancellable:
220 self._cancellable()
221
222 def SetIncomingMessageReceiver(self, message_receiver):
223 """
224 Set the MessageReceiver that will receive message from the owned message
225 pipe.
226 """
227 self._incoming_message_receiver = message_receiver
228
229 def SetErrorHandler(self, error_handler):
230 """
231 Set the ConnectionErrorHandler that will be notified of errors on the owned
232 message pipe.
233 """
234 self._error_handler = error_handler
235
236 def Start(self):
237 assert not self._cancellable
238 self._RegisterAsyncWaiterForRead()
239
240 def Accept(self, message):
241 result = self._handle.WriteMessage(message.data, message.handles)
242 return result == system.RESULT_OK
243
244 def Close(self):
245 if self._cancellable:
246 self._cancellable()
247 self._cancellable = None
248 self._handle.Close()
249
250 def PassMessagePipe(self):
251 if self._cancellable:
252 self._cancellable()
253 self._cancellable = None
254 result = self._handle
255 self._handle = system.Handle()
256 return result
257
258 def _OnAsyncWaiterResult(self, result):
259 self._cancellable = None
260 if result == system.RESULT_OK:
261 self._ReadOutstandingMessages()
262 else:
263 self._OnError(result)
264
265 def _OnError(self, result):
266 assert not self._cancellable
267 if self._error_handler:
268 self._error_handler.OnError(result)
269 self._handle.Close()
270
271 def _RegisterAsyncWaiterForRead(self) :
272 assert not self._cancellable
273 self._cancellable = self._handle.AsyncWait(
274 system.HANDLE_SIGNAL_READABLE,
275 system.DEADLINE_INDEFINITE,
276 _WeakCallback(self._OnAsyncWaiterResult))
277
278 def _ReadOutstandingMessages(self):
279 result = None
280 dispatched = True
281 while dispatched:
282 result, dispatched = _ReadAndDispatchMessage(
283 self._handle, self._incoming_message_receiver)
284 if result == system.RESULT_SHOULD_WAIT:
285 self._RegisterAsyncWaiterForRead()
286 return
287 self._OnError(result)
288
289
290 class Router(MessageReceiverWithResponder):
291 """
292 A Router will handle mojo message and forward those to a Connector. It deals
293 with parsing of headers and adding of request ids in order to be able to match
294 a response to a request.
295 """
296
297 def __init__(self, handle):
298 MessageReceiverWithResponder.__init__(self)
299 self._incoming_message_receiver = None
300 self._next_request_id = 1
301 self._responders = {}
302 self._connector = Connector(handle)
303 self._connector.SetIncomingMessageReceiver(
304 ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage)))
305
306 def Start(self):
307 self._connector.Start()
308
309 def SetIncomingMessageReceiver(self, message_receiver):
310 """
311 Set the MessageReceiver that will receive message from the owned message
312 pipe.
313 """
314 self._incoming_message_receiver = message_receiver
315
316 def SetErrorHandler(self, error_handler):
317 """
318 Set the ConnectionErrorHandler that will be notified of errors on the owned
319 message pipe.
320 """
321 self._connector.SetErrorHandler(error_handler)
322
323 def Accept(self, message):
324 # A message without responder is directly forwarded to the connector.
325 return self._connector.Accept(message)
326
327 def AcceptWithResponder(self, message, responder):
328 # The message must have a header.
329 header = message.header
330 assert header.expects_response
331 request_id = self._NextRequestId()
332 header.request_id = request_id
333 if not self._connector.Accept(message):
334 return False
335 self._responders[request_id] = responder
336 return True
337
338 def Close(self):
339 self._connector.Close()
340
341 def PassMessagePipe(self):
342 return self._connector.PassMessagePipe()
343
344 def _HandleIncomingMessage(self, message):
345 header = message.header
346 if header.expects_response:
347 if self._incoming_message_receiver:
348 return self._incoming_message_receiver.AcceptWithResponder(
349 message, self)
350 # If we receive a request expecting a response when the client is not
351 # listening, then we have no choice but to tear down the pipe.
352 self.Close()
353 return False
354 if header.is_response:
355 request_id = header.request_id
356 responder = self._responders.pop(request_id, None)
357 if responder is None:
358 return False
359 return responder.Accept(message)
360 if self._incoming_message_receiver:
361 return self._incoming_message_receiver.Accept(message)
362 # Ok to drop the message
363 return False
364
365 def _NextRequestId(self):
366 request_id = self._next_request_id
367 while request_id == 0 or request_id in self._responders:
368 request_id = (request_id + 1) % (1 << 64)
369 self._next_request_id = (request_id + 1) % (1 << 64)
370 return request_id
371
372 class ForwardingMessageReceiver(MessageReceiver):
373 """A MessageReceiver that forward calls to |Accept| to a callable."""
374
375 def __init__(self, callback):
376 MessageReceiver.__init__(self)
377 self._callback = callback
378
379 def Accept(self, message):
380 return self._callback(message)
381
382
383 def _WeakCallback(callback):
384 func = callback.im_func
385 self = callback.im_self
386 if not self:
387 return callback
388 weak_self = weakref.ref(self)
389 def Callback(*args, **kwargs):
390 self = weak_self()
391 if self:
392 return func(self, *args, **kwargs)
393 return Callback
394
395
396 def _ReadAndDispatchMessage(handle, message_receiver):
397 dispatched = False
398 (result, _, sizes) = handle.ReadMessage()
399 if result == system.RESULT_OK and message_receiver:
400 dispatched = message_receiver.Accept(Message(bytearray(), []))
401 if result != system.RESULT_RESOURCE_EXHAUSTED:
402 return (result, dispatched)
403 (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1])
404 if result == system.RESULT_OK and message_receiver:
405 dispatched = message_receiver.Accept(Message(data[0], data[1]))
406 return (result, dispatched)
407
408 def _HasRequestId(flags):
409 return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0
OLDNEW
« no previous file with comments | « mojo/public/python/mojo_bindings/interface_reflection.py ('k') | mojo/public/python/mojo_bindings/promise.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698