Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: mojo/public/python/mojo_bindings/messaging.py

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Utility classes to handle sending and receiving messages."""
6
7
8 import struct
9 import sys
10 import weakref
11
12 import mojo_bindings.serialization as serialization
13
14 # pylint: disable=E0611,F0401
15 import mojo_system as system
16
17
18 # The flag values for a message header.
19 NO_FLAG = 0
20 MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0
21 MESSAGE_IS_RESPONSE_FLAG = 1 << 1
22
23
24 class MessagingException(Exception):
25 def __init__(self, *args, **kwargs):
26 Exception.__init__(self, *args, **kwargs)
27 self.__traceback__ = sys.exc_info()[2]
28
29
30 class MessageHeader(object):
31 """The header of a mojo message."""
32
33 _SIMPLE_MESSAGE_NUM_FIELDS = 2
34 _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII")
35
36 _REQUEST_ID_STRUCT = struct.Struct("<Q")
37 _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size
38
39 _MESSAGE_WITH_REQUEST_ID_NUM_FIELDS = 3
40 _MESSAGE_WITH_REQUEST_ID_SIZE = (
41 _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size)
42
43 def __init__(self, message_type, flags, request_id=0, data=None):
44 self._message_type = message_type
45 self._flags = flags
46 self._request_id = request_id
47 self._data = data
48
49 @classmethod
50 def Deserialize(cls, data):
51 buf = buffer(data)
52 if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size:
53 raise serialization.DeserializationException('Header is too short.')
54 (size, version, message_type, flags) = (
55 cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf))
56 if (version < cls._SIMPLE_MESSAGE_NUM_FIELDS):
57 raise serialization.DeserializationException('Incorrect version.')
58 request_id = 0
59 if _HasRequestId(flags):
60 if version < cls._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS:
61 raise serialization.DeserializationException('Incorrect version.')
62 if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or
63 len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE):
64 raise serialization.DeserializationException('Header is too short.')
65 (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from(
66 buf, cls._REQUEST_ID_OFFSET)
67 return MessageHeader(message_type, flags, request_id, data)
68
69 @property
70 def message_type(self):
71 return self._message_type
72
73 # pylint: disable=E0202
74 @property
75 def request_id(self):
76 assert self.has_request_id
77 return self._request_id
78
79 # pylint: disable=E0202
80 @request_id.setter
81 def request_id(self, request_id):
82 assert self.has_request_id
83 self._request_id = request_id
84 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
85 request_id)
86
87 @property
88 def has_request_id(self):
89 return _HasRequestId(self._flags)
90
91 @property
92 def expects_response(self):
93 return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG)
94
95 @property
96 def is_response(self):
97 return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG)
98
99 @property
100 def size(self):
101 if self.has_request_id:
102 return self._MESSAGE_WITH_REQUEST_ID_SIZE
103 return self._SIMPLE_MESSAGE_STRUCT.size
104
105 def Serialize(self):
106 if not self._data:
107 self._data = bytearray(self.size)
108 version = self._SIMPLE_MESSAGE_NUM_FIELDS
109 size = self._SIMPLE_MESSAGE_STRUCT.size
110 if self.has_request_id:
111 version = self._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS
112 size = self._MESSAGE_WITH_REQUEST_ID_SIZE
113 self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version,
114 self._message_type, self._flags)
115 if self.has_request_id:
116 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
117 self._request_id)
118 return self._data
119
120 def _HasFlag(self, flag):
121 return self._flags & flag != 0
122
123
124 class Message(object):
125 """A message for a message pipe. This contains data and handles."""
126
127 def __init__(self, data=None, handles=None, header=None):
128 self.data = data
129 self.handles = handles
130 self._header = header
131 self._payload = None
132
133 @property
134 def header(self):
135 if self._header is None:
136 self._header = MessageHeader.Deserialize(self.data)
137 return self._header
138
139 @property
140 def payload(self):
141 if self._payload is None:
142 self._payload = Message(self.data[self.header.size:], self.handles)
143 return self._payload
144
145 def SetRequestId(self, request_id):
146 header = self.header
147 header.request_id = request_id
148 (data, _) = header.Serialize()
149 self.data[:header.Size] = data[:header.Size]
150
151
152 class MessageReceiver(object):
153 """A class which implements this interface can receive Message objects."""
154
155 def Accept(self, message):
156 """
157 Receive a Message. The MessageReceiver is allowed to mutate the message.
158
159 Args:
160 message: the received message.
161
162 Returns:
163 True if the message has been handled, False otherwise.
164 """
165 raise NotImplementedError()
166
167
168 class MessageReceiverWithResponder(MessageReceiver):
169 """
170 A MessageReceiver that can also handle the response message generated from the
171 given message.
172 """
173
174 def AcceptWithResponder(self, message, responder):
175 """
176 A variant on Accept that registers a MessageReceiver (known as the
177 responder) to handle the response message generated from the given message.
178 The responder's Accept method may be called as part of the call to
179 AcceptWithResponder, or some time after its return.
180
181 Args:
182 message: the received message.
183 responder: the responder that will receive the response.
184
185 Returns:
186 True if the message has been handled, False otherwise.
187 """
188 raise NotImplementedError()
189
190
191 class ConnectionErrorHandler(object):
192 """
193 A ConnectionErrorHandler is notified of an error happening while using the
194 bindings over message pipes.
195 """
196
197 def OnError(self, result):
198 raise NotImplementedError()
199
200
201 class Connector(MessageReceiver):
202 """
203 A Connector owns a message pipe and will send any received messages to the
204 registered MessageReceiver. It also acts as a MessageReceiver and will send
205 any message through the handle.
206
207 The method Start must be called before the Connector will start listening to
208 incoming messages.
209 """
210
211 def __init__(self, handle):
212 MessageReceiver.__init__(self)
213 self._handle = handle
214 self._cancellable = None
215 self._incoming_message_receiver = None
216 self._error_handler = None
217
218 def __del__(self):
219 if self._cancellable:
220 self._cancellable()
221
222 def SetIncomingMessageReceiver(self, message_receiver):
223 """
224 Set the MessageReceiver that will receive message from the owned message
225 pipe.
226 """
227 self._incoming_message_receiver = message_receiver
228
229 def SetErrorHandler(self, error_handler):
230 """
231 Set the ConnectionErrorHandler that will be notified of errors on the owned
232 message pipe.
233 """
234 self._error_handler = error_handler
235
236 def Start(self):
237 assert not self._cancellable
238 self._RegisterAsyncWaiterForRead()
239
240 def Accept(self, message):
241 result = self._handle.WriteMessage(message.data, message.handles)
242 return result == system.RESULT_OK
243
244 def Close(self):
245 if self._cancellable:
246 self._cancellable()
247 self._cancellable = None
248 self._handle.Close()
249
250 def PassMessagePipe(self):
251 if self._cancellable:
252 self._cancellable()
253 self._cancellable = None
254 result = self._handle
255 self._handle = system.Handle()
256 return result
257
258 def _OnAsyncWaiterResult(self, result):
259 self._cancellable = None
260 if result == system.RESULT_OK:
261 self._ReadOutstandingMessages()
262 else:
263 self._OnError(result)
264
265 def _OnError(self, result):
266 assert not self._cancellable
267 if self._error_handler:
268 self._error_handler.OnError(result)
269 self._handle.Close()
270
271 def _RegisterAsyncWaiterForRead(self) :
272 assert not self._cancellable
273 self._cancellable = self._handle.AsyncWait(
274 system.HANDLE_SIGNAL_READABLE,
275 system.DEADLINE_INDEFINITE,
276 _WeakCallback(self._OnAsyncWaiterResult))
277
278 def _ReadOutstandingMessages(self):
279 result = system.RESULT_OK
280 while result == system.RESULT_OK:
281 result = _ReadAndDispatchMessage(self._handle,
282 self._incoming_message_receiver)
283 if result == system.RESULT_SHOULD_WAIT:
284 self._RegisterAsyncWaiterForRead()
285 return
286 self._OnError(result)
287
288
289 class Router(MessageReceiverWithResponder):
290 """
291 A Router will handle mojo message and forward those to a Connector. It deals
292 with parsing of headers and adding of request ids in order to be able to match
293 a response to a request.
294 """
295
296 def __init__(self, handle):
297 MessageReceiverWithResponder.__init__(self)
298 self._incoming_message_receiver = None
299 self._next_request_id = 1
300 self._responders = {}
301 self._connector = Connector(handle)
302 self._connector.SetIncomingMessageReceiver(
303 ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage)))
304
305 def Start(self):
306 self._connector.Start()
307
308 def SetIncomingMessageReceiver(self, message_receiver):
309 """
310 Set the MessageReceiver that will receive message from the owned message
311 pipe.
312 """
313 self._incoming_message_receiver = message_receiver
314
315 def SetErrorHandler(self, error_handler):
316 """
317 Set the ConnectionErrorHandler that will be notified of errors on the owned
318 message pipe.
319 """
320 self._connector.SetErrorHandler(error_handler)
321
322 def Accept(self, message):
323 # A message without responder is directly forwarded to the connector.
324 return self._connector.Accept(message)
325
326 def AcceptWithResponder(self, message, responder):
327 # The message must have a header.
328 header = message.header
329 assert header.expects_response
330 request_id = self._NextRequestId()
331 header.request_id = request_id
332 if not self._connector.Accept(message):
333 return False
334 self._responders[request_id] = responder
335 return True
336
337 def Close(self):
338 self._connector.Close()
339
340 def PassMessagePipe(self):
341 return self._connector.PassMessagePipe()
342
343 def _HandleIncomingMessage(self, message):
344 header = message.header
345 if header.expects_response:
346 if self._incoming_message_receiver:
347 return self._incoming_message_receiver.AcceptWithResponder(
348 message, self)
349 # If we receive a request expecting a response when the client is not
350 # listening, then we have no choice but to tear down the pipe.
351 self.Close()
352 return False
353 if header.is_response:
354 request_id = header.request_id
355 responder = self._responders.pop(request_id, None)
356 if responder is None:
357 return False
358 return responder.Accept(message)
359 if self._incoming_message_receiver:
360 return self._incoming_message_receiver.Accept(message)
361 # Ok to drop the message
362 return False
363
364 def _NextRequestId(self):
365 request_id = self._next_request_id
366 while request_id == 0 or request_id in self._responders:
367 request_id = (request_id + 1) % (1 << 64)
368 self._next_request_id = (request_id + 1) % (1 << 64)
369 return request_id
370
371 class ForwardingMessageReceiver(MessageReceiver):
372 """A MessageReceiver that forward calls to |Accept| to a callable."""
373
374 def __init__(self, callback):
375 MessageReceiver.__init__(self)
376 self._callback = callback
377
378 def Accept(self, message):
379 return self._callback(message)
380
381
382 def _WeakCallback(callback):
383 func = callback.im_func
384 self = callback.im_self
385 if not self:
386 return callback
387 weak_self = weakref.ref(self)
388 def Callback(*args, **kwargs):
389 self = weak_self()
390 if self:
391 return func(self, *args, **kwargs)
392 return Callback
393
394
395 def _ReadAndDispatchMessage(handle, message_receiver):
396 (result, _, sizes) = handle.ReadMessage()
397 if result == system.RESULT_OK and message_receiver:
398 message_receiver.Accept(Message(bytearray(), []))
399 if result != system.RESULT_RESOURCE_EXHAUSTED:
400 return result
401 (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1])
402 if result == system.RESULT_OK and message_receiver:
403 message_receiver.Accept(Message(data[0], data[1]))
404 return result
405
406 def _HasRequestId(flags):
407 return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0
OLDNEW
« no previous file with comments | « mojo/public/python/mojo_bindings/descriptor.py ('k') | mojo/public/python/mojo_bindings/promise.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698