OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/macros.h" | 10 #include "base/macros.h" |
11 #include "base/memory/ref_counted.h" | 11 #include "base/memory/ref_counted.h" |
12 #include "mojo/edk/embedder/embedder_internal.h" | 12 #include "mojo/edk/embedder/embedder_internal.h" |
13 #include "mojo/edk/system/core.h" | 13 #include "mojo/edk/system/core.h" |
| 14 #include "mojo/edk/system/message_for_transit.h" |
14 #include "mojo/edk/system/node_controller.h" | 15 #include "mojo/edk/system/node_controller.h" |
15 #include "mojo/edk/system/ports_message.h" | 16 #include "mojo/edk/system/ports_message.h" |
16 #include "mojo/edk/system/request_context.h" | 17 #include "mojo/edk/system/request_context.h" |
17 | 18 |
18 namespace mojo { | 19 namespace mojo { |
19 namespace edk { | 20 namespace edk { |
20 | 21 |
21 namespace { | 22 namespace { |
22 | 23 |
| 24 using DispatcherHeader = MessageForTransit::DispatcherHeader; |
| 25 using MessageHeader = MessageForTransit::MessageHeader; |
| 26 |
23 #pragma pack(push, 1) | 27 #pragma pack(push, 1) |
24 | 28 |
25 // Header attached to every message sent over a message pipe. | |
26 struct MessageHeader { | |
27 // The number of serialized dispatchers included in this header. | |
28 uint32_t num_dispatchers; | |
29 | |
30 // Total size of the header, including serialized dispatcher data. | |
31 uint32_t header_size; | |
32 }; | |
33 | |
34 static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); | |
35 | |
36 // Header for each dispatcher, immediately following the message header. | |
37 struct DispatcherHeader { | |
38 // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. | |
39 int32_t type; | |
40 | |
41 // The size of the serialized dispatcher, not including this header. | |
42 uint32_t num_bytes; | |
43 | |
44 // The number of ports needed to deserialize this dispatcher. | |
45 uint32_t num_ports; | |
46 | |
47 // The number of platform handles needed to deserialize this dispatcher. | |
48 uint32_t num_platform_handles; | |
49 }; | |
50 | |
51 static_assert(sizeof(DispatcherHeader) % 8 == 0, | |
52 "Invalid DispatcherHeader size."); | |
53 | |
54 struct SerializedState { | 29 struct SerializedState { |
55 uint64_t pipe_id; | 30 uint64_t pipe_id; |
56 int8_t endpoint; | 31 int8_t endpoint; |
57 char padding[7]; | 32 char padding[7]; |
58 }; | 33 }; |
59 | 34 |
60 static_assert(sizeof(SerializedState) % 8 == 0, | 35 static_assert(sizeof(SerializedState) % 8 == 0, |
61 "Invalid SerializedState size."); | 36 "Invalid SerializedState size."); |
62 | 37 |
63 #pragma pack(pop) | 38 #pragma pack(pop) |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
150 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 125 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
151 base::AutoLock lock(signal_lock_); | 126 base::AutoLock lock(signal_lock_); |
152 | 127 |
153 if (port_closed_ || in_transit_) | 128 if (port_closed_ || in_transit_) |
154 return MOJO_RESULT_INVALID_ARGUMENT; | 129 return MOJO_RESULT_INVALID_ARGUMENT; |
155 | 130 |
156 return awakables_.RemoveWatcher(context); | 131 return awakables_.RemoveWatcher(context); |
157 } | 132 } |
158 | 133 |
159 MojoResult MessagePipeDispatcher::WriteMessage( | 134 MojoResult MessagePipeDispatcher::WriteMessage( |
160 const void* bytes, | 135 std::unique_ptr<MessageForTransit> message, |
161 uint32_t num_bytes, | |
162 const DispatcherInTransit* dispatchers, | |
163 uint32_t num_dispatchers, | |
164 MojoWriteMessageFlags flags) { | 136 MojoWriteMessageFlags flags) { |
165 | |
166 | |
167 if (port_closed_ || in_transit_) | 137 if (port_closed_ || in_transit_) |
168 return MOJO_RESULT_INVALID_ARGUMENT; | 138 return MOJO_RESULT_INVALID_ARGUMENT; |
169 | 139 |
170 // A structure for retaining information about every Dispatcher we're about | 140 size_t num_bytes = message->num_bytes(); |
171 // to send. This information is collected by calling StartSerialize() on | 141 std::unique_ptr<PortsMessage> msg = message->TakePortsMessage(); |
172 // each dispatcher in sequence. | 142 int rv = node_controller_->SendMessage(port_, &msg); |
173 struct DispatcherInfo { | |
174 uint32_t num_bytes; | |
175 uint32_t num_ports; | |
176 uint32_t num_handles; | |
177 }; | |
178 | 143 |
179 // This is only the base header size. It will grow as we accumulate the | 144 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
180 // size of serialized state for each dispatcher. | 145 << " [port=" << port_.name() << "; rv=" << rv |
181 size_t header_size = sizeof(MessageHeader) + | 146 << "; num_bytes=" << num_bytes << "]"; |
182 num_dispatchers * sizeof(DispatcherHeader); | |
183 | 147 |
184 size_t num_ports = 0; | 148 if (rv != ports::OK) { |
185 size_t num_handles = 0; | 149 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 150 rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| 151 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| 152 return MOJO_RESULT_INVALID_ARGUMENT; |
| 153 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| 154 base::AutoLock lock(signal_lock_); |
| 155 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 156 return MOJO_RESULT_FAILED_PRECONDITION; |
| 157 } |
186 | 158 |
187 std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); | 159 NOTREACHED(); |
188 for (size_t i = 0; i < num_dispatchers; ++i) { | 160 return MOJO_RESULT_UNKNOWN; |
189 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
190 d->StartSerialize(&dispatcher_info[i].num_bytes, | |
191 &dispatcher_info[i].num_ports, | |
192 &dispatcher_info[i].num_handles); | |
193 header_size += dispatcher_info[i].num_bytes; | |
194 num_ports += dispatcher_info[i].num_ports; | |
195 num_handles += dispatcher_info[i].num_handles; | |
196 } | 161 } |
197 | 162 |
198 // We now have enough information to fully allocate the message storage. | 163 return MOJO_RESULT_OK; |
199 std::unique_ptr<PortsMessage> message = PortsMessage::NewUserMessage( | |
200 header_size + num_bytes, num_ports, num_handles); | |
201 DCHECK(message); | |
202 | |
203 // Populate the message header with information about serialized dispatchers. | |
204 // | |
205 // The front of the message is always a MessageHeader followed by a | |
206 // DispatcherHeader for each dispatcher to be sent. | |
207 MessageHeader* header = | |
208 static_cast<MessageHeader*>(message->mutable_payload_bytes()); | |
209 DispatcherHeader* dispatcher_headers = | |
210 reinterpret_cast<DispatcherHeader*>(header + 1); | |
211 | |
212 // Serialized dispatcher state immediately follows the series of | |
213 // DispatcherHeaders. | |
214 char* dispatcher_data = | |
215 reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); | |
216 | |
217 header->num_dispatchers = num_dispatchers; | |
218 | |
219 // |header_size| is the total number of bytes preceding the message payload, | |
220 // including all dispatcher headers and serialized dispatcher state. | |
221 DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); | |
222 header->header_size = static_cast<uint32_t>(header_size); | |
223 | |
224 bool cancel_transit = false; | |
225 if (num_dispatchers > 0) { | |
226 ScopedPlatformHandleVectorPtr handles( | |
227 new PlatformHandleVector(num_handles)); | |
228 size_t port_index = 0; | |
229 size_t handle_index = 0; | |
230 for (size_t i = 0; i < num_dispatchers; ++i) { | |
231 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
232 DispatcherHeader* dh = &dispatcher_headers[i]; | |
233 const DispatcherInfo& info = dispatcher_info[i]; | |
234 | |
235 // Fill in the header for this dispatcher. | |
236 dh->type = static_cast<int32_t>(d->GetType()); | |
237 dh->num_bytes = info.num_bytes; | |
238 dh->num_ports = info.num_ports; | |
239 dh->num_platform_handles = info.num_handles; | |
240 | |
241 // Fill in serialized state, ports, and platform handles. We'll cancel | |
242 // the send if the dispatcher implementation rejects for some reason. | |
243 if (!d->EndSerialize(static_cast<void*>(dispatcher_data), | |
244 message->mutable_ports() + port_index, | |
245 handles->data() + handle_index)) { | |
246 cancel_transit = true; | |
247 break; | |
248 } | |
249 | |
250 dispatcher_data += info.num_bytes; | |
251 port_index += info.num_ports; | |
252 handle_index += info.num_handles; | |
253 } | |
254 | |
255 if (!cancel_transit) { | |
256 // Take ownership of all the handles and move them into message storage. | |
257 message->SetHandles(std::move(handles)); | |
258 } else { | |
259 // Release any platform handles we've accumulated. Their dispatchers | |
260 // retain ownership when transit is canceled, so these are not actually | |
261 // leaking. | |
262 handles->clear(); | |
263 } | |
264 } | |
265 | |
266 MojoResult result = MOJO_RESULT_OK; | |
267 if (!cancel_transit) { | |
268 // Copy the message body. | |
269 void* message_body = static_cast<void*>( | |
270 static_cast<char*>(message->mutable_payload_bytes()) + header_size); | |
271 memcpy(message_body, bytes, num_bytes); | |
272 | |
273 int rv = node_controller_->SendMessage(port_, &message); | |
274 | |
275 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | |
276 << " [port=" << port_.name() << "; rv=" << rv | |
277 << "; num_bytes=" << num_bytes << "]"; | |
278 | |
279 if (rv != ports::OK) { | |
280 if (rv == ports::ERROR_PORT_UNKNOWN || | |
281 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | |
282 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | |
283 result = MOJO_RESULT_INVALID_ARGUMENT; | |
284 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | |
285 base::AutoLock lock(signal_lock_); | |
286 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | |
287 result = MOJO_RESULT_FAILED_PRECONDITION; | |
288 } else { | |
289 NOTREACHED(); | |
290 result = MOJO_RESULT_UNKNOWN; | |
291 } | |
292 cancel_transit = true; | |
293 } else { | |
294 DCHECK(!message); | |
295 } | |
296 } | |
297 | |
298 if (cancel_transit) { | |
299 // We ended up not sending the message. Release all the platform handles. | |
300 // Their dipatchers retain ownership when transit is canceled, so these are | |
301 // not actually leaking. | |
302 DCHECK(message); | |
303 Channel::MessagePtr m = message->TakeChannelMessage(); | |
304 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); | |
305 if (handles) | |
306 handles->clear(); | |
307 } | |
308 | |
309 return result; | |
310 } | 164 } |
311 | 165 |
312 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 166 MojoResult MessagePipeDispatcher::ReadMessage( |
313 uint32_t* num_bytes, | 167 std::unique_ptr<MessageForTransit>* message, |
314 MojoHandle* handles, | 168 uint32_t* num_bytes, |
315 uint32_t* num_handles, | 169 MojoHandle* handles, |
316 MojoReadMessageFlags flags) { | 170 uint32_t* num_handles, |
| 171 MojoReadMessageFlags flags, |
| 172 bool read_any_size) { |
317 // We can't read from a port that's closed or in transit! | 173 // We can't read from a port that's closed or in transit! |
318 if (port_closed_ || in_transit_) | 174 if (port_closed_ || in_transit_) |
319 return MOJO_RESULT_INVALID_ARGUMENT; | 175 return MOJO_RESULT_INVALID_ARGUMENT; |
320 | 176 |
321 bool no_space = false; | 177 bool no_space = false; |
322 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 178 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
323 | 179 |
324 // Ensure the provided buffers are large enough to hold the next message. | 180 // Grab a message if the provided handles buffer is large enough. If the input |
325 // GetMessageIf provides an atomic way to test the next message without | 181 // |num_bytes| is provided and |read_any_size| is false, we also ensure |
326 // committing to removing it from the port's underlying message queue until | 182 // that it specifies a size at least as large as the next available payload. |
327 // we are sure we can consume it. | 183 // |
| 184 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
| 185 // This flag exists to support both new and old API behavior. |
328 | 186 |
329 ports::ScopedMessage ports_message; | 187 ports::ScopedMessage ports_message; |
330 int rv = node_controller_->node()->GetMessageIf( | 188 int rv = node_controller_->node()->GetMessageIf( |
331 port_, | 189 port_, |
332 [num_bytes, num_handles, &no_space, &may_discard]( | 190 [read_any_size, num_bytes, num_handles, &no_space, &may_discard]( |
333 const ports::Message& next_message) { | 191 const ports::Message& next_message) { |
334 const PortsMessage& message = | 192 const PortsMessage& message = |
335 static_cast<const PortsMessage&>(next_message); | 193 static_cast<const PortsMessage&>(next_message); |
336 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); | 194 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); |
337 | 195 |
338 const MessageHeader* header = | 196 const MessageHeader* header = |
339 static_cast<const MessageHeader*>(message.payload_bytes()); | 197 static_cast<const MessageHeader*>(message.payload_bytes()); |
340 DCHECK_LE(header->header_size, message.num_payload_bytes()); | 198 DCHECK_LE(header->header_size, message.num_payload_bytes()); |
341 | 199 |
342 uint32_t bytes_to_read = 0; | 200 uint32_t bytes_to_read = 0; |
343 uint32_t bytes_available = | 201 uint32_t bytes_available = |
344 static_cast<uint32_t>(message.num_payload_bytes()) - | 202 static_cast<uint32_t>(message.num_payload_bytes()) - |
345 header->header_size; | 203 header->header_size; |
346 if (num_bytes) { | 204 if (num_bytes) { |
347 bytes_to_read = std::min(*num_bytes, bytes_available); | 205 bytes_to_read = std::min(*num_bytes, bytes_available); |
348 *num_bytes = bytes_available; | 206 *num_bytes = bytes_available; |
349 } | 207 } |
350 | 208 |
351 uint32_t handles_to_read = 0; | 209 uint32_t handles_to_read = 0; |
352 uint32_t handles_available = header->num_dispatchers; | 210 uint32_t handles_available = header->num_dispatchers; |
353 if (num_handles) { | 211 if (num_handles) { |
354 handles_to_read = std::min(*num_handles, handles_available); | 212 handles_to_read = std::min(*num_handles, handles_available); |
355 *num_handles = handles_available; | 213 *num_handles = handles_available; |
356 } | 214 } |
357 | 215 |
358 if (bytes_to_read < bytes_available || | 216 if (handles_to_read < handles_available || |
359 handles_to_read < handles_available) { | 217 (!read_any_size && bytes_to_read < bytes_available)) { |
360 no_space = true; | 218 no_space = true; |
361 return may_discard; | 219 return may_discard; |
362 } | 220 } |
363 | 221 |
364 return true; | 222 return true; |
365 }, | 223 }, |
366 &ports_message); | 224 &ports_message); |
367 | 225 |
368 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 226 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
369 if (rv == ports::ERROR_PORT_UNKNOWN || | 227 if (rv == ports::ERROR_PORT_UNKNOWN || |
370 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 228 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
371 return MOJO_RESULT_INVALID_ARGUMENT; | 229 return MOJO_RESULT_INVALID_ARGUMENT; |
372 | 230 |
373 NOTREACHED(); | 231 NOTREACHED(); |
374 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 232 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
375 } | 233 } |
376 | 234 |
377 if (no_space) { | 235 if (no_space) { |
378 // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this | 236 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
379 // message's data. The message will still be in queue unless | 237 // sufficient to hold this message's data. The message will still be in |
380 // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 238 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
381 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 239 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
382 } | 240 } |
383 | 241 |
384 if (!ports_message) { | 242 if (!ports_message) { |
385 // No message was available in queue. | 243 // No message was available in queue. |
386 | 244 |
387 if (rv == ports::OK) | 245 if (rv == ports::OK) |
388 return MOJO_RESULT_SHOULD_WAIT; | 246 return MOJO_RESULT_SHOULD_WAIT; |
389 | 247 |
390 // Peer is closed and there are no more messages to read. | 248 // Peer is closed and there are no more messages to read. |
391 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 249 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
392 base::AutoLock lock(signal_lock_); | 250 base::AutoLock lock(signal_lock_); |
393 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 251 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
394 return MOJO_RESULT_FAILED_PRECONDITION; | 252 return MOJO_RESULT_FAILED_PRECONDITION; |
395 } | 253 } |
396 | 254 |
397 // Alright! We have a message and the caller has provided sufficient storage | 255 // Alright! We have a message and the caller has provided sufficient storage |
398 // in which to receive it. | 256 // in which to receive it. |
399 | 257 |
400 std::unique_ptr<PortsMessage> message( | 258 std::unique_ptr<PortsMessage> msg( |
401 static_cast<PortsMessage*>(ports_message.release())); | 259 static_cast<PortsMessage*>(ports_message.release())); |
402 | 260 |
403 const MessageHeader* header = | 261 const MessageHeader* header = |
404 static_cast<const MessageHeader*>(message->payload_bytes()); | 262 static_cast<const MessageHeader*>( msg->payload_bytes()); |
405 const DispatcherHeader* dispatcher_headers = | 263 const DispatcherHeader* dispatcher_headers = |
406 reinterpret_cast<const DispatcherHeader*>(header + 1); | 264 reinterpret_cast<const DispatcherHeader*>(header + 1); |
407 | 265 |
408 const char* dispatcher_data = reinterpret_cast<const char*>( | 266 const char* dispatcher_data = reinterpret_cast<const char*>( |
409 dispatcher_headers + header->num_dispatchers); | 267 dispatcher_headers + header->num_dispatchers); |
410 | 268 |
411 // Deserialize dispatchers. | 269 // Deserialize dispatchers. |
412 if (header->num_dispatchers > 0) { | 270 if (header->num_dispatchers > 0) { |
413 CHECK(handles); | 271 CHECK(handles); |
414 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); | 272 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
415 size_t data_payload_index = sizeof(MessageHeader) + | 273 size_t data_payload_index = sizeof(MessageHeader) + |
416 header->num_dispatchers * sizeof(DispatcherHeader); | 274 header->num_dispatchers * sizeof(DispatcherHeader); |
417 size_t port_index = 0; | 275 size_t port_index = 0; |
418 size_t platform_handle_index = 0; | 276 size_t platform_handle_index = 0; |
419 for (size_t i = 0; i < header->num_dispatchers; ++i) { | 277 for (size_t i = 0; i < header->num_dispatchers; ++i) { |
420 const DispatcherHeader& dh = dispatcher_headers[i]; | 278 const DispatcherHeader& dh = dispatcher_headers[i]; |
421 Type type = static_cast<Type>(dh.type); | 279 Type type = static_cast<Type>(dh.type); |
422 | 280 |
423 DCHECK_GE(message->num_payload_bytes(), | 281 DCHECK_GE(msg->num_payload_bytes(), |
424 data_payload_index + dh.num_bytes); | 282 data_payload_index + dh.num_bytes); |
425 DCHECK_GE(message->num_ports(), | 283 DCHECK_GE(msg->num_ports(), |
426 port_index + dh.num_ports); | 284 port_index + dh.num_ports); |
427 DCHECK_GE(message->num_handles(), | 285 DCHECK_GE(msg->num_handles(), |
428 platform_handle_index + dh.num_platform_handles); | 286 platform_handle_index + dh.num_platform_handles); |
429 | 287 |
430 PlatformHandle* out_handles = | 288 PlatformHandle* out_handles = |
431 message->num_handles() ? message->handles() + platform_handle_index | 289 msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; |
432 : nullptr; | |
433 dispatchers[i].dispatcher = Dispatcher::Deserialize( | 290 dispatchers[i].dispatcher = Dispatcher::Deserialize( |
434 type, dispatcher_data, dh.num_bytes, message->ports() + port_index, | 291 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, |
435 dh.num_ports, out_handles, dh.num_platform_handles); | 292 dh.num_ports, out_handles, dh.num_platform_handles); |
436 if (!dispatchers[i].dispatcher) | 293 if (!dispatchers[i].dispatcher) |
437 return MOJO_RESULT_UNKNOWN; | 294 return MOJO_RESULT_UNKNOWN; |
438 | 295 |
439 dispatcher_data += dh.num_bytes; | 296 dispatcher_data += dh.num_bytes; |
440 data_payload_index += dh.num_bytes; | 297 data_payload_index += dh.num_bytes; |
441 port_index += dh.num_ports; | 298 port_index += dh.num_ports; |
442 platform_handle_index += dh.num_platform_handles; | 299 platform_handle_index += dh.num_platform_handles; |
443 } | 300 } |
444 | 301 |
445 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, | 302 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, |
446 handles)) | 303 handles)) |
447 return MOJO_RESULT_UNKNOWN; | 304 return MOJO_RESULT_UNKNOWN; |
448 } | 305 } |
449 | 306 |
450 // Copy message bytes. | 307 CHECK(msg); |
451 DCHECK_GE(message->num_payload_bytes(), header->header_size); | 308 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
452 const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); | |
453 memcpy(bytes, payload + header->header_size, | |
454 message->num_payload_bytes() - header->header_size); | |
455 | |
456 return MOJO_RESULT_OK; | 309 return MOJO_RESULT_OK; |
457 } | 310 } |
458 | 311 |
459 HandleSignalsState | 312 HandleSignalsState |
460 MessagePipeDispatcher::GetHandleSignalsState() const { | 313 MessagePipeDispatcher::GetHandleSignalsState() const { |
461 base::AutoLock lock(signal_lock_); | 314 base::AutoLock lock(signal_lock_); |
462 return GetHandleSignalsStateNoLock(); | 315 return GetHandleSignalsStateNoLock(); |
463 } | 316 } |
464 | 317 |
465 MojoResult MessagePipeDispatcher::AddAwakable( | 318 MojoResult MessagePipeDispatcher::AddAwakable( |
(...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
663 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 516 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
664 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 517 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
665 } | 518 } |
666 #endif | 519 #endif |
667 | 520 |
668 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 521 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
669 } | 522 } |
670 | 523 |
671 } // namespace edk | 524 } // namespace edk |
672 } // namespace mojo | 525 } // namespace mojo |
OLD | NEW |