OLD | NEW |
| (Empty) |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Utility classes to handle sending and receiving messages.""" | |
6 | |
7 | |
8 import struct | |
9 import sys | |
10 import weakref | |
11 | |
12 import mojo_bindings.serialization as serialization | |
13 | |
14 # pylint: disable=E0611,F0401 | |
15 import mojo_system as system | |
16 | |
17 | |
18 # The flag values for a message header. | |
19 NO_FLAG = 0 | |
20 MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0 | |
21 MESSAGE_IS_RESPONSE_FLAG = 1 << 1 | |
22 | |
23 | |
24 class MessagingException(Exception): | |
25 def __init__(self, *args, **kwargs): | |
26 Exception.__init__(self, *args, **kwargs) | |
27 self.__traceback__ = sys.exc_info()[2] | |
28 | |
29 | |
30 class MessageHeader(object): | |
31 """The header of a mojo message.""" | |
32 | |
33 _SIMPLE_MESSAGE_NUM_FIELDS = 2 | |
34 _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII") | |
35 | |
36 _REQUEST_ID_STRUCT = struct.Struct("<Q") | |
37 _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size | |
38 | |
39 _MESSAGE_WITH_REQUEST_ID_NUM_FIELDS = 3 | |
40 _MESSAGE_WITH_REQUEST_ID_SIZE = ( | |
41 _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size) | |
42 | |
43 def __init__(self, message_type, flags, request_id=0, data=None): | |
44 self._message_type = message_type | |
45 self._flags = flags | |
46 self._request_id = request_id | |
47 self._data = data | |
48 | |
49 @classmethod | |
50 def Deserialize(cls, data): | |
51 buf = buffer(data) | |
52 if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size: | |
53 raise serialization.DeserializationException('Header is too short.') | |
54 (size, version, message_type, flags) = ( | |
55 cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf)) | |
56 if (version < cls._SIMPLE_MESSAGE_NUM_FIELDS): | |
57 raise serialization.DeserializationException('Incorrect version.') | |
58 request_id = 0 | |
59 if _HasRequestId(flags): | |
60 if version < cls._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS: | |
61 raise serialization.DeserializationException('Incorrect version.') | |
62 if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or | |
63 len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE): | |
64 raise serialization.DeserializationException('Header is too short.') | |
65 (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from( | |
66 buf, cls._REQUEST_ID_OFFSET) | |
67 return MessageHeader(message_type, flags, request_id, data) | |
68 | |
69 @property | |
70 def message_type(self): | |
71 return self._message_type | |
72 | |
73 # pylint: disable=E0202 | |
74 @property | |
75 def request_id(self): | |
76 assert self.has_request_id | |
77 return self._request_id | |
78 | |
79 # pylint: disable=E0202 | |
80 @request_id.setter | |
81 def request_id(self, request_id): | |
82 assert self.has_request_id | |
83 self._request_id = request_id | |
84 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, | |
85 request_id) | |
86 | |
87 @property | |
88 def has_request_id(self): | |
89 return _HasRequestId(self._flags) | |
90 | |
91 @property | |
92 def expects_response(self): | |
93 return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG) | |
94 | |
95 @property | |
96 def is_response(self): | |
97 return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG) | |
98 | |
99 @property | |
100 def size(self): | |
101 if self.has_request_id: | |
102 return self._MESSAGE_WITH_REQUEST_ID_SIZE | |
103 return self._SIMPLE_MESSAGE_STRUCT.size | |
104 | |
105 def Serialize(self): | |
106 if not self._data: | |
107 self._data = bytearray(self.size) | |
108 version = self._SIMPLE_MESSAGE_NUM_FIELDS | |
109 size = self._SIMPLE_MESSAGE_STRUCT.size | |
110 if self.has_request_id: | |
111 version = self._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS | |
112 size = self._MESSAGE_WITH_REQUEST_ID_SIZE | |
113 self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version, | |
114 self._message_type, self._flags) | |
115 if self.has_request_id: | |
116 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, | |
117 self._request_id) | |
118 return self._data | |
119 | |
120 def _HasFlag(self, flag): | |
121 return self._flags & flag != 0 | |
122 | |
123 | |
124 class Message(object): | |
125 """A message for a message pipe. This contains data and handles.""" | |
126 | |
127 def __init__(self, data=None, handles=None, header=None): | |
128 self.data = data | |
129 self.handles = handles | |
130 self._header = header | |
131 self._payload = None | |
132 | |
133 @property | |
134 def header(self): | |
135 if self._header is None: | |
136 self._header = MessageHeader.Deserialize(self.data) | |
137 return self._header | |
138 | |
139 @property | |
140 def payload(self): | |
141 if self._payload is None: | |
142 self._payload = Message(self.data[self.header.size:], self.handles) | |
143 return self._payload | |
144 | |
145 def SetRequestId(self, request_id): | |
146 header = self.header | |
147 header.request_id = request_id | |
148 (data, _) = header.Serialize() | |
149 self.data[:header.Size] = data[:header.Size] | |
150 | |
151 | |
152 class MessageReceiver(object): | |
153 """A class which implements this interface can receive Message objects.""" | |
154 | |
155 def Accept(self, message): | |
156 """ | |
157 Receive a Message. The MessageReceiver is allowed to mutate the message. | |
158 | |
159 Args: | |
160 message: the received message. | |
161 | |
162 Returns: | |
163 True if the message has been handled, False otherwise. | |
164 """ | |
165 raise NotImplementedError() | |
166 | |
167 | |
168 class MessageReceiverWithResponder(MessageReceiver): | |
169 """ | |
170 A MessageReceiver that can also handle the response message generated from the | |
171 given message. | |
172 """ | |
173 | |
174 def AcceptWithResponder(self, message, responder): | |
175 """ | |
176 A variant on Accept that registers a MessageReceiver (known as the | |
177 responder) to handle the response message generated from the given message. | |
178 The responder's Accept method may be called as part of the call to | |
179 AcceptWithResponder, or some time after its return. | |
180 | |
181 Args: | |
182 message: the received message. | |
183 responder: the responder that will receive the response. | |
184 | |
185 Returns: | |
186 True if the message has been handled, False otherwise. | |
187 """ | |
188 raise NotImplementedError() | |
189 | |
190 | |
191 class ConnectionErrorHandler(object): | |
192 """ | |
193 A ConnectionErrorHandler is notified of an error happening while using the | |
194 bindings over message pipes. | |
195 """ | |
196 | |
197 def OnError(self, result): | |
198 raise NotImplementedError() | |
199 | |
200 | |
201 class Connector(MessageReceiver): | |
202 """ | |
203 A Connector owns a message pipe and will send any received messages to the | |
204 registered MessageReceiver. It also acts as a MessageReceiver and will send | |
205 any message through the handle. | |
206 | |
207 The method Start must be called before the Connector will start listening to | |
208 incoming messages. | |
209 """ | |
210 | |
211 def __init__(self, handle): | |
212 MessageReceiver.__init__(self) | |
213 self._handle = handle | |
214 self._cancellable = None | |
215 self._incoming_message_receiver = None | |
216 self._error_handler = None | |
217 | |
218 def __del__(self): | |
219 if self._cancellable: | |
220 self._cancellable() | |
221 | |
222 def SetIncomingMessageReceiver(self, message_receiver): | |
223 """ | |
224 Set the MessageReceiver that will receive message from the owned message | |
225 pipe. | |
226 """ | |
227 self._incoming_message_receiver = message_receiver | |
228 | |
229 def SetErrorHandler(self, error_handler): | |
230 """ | |
231 Set the ConnectionErrorHandler that will be notified of errors on the owned | |
232 message pipe. | |
233 """ | |
234 self._error_handler = error_handler | |
235 | |
236 def Start(self): | |
237 assert not self._cancellable | |
238 self._RegisterAsyncWaiterForRead() | |
239 | |
240 def Accept(self, message): | |
241 result = self._handle.WriteMessage(message.data, message.handles) | |
242 return result == system.RESULT_OK | |
243 | |
244 def Close(self): | |
245 if self._cancellable: | |
246 self._cancellable() | |
247 self._cancellable = None | |
248 self._handle.Close() | |
249 | |
250 def PassMessagePipe(self): | |
251 if self._cancellable: | |
252 self._cancellable() | |
253 self._cancellable = None | |
254 result = self._handle | |
255 self._handle = system.Handle() | |
256 return result | |
257 | |
258 def _OnAsyncWaiterResult(self, result): | |
259 self._cancellable = None | |
260 if result == system.RESULT_OK: | |
261 self._ReadOutstandingMessages() | |
262 else: | |
263 self._OnError(result) | |
264 | |
265 def _OnError(self, result): | |
266 assert not self._cancellable | |
267 if self._error_handler: | |
268 self._error_handler.OnError(result) | |
269 self._handle.Close() | |
270 | |
271 def _RegisterAsyncWaiterForRead(self) : | |
272 assert not self._cancellable | |
273 self._cancellable = self._handle.AsyncWait( | |
274 system.HANDLE_SIGNAL_READABLE, | |
275 system.DEADLINE_INDEFINITE, | |
276 _WeakCallback(self._OnAsyncWaiterResult)) | |
277 | |
278 def _ReadOutstandingMessages(self): | |
279 result = system.RESULT_OK | |
280 while result == system.RESULT_OK: | |
281 result = _ReadAndDispatchMessage(self._handle, | |
282 self._incoming_message_receiver) | |
283 if result == system.RESULT_SHOULD_WAIT: | |
284 self._RegisterAsyncWaiterForRead() | |
285 return | |
286 self._OnError(result) | |
287 | |
288 | |
289 class Router(MessageReceiverWithResponder): | |
290 """ | |
291 A Router will handle mojo message and forward those to a Connector. It deals | |
292 with parsing of headers and adding of request ids in order to be able to match | |
293 a response to a request. | |
294 """ | |
295 | |
296 def __init__(self, handle): | |
297 MessageReceiverWithResponder.__init__(self) | |
298 self._incoming_message_receiver = None | |
299 self._next_request_id = 1 | |
300 self._responders = {} | |
301 self._connector = Connector(handle) | |
302 self._connector.SetIncomingMessageReceiver( | |
303 ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage))) | |
304 | |
305 def Start(self): | |
306 self._connector.Start() | |
307 | |
308 def SetIncomingMessageReceiver(self, message_receiver): | |
309 """ | |
310 Set the MessageReceiver that will receive message from the owned message | |
311 pipe. | |
312 """ | |
313 self._incoming_message_receiver = message_receiver | |
314 | |
315 def SetErrorHandler(self, error_handler): | |
316 """ | |
317 Set the ConnectionErrorHandler that will be notified of errors on the owned | |
318 message pipe. | |
319 """ | |
320 self._connector.SetErrorHandler(error_handler) | |
321 | |
322 def Accept(self, message): | |
323 # A message without responder is directly forwarded to the connector. | |
324 return self._connector.Accept(message) | |
325 | |
326 def AcceptWithResponder(self, message, responder): | |
327 # The message must have a header. | |
328 header = message.header | |
329 assert header.expects_response | |
330 request_id = self._NextRequestId() | |
331 header.request_id = request_id | |
332 if not self._connector.Accept(message): | |
333 return False | |
334 self._responders[request_id] = responder | |
335 return True | |
336 | |
337 def Close(self): | |
338 self._connector.Close() | |
339 | |
340 def PassMessagePipe(self): | |
341 return self._connector.PassMessagePipe() | |
342 | |
343 def _HandleIncomingMessage(self, message): | |
344 header = message.header | |
345 if header.expects_response: | |
346 if self._incoming_message_receiver: | |
347 return self._incoming_message_receiver.AcceptWithResponder( | |
348 message, self) | |
349 # If we receive a request expecting a response when the client is not | |
350 # listening, then we have no choice but to tear down the pipe. | |
351 self.Close() | |
352 return False | |
353 if header.is_response: | |
354 request_id = header.request_id | |
355 responder = self._responders.pop(request_id, None) | |
356 if responder is None: | |
357 return False | |
358 return responder.Accept(message) | |
359 if self._incoming_message_receiver: | |
360 return self._incoming_message_receiver.Accept(message) | |
361 # Ok to drop the message | |
362 return False | |
363 | |
364 def _NextRequestId(self): | |
365 request_id = self._next_request_id | |
366 while request_id == 0 or request_id in self._responders: | |
367 request_id = (request_id + 1) % (1 << 64) | |
368 self._next_request_id = (request_id + 1) % (1 << 64) | |
369 return request_id | |
370 | |
371 class ForwardingMessageReceiver(MessageReceiver): | |
372 """A MessageReceiver that forward calls to |Accept| to a callable.""" | |
373 | |
374 def __init__(self, callback): | |
375 MessageReceiver.__init__(self) | |
376 self._callback = callback | |
377 | |
378 def Accept(self, message): | |
379 return self._callback(message) | |
380 | |
381 | |
382 def _WeakCallback(callback): | |
383 func = callback.im_func | |
384 self = callback.im_self | |
385 if not self: | |
386 return callback | |
387 weak_self = weakref.ref(self) | |
388 def Callback(*args, **kwargs): | |
389 self = weak_self() | |
390 if self: | |
391 return func(self, *args, **kwargs) | |
392 return Callback | |
393 | |
394 | |
395 def _ReadAndDispatchMessage(handle, message_receiver): | |
396 (result, _, sizes) = handle.ReadMessage() | |
397 if result == system.RESULT_OK and message_receiver: | |
398 message_receiver.Accept(Message(bytearray(), [])) | |
399 if result != system.RESULT_RESOURCE_EXHAUSTED: | |
400 return result | |
401 (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1]) | |
402 if result == system.RESULT_OK and message_receiver: | |
403 message_receiver.Accept(Message(data[0], data[1])) | |
404 return result | |
405 | |
406 def _HasRequestId(flags): | |
407 return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0 | |
OLD | NEW |