| OLD | NEW |
| (Empty) |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """Utility classes to handle sending and receiving messages.""" | |
| 6 | |
| 7 | |
| 8 import struct | |
| 9 import sys | |
| 10 import weakref | |
| 11 | |
| 12 import mojo_bindings.serialization as serialization | |
| 13 | |
| 14 # pylint: disable=E0611,F0401 | |
| 15 import mojo_system as system | |
| 16 | |
| 17 | |
| 18 # The flag values for a message header. | |
| 19 NO_FLAG = 0 | |
| 20 MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0 | |
| 21 MESSAGE_IS_RESPONSE_FLAG = 1 << 1 | |
| 22 | |
| 23 | |
| 24 class MessagingException(Exception): | |
| 25 def __init__(self, *args, **kwargs): | |
| 26 Exception.__init__(self, *args, **kwargs) | |
| 27 self.__traceback__ = sys.exc_info()[2] | |
| 28 | |
| 29 | |
| 30 class MessageHeader(object): | |
| 31 """The header of a mojo message.""" | |
| 32 | |
| 33 _SIMPLE_MESSAGE_NUM_FIELDS = 2 | |
| 34 _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII") | |
| 35 | |
| 36 _REQUEST_ID_STRUCT = struct.Struct("<Q") | |
| 37 _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size | |
| 38 | |
| 39 _MESSAGE_WITH_REQUEST_ID_NUM_FIELDS = 3 | |
| 40 _MESSAGE_WITH_REQUEST_ID_SIZE = ( | |
| 41 _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size) | |
| 42 | |
| 43 def __init__(self, message_type, flags, request_id=0, data=None): | |
| 44 self._message_type = message_type | |
| 45 self._flags = flags | |
| 46 self._request_id = request_id | |
| 47 self._data = data | |
| 48 | |
| 49 @classmethod | |
| 50 def Deserialize(cls, data): | |
| 51 buf = buffer(data) | |
| 52 if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size: | |
| 53 raise serialization.DeserializationException('Header is too short.') | |
| 54 (size, version, message_type, flags) = ( | |
| 55 cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf)) | |
| 56 if (version < cls._SIMPLE_MESSAGE_NUM_FIELDS): | |
| 57 raise serialization.DeserializationException('Incorrect version.') | |
| 58 request_id = 0 | |
| 59 if _HasRequestId(flags): | |
| 60 if version < cls._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS: | |
| 61 raise serialization.DeserializationException('Incorrect version.') | |
| 62 if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or | |
| 63 len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE): | |
| 64 raise serialization.DeserializationException('Header is too short.') | |
| 65 (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from( | |
| 66 buf, cls._REQUEST_ID_OFFSET) | |
| 67 return MessageHeader(message_type, flags, request_id, data) | |
| 68 | |
| 69 @property | |
| 70 def message_type(self): | |
| 71 return self._message_type | |
| 72 | |
| 73 # pylint: disable=E0202 | |
| 74 @property | |
| 75 def request_id(self): | |
| 76 assert self.has_request_id | |
| 77 return self._request_id | |
| 78 | |
| 79 # pylint: disable=E0202 | |
| 80 @request_id.setter | |
| 81 def request_id(self, request_id): | |
| 82 assert self.has_request_id | |
| 83 self._request_id = request_id | |
| 84 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, | |
| 85 request_id) | |
| 86 | |
| 87 @property | |
| 88 def has_request_id(self): | |
| 89 return _HasRequestId(self._flags) | |
| 90 | |
| 91 @property | |
| 92 def expects_response(self): | |
| 93 return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG) | |
| 94 | |
| 95 @property | |
| 96 def is_response(self): | |
| 97 return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG) | |
| 98 | |
| 99 @property | |
| 100 def size(self): | |
| 101 if self.has_request_id: | |
| 102 return self._MESSAGE_WITH_REQUEST_ID_SIZE | |
| 103 return self._SIMPLE_MESSAGE_STRUCT.size | |
| 104 | |
| 105 def Serialize(self): | |
| 106 if not self._data: | |
| 107 self._data = bytearray(self.size) | |
| 108 version = self._SIMPLE_MESSAGE_NUM_FIELDS | |
| 109 size = self._SIMPLE_MESSAGE_STRUCT.size | |
| 110 if self.has_request_id: | |
| 111 version = self._MESSAGE_WITH_REQUEST_ID_NUM_FIELDS | |
| 112 size = self._MESSAGE_WITH_REQUEST_ID_SIZE | |
| 113 self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version, | |
| 114 self._message_type, self._flags) | |
| 115 if self.has_request_id: | |
| 116 self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET, | |
| 117 self._request_id) | |
| 118 return self._data | |
| 119 | |
| 120 def _HasFlag(self, flag): | |
| 121 return self._flags & flag != 0 | |
| 122 | |
| 123 | |
| 124 class Message(object): | |
| 125 """A message for a message pipe. This contains data and handles.""" | |
| 126 | |
| 127 def __init__(self, data=None, handles=None, header=None): | |
| 128 self.data = data | |
| 129 self.handles = handles | |
| 130 self._header = header | |
| 131 self._payload = None | |
| 132 | |
| 133 @property | |
| 134 def header(self): | |
| 135 if self._header is None: | |
| 136 self._header = MessageHeader.Deserialize(self.data) | |
| 137 return self._header | |
| 138 | |
| 139 @property | |
| 140 def payload(self): | |
| 141 if self._payload is None: | |
| 142 self._payload = Message(self.data[self.header.size:], self.handles) | |
| 143 return self._payload | |
| 144 | |
| 145 def SetRequestId(self, request_id): | |
| 146 header = self.header | |
| 147 header.request_id = request_id | |
| 148 (data, _) = header.Serialize() | |
| 149 self.data[:header.Size] = data[:header.Size] | |
| 150 | |
| 151 | |
| 152 class MessageReceiver(object): | |
| 153 """A class which implements this interface can receive Message objects.""" | |
| 154 | |
| 155 def Accept(self, message): | |
| 156 """ | |
| 157 Receive a Message. The MessageReceiver is allowed to mutate the message. | |
| 158 | |
| 159 Args: | |
| 160 message: the received message. | |
| 161 | |
| 162 Returns: | |
| 163 True if the message has been handled, False otherwise. | |
| 164 """ | |
| 165 raise NotImplementedError() | |
| 166 | |
| 167 | |
| 168 class MessageReceiverWithResponder(MessageReceiver): | |
| 169 """ | |
| 170 A MessageReceiver that can also handle the response message generated from the | |
| 171 given message. | |
| 172 """ | |
| 173 | |
| 174 def AcceptWithResponder(self, message, responder): | |
| 175 """ | |
| 176 A variant on Accept that registers a MessageReceiver (known as the | |
| 177 responder) to handle the response message generated from the given message. | |
| 178 The responder's Accept method may be called as part of the call to | |
| 179 AcceptWithResponder, or some time after its return. | |
| 180 | |
| 181 Args: | |
| 182 message: the received message. | |
| 183 responder: the responder that will receive the response. | |
| 184 | |
| 185 Returns: | |
| 186 True if the message has been handled, False otherwise. | |
| 187 """ | |
| 188 raise NotImplementedError() | |
| 189 | |
| 190 | |
| 191 class ConnectionErrorHandler(object): | |
| 192 """ | |
| 193 A ConnectionErrorHandler is notified of an error happening while using the | |
| 194 bindings over message pipes. | |
| 195 """ | |
| 196 | |
| 197 def OnError(self, result): | |
| 198 raise NotImplementedError() | |
| 199 | |
| 200 | |
| 201 class Connector(MessageReceiver): | |
| 202 """ | |
| 203 A Connector owns a message pipe and will send any received messages to the | |
| 204 registered MessageReceiver. It also acts as a MessageReceiver and will send | |
| 205 any message through the handle. | |
| 206 | |
| 207 The method Start must be called before the Connector will start listening to | |
| 208 incoming messages. | |
| 209 """ | |
| 210 | |
| 211 def __init__(self, handle): | |
| 212 MessageReceiver.__init__(self) | |
| 213 self._handle = handle | |
| 214 self._cancellable = None | |
| 215 self._incoming_message_receiver = None | |
| 216 self._error_handler = None | |
| 217 | |
| 218 def __del__(self): | |
| 219 if self._cancellable: | |
| 220 self._cancellable() | |
| 221 | |
| 222 def SetIncomingMessageReceiver(self, message_receiver): | |
| 223 """ | |
| 224 Set the MessageReceiver that will receive message from the owned message | |
| 225 pipe. | |
| 226 """ | |
| 227 self._incoming_message_receiver = message_receiver | |
| 228 | |
| 229 def SetErrorHandler(self, error_handler): | |
| 230 """ | |
| 231 Set the ConnectionErrorHandler that will be notified of errors on the owned | |
| 232 message pipe. | |
| 233 """ | |
| 234 self._error_handler = error_handler | |
| 235 | |
| 236 def Start(self): | |
| 237 assert not self._cancellable | |
| 238 self._RegisterAsyncWaiterForRead() | |
| 239 | |
| 240 def Accept(self, message): | |
| 241 result = self._handle.WriteMessage(message.data, message.handles) | |
| 242 return result == system.RESULT_OK | |
| 243 | |
| 244 def Close(self): | |
| 245 if self._cancellable: | |
| 246 self._cancellable() | |
| 247 self._cancellable = None | |
| 248 self._handle.Close() | |
| 249 | |
| 250 def PassMessagePipe(self): | |
| 251 if self._cancellable: | |
| 252 self._cancellable() | |
| 253 self._cancellable = None | |
| 254 result = self._handle | |
| 255 self._handle = system.Handle() | |
| 256 return result | |
| 257 | |
| 258 def _OnAsyncWaiterResult(self, result): | |
| 259 self._cancellable = None | |
| 260 if result == system.RESULT_OK: | |
| 261 self._ReadOutstandingMessages() | |
| 262 else: | |
| 263 self._OnError(result) | |
| 264 | |
| 265 def _OnError(self, result): | |
| 266 assert not self._cancellable | |
| 267 if self._error_handler: | |
| 268 self._error_handler.OnError(result) | |
| 269 self._handle.Close() | |
| 270 | |
| 271 def _RegisterAsyncWaiterForRead(self) : | |
| 272 assert not self._cancellable | |
| 273 self._cancellable = self._handle.AsyncWait( | |
| 274 system.HANDLE_SIGNAL_READABLE, | |
| 275 system.DEADLINE_INDEFINITE, | |
| 276 _WeakCallback(self._OnAsyncWaiterResult)) | |
| 277 | |
| 278 def _ReadOutstandingMessages(self): | |
| 279 result = system.RESULT_OK | |
| 280 while result == system.RESULT_OK: | |
| 281 result = _ReadAndDispatchMessage(self._handle, | |
| 282 self._incoming_message_receiver) | |
| 283 if result == system.RESULT_SHOULD_WAIT: | |
| 284 self._RegisterAsyncWaiterForRead() | |
| 285 return | |
| 286 self._OnError(result) | |
| 287 | |
| 288 | |
| 289 class Router(MessageReceiverWithResponder): | |
| 290 """ | |
| 291 A Router will handle mojo message and forward those to a Connector. It deals | |
| 292 with parsing of headers and adding of request ids in order to be able to match | |
| 293 a response to a request. | |
| 294 """ | |
| 295 | |
| 296 def __init__(self, handle): | |
| 297 MessageReceiverWithResponder.__init__(self) | |
| 298 self._incoming_message_receiver = None | |
| 299 self._next_request_id = 1 | |
| 300 self._responders = {} | |
| 301 self._connector = Connector(handle) | |
| 302 self._connector.SetIncomingMessageReceiver( | |
| 303 ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage))) | |
| 304 | |
| 305 def Start(self): | |
| 306 self._connector.Start() | |
| 307 | |
| 308 def SetIncomingMessageReceiver(self, message_receiver): | |
| 309 """ | |
| 310 Set the MessageReceiver that will receive message from the owned message | |
| 311 pipe. | |
| 312 """ | |
| 313 self._incoming_message_receiver = message_receiver | |
| 314 | |
| 315 def SetErrorHandler(self, error_handler): | |
| 316 """ | |
| 317 Set the ConnectionErrorHandler that will be notified of errors on the owned | |
| 318 message pipe. | |
| 319 """ | |
| 320 self._connector.SetErrorHandler(error_handler) | |
| 321 | |
| 322 def Accept(self, message): | |
| 323 # A message without responder is directly forwarded to the connector. | |
| 324 return self._connector.Accept(message) | |
| 325 | |
| 326 def AcceptWithResponder(self, message, responder): | |
| 327 # The message must have a header. | |
| 328 header = message.header | |
| 329 assert header.expects_response | |
| 330 request_id = self._NextRequestId() | |
| 331 header.request_id = request_id | |
| 332 if not self._connector.Accept(message): | |
| 333 return False | |
| 334 self._responders[request_id] = responder | |
| 335 return True | |
| 336 | |
| 337 def Close(self): | |
| 338 self._connector.Close() | |
| 339 | |
| 340 def PassMessagePipe(self): | |
| 341 return self._connector.PassMessagePipe() | |
| 342 | |
| 343 def _HandleIncomingMessage(self, message): | |
| 344 header = message.header | |
| 345 if header.expects_response: | |
| 346 if self._incoming_message_receiver: | |
| 347 return self._incoming_message_receiver.AcceptWithResponder( | |
| 348 message, self) | |
| 349 # If we receive a request expecting a response when the client is not | |
| 350 # listening, then we have no choice but to tear down the pipe. | |
| 351 self.Close() | |
| 352 return False | |
| 353 if header.is_response: | |
| 354 request_id = header.request_id | |
| 355 responder = self._responders.pop(request_id, None) | |
| 356 if responder is None: | |
| 357 return False | |
| 358 return responder.Accept(message) | |
| 359 if self._incoming_message_receiver: | |
| 360 return self._incoming_message_receiver.Accept(message) | |
| 361 # Ok to drop the message | |
| 362 return False | |
| 363 | |
| 364 def _NextRequestId(self): | |
| 365 request_id = self._next_request_id | |
| 366 while request_id == 0 or request_id in self._responders: | |
| 367 request_id = (request_id + 1) % (1 << 64) | |
| 368 self._next_request_id = (request_id + 1) % (1 << 64) | |
| 369 return request_id | |
| 370 | |
| 371 class ForwardingMessageReceiver(MessageReceiver): | |
| 372 """A MessageReceiver that forward calls to |Accept| to a callable.""" | |
| 373 | |
| 374 def __init__(self, callback): | |
| 375 MessageReceiver.__init__(self) | |
| 376 self._callback = callback | |
| 377 | |
| 378 def Accept(self, message): | |
| 379 return self._callback(message) | |
| 380 | |
| 381 | |
| 382 def _WeakCallback(callback): | |
| 383 func = callback.im_func | |
| 384 self = callback.im_self | |
| 385 if not self: | |
| 386 return callback | |
| 387 weak_self = weakref.ref(self) | |
| 388 def Callback(*args, **kwargs): | |
| 389 self = weak_self() | |
| 390 if self: | |
| 391 return func(self, *args, **kwargs) | |
| 392 return Callback | |
| 393 | |
| 394 | |
| 395 def _ReadAndDispatchMessage(handle, message_receiver): | |
| 396 (result, _, sizes) = handle.ReadMessage() | |
| 397 if result == system.RESULT_OK and message_receiver: | |
| 398 message_receiver.Accept(Message(bytearray(), [])) | |
| 399 if result != system.RESULT_RESOURCE_EXHAUSTED: | |
| 400 return result | |
| 401 (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1]) | |
| 402 if result == system.RESULT_OK and message_receiver: | |
| 403 message_receiver.Accept(Message(data[0], data[1])) | |
| 404 return result | |
| 405 | |
| 406 def _HasRequestId(flags): | |
| 407 return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG|MESSAGE_IS_RESPONSE_FLAG) != 0 | |
| OLD | NEW |