OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/macros.h" | 9 #include "base/macros.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
11 #include "base/memory/scoped_ptr.h" | 11 #include "base/memory/scoped_ptr.h" |
12 #include "mojo/edk/embedder/embedder_internal.h" | 12 #include "mojo/edk/embedder/embedder_internal.h" |
13 #include "mojo/edk/system/core.h" | 13 #include "mojo/edk/system/core.h" |
14 #include "mojo/edk/system/message_for_transit.h" | |
14 #include "mojo/edk/system/node_controller.h" | 15 #include "mojo/edk/system/node_controller.h" |
15 #include "mojo/edk/system/ports_message.h" | 16 #include "mojo/edk/system/ports_message.h" |
16 #include "mojo/edk/system/request_context.h" | 17 #include "mojo/edk/system/request_context.h" |
17 | 18 |
18 namespace mojo { | 19 namespace mojo { |
19 namespace edk { | 20 namespace edk { |
20 | 21 |
21 namespace { | 22 namespace { |
22 | 23 |
23 #pragma pack(push, 1) | 24 #pragma pack(push, 1) |
24 | 25 |
25 // Header attached to every message sent over a message pipe. | |
26 struct MessageHeader { | |
27 // The number of serialized dispatchers included in this header. | |
28 uint32_t num_dispatchers; | |
29 | |
30 // Total size of the header, including serialized dispatcher data. | |
31 uint32_t header_size; | |
32 }; | |
33 | |
34 static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); | |
35 | |
36 // Header for each dispatcher, immediately following the message header. | |
37 struct DispatcherHeader { | |
38 // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. | |
39 int32_t type; | |
40 | |
41 // The size of the serialized dispatcher, not including this header. | |
42 uint32_t num_bytes; | |
43 | |
44 // The number of ports needed to deserialize this dispatcher. | |
45 uint32_t num_ports; | |
46 | |
47 // The number of platform handles needed to deserialize this dispatcher. | |
48 uint32_t num_platform_handles; | |
49 }; | |
50 | |
51 static_assert(sizeof(DispatcherHeader) % 8 == 0, | |
52 "Invalid DispatcherHeader size."); | |
53 | |
54 struct SerializedState { | 26 struct SerializedState { |
55 uint64_t pipe_id; | 27 uint64_t pipe_id; |
56 int8_t endpoint; | 28 int8_t endpoint; |
57 char padding[7]; | 29 char padding[7]; |
58 }; | 30 }; |
59 | 31 |
60 static_assert(sizeof(SerializedState) % 8 == 0, | 32 static_assert(sizeof(SerializedState) % 8 == 0, |
61 "Invalid SerializedState size."); | 33 "Invalid SerializedState size."); |
62 | 34 |
63 #pragma pack(pop) | 35 #pragma pack(pop) |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
150 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 122 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
151 base::AutoLock lock(signal_lock_); | 123 base::AutoLock lock(signal_lock_); |
152 | 124 |
153 if (port_closed_ || in_transit_) | 125 if (port_closed_ || in_transit_) |
154 return MOJO_RESULT_INVALID_ARGUMENT; | 126 return MOJO_RESULT_INVALID_ARGUMENT; |
155 | 127 |
156 return awakables_.RemoveWatcher(context); | 128 return awakables_.RemoveWatcher(context); |
157 } | 129 } |
158 | 130 |
159 MojoResult MessagePipeDispatcher::WriteMessage( | 131 MojoResult MessagePipeDispatcher::WriteMessage( |
160 const void* bytes, | 132 std::unique_ptr<MessageForTransit> message, |
161 uint32_t num_bytes, | |
162 const DispatcherInTransit* dispatchers, | |
163 uint32_t num_dispatchers, | |
164 MojoWriteMessageFlags flags) { | 133 MojoWriteMessageFlags flags) { |
165 | 134 |
166 { | 135 { |
167 base::AutoLock lock(signal_lock_); | 136 base::AutoLock lock(signal_lock_); |
168 if (port_closed_ || in_transit_) | 137 if (port_closed_ || in_transit_) |
169 return MOJO_RESULT_INVALID_ARGUMENT; | 138 return MOJO_RESULT_INVALID_ARGUMENT; |
170 } | 139 } |
171 | 140 |
172 // A structure for retaining information about every Dispatcher we're about | 141 size_t num_bytes = message->num_bytes(); |
173 // to send. This information is collected by calling StartSerialize() on | 142 int rv = node_controller_->SendMessage(port_, message->message()); |
174 // each dispatcher in sequence. | |
175 struct DispatcherInfo { | |
176 uint32_t num_bytes; | |
177 uint32_t num_ports; | |
178 uint32_t num_handles; | |
179 }; | |
180 | 143 |
181 // This is only the base header size. It will grow as we accumulate the | 144 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
182 // size of serialized state for each dispatcher. | 145 << " [port=" << port_.name() << "; rv=" << rv |
183 size_t header_size = sizeof(MessageHeader) + | 146 << "; num_bytes=" << num_bytes << "]"; |
184 num_dispatchers * sizeof(DispatcherHeader); | |
185 | 147 |
186 size_t num_ports = 0; | 148 if (rv != ports::OK) { |
187 size_t num_handles = 0; | 149 if (rv == ports::ERROR_PORT_UNKNOWN || |
150 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | |
151 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | |
152 return MOJO_RESULT_INVALID_ARGUMENT; | |
153 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | |
154 base::AutoLock lock(signal_lock_); | |
155 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | |
156 return MOJO_RESULT_FAILED_PRECONDITION; | |
157 } | |
188 | 158 |
189 std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); | 159 NOTREACHED(); |
190 for (size_t i = 0; i < num_dispatchers; ++i) { | 160 return MOJO_RESULT_UNKNOWN; |
191 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
192 d->StartSerialize(&dispatcher_info[i].num_bytes, | |
193 &dispatcher_info[i].num_ports, | |
194 &dispatcher_info[i].num_handles); | |
195 header_size += dispatcher_info[i].num_bytes; | |
196 num_ports += dispatcher_info[i].num_ports; | |
197 num_handles += dispatcher_info[i].num_handles; | |
198 } | 161 } |
199 | 162 |
200 // We now have enough information to fully allocate the message storage. | 163 return MOJO_RESULT_OK; |
201 scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( | |
202 header_size + num_bytes, num_ports, num_handles); | |
203 DCHECK(message); | |
204 | |
205 // Populate the message header with information about serialized dispatchers. | |
206 // | |
207 // The front of the message is always a MessageHeader followed by a | |
208 // DispatcherHeader for each dispatcher to be sent. | |
209 MessageHeader* header = | |
210 static_cast<MessageHeader*>(message->mutable_payload_bytes()); | |
211 DispatcherHeader* dispatcher_headers = | |
212 reinterpret_cast<DispatcherHeader*>(header + 1); | |
213 | |
214 // Serialized dispatcher state immediately follows the series of | |
215 // DispatcherHeaders. | |
216 char* dispatcher_data = | |
217 reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); | |
218 | |
219 header->num_dispatchers = num_dispatchers; | |
220 | |
221 // |header_size| is the total number of bytes preceding the message payload, | |
222 // including all dispatcher headers and serialized dispatcher state. | |
223 DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); | |
224 header->header_size = static_cast<uint32_t>(header_size); | |
225 | |
226 bool cancel_transit = false; | |
227 if (num_dispatchers > 0) { | |
228 ScopedPlatformHandleVectorPtr handles( | |
229 new PlatformHandleVector(num_handles)); | |
230 size_t port_index = 0; | |
231 size_t handle_index = 0; | |
232 for (size_t i = 0; i < num_dispatchers; ++i) { | |
233 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
234 DispatcherHeader* dh = &dispatcher_headers[i]; | |
235 const DispatcherInfo& info = dispatcher_info[i]; | |
236 | |
237 // Fill in the header for this dispatcher. | |
238 dh->type = static_cast<int32_t>(d->GetType()); | |
239 dh->num_bytes = info.num_bytes; | |
240 dh->num_ports = info.num_ports; | |
241 dh->num_platform_handles = info.num_handles; | |
242 | |
243 // Fill in serialized state, ports, and platform handles. We'll cancel | |
244 // the send if the dispatcher implementation rejects for some reason. | |
245 if (!d->EndSerialize(static_cast<void*>(dispatcher_data), | |
246 message->mutable_ports() + port_index, | |
247 handles->data() + handle_index)) { | |
248 cancel_transit = true; | |
249 break; | |
250 } | |
251 | |
252 dispatcher_data += info.num_bytes; | |
253 port_index += info.num_ports; | |
254 handle_index += info.num_handles; | |
255 } | |
256 | |
257 if (!cancel_transit) { | |
258 // Take ownership of all the handles and move them into message storage. | |
259 message->SetHandles(std::move(handles)); | |
260 } else { | |
261 // Release any platform handles we've accumulated. Their dispatchers | |
262 // retain ownership when transit is canceled, so these are not actually | |
263 // leaking. | |
264 handles->clear(); | |
265 } | |
266 } | |
267 | |
268 MojoResult result = MOJO_RESULT_OK; | |
269 if (!cancel_transit) { | |
270 // Copy the message body. | |
271 void* message_body = static_cast<void*>( | |
272 static_cast<char*>(message->mutable_payload_bytes()) + header_size); | |
273 memcpy(message_body, bytes, num_bytes); | |
274 | |
275 int rv = node_controller_->SendMessage(port_, &message); | |
276 | |
277 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | |
278 << " [port=" << port_.name() << "; rv=" << rv | |
279 << "; num_bytes=" << num_bytes << "]"; | |
280 | |
281 if (rv != ports::OK) { | |
282 if (rv == ports::ERROR_PORT_UNKNOWN || | |
283 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | |
284 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | |
285 result = MOJO_RESULT_INVALID_ARGUMENT; | |
286 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | |
287 base::AutoLock lock(signal_lock_); | |
288 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | |
289 result = MOJO_RESULT_FAILED_PRECONDITION; | |
290 } else { | |
291 NOTREACHED(); | |
292 result = MOJO_RESULT_UNKNOWN; | |
293 } | |
294 cancel_transit = true; | |
295 } else { | |
296 DCHECK(!message); | |
297 } | |
298 } | |
299 | |
300 if (cancel_transit) { | |
301 // We ended up not sending the message. Release all the platform handles. | |
302 // Their dipatchers retain ownership when transit is canceled, so these are | |
303 // not actually leaking. | |
304 DCHECK(message); | |
305 Channel::MessagePtr m = message->TakeChannelMessage(); | |
306 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); | |
307 if (handles) | |
308 handles->clear(); | |
309 } | |
310 | |
311 return result; | |
312 } | 164 } |
313 | 165 |
314 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 166 MojoResult MessagePipeDispatcher::ReadMessage( |
315 uint32_t* num_bytes, | 167 std::unique_ptr<MessageForTransit>* message, |
316 MojoHandle* handles, | 168 uint32_t* num_bytes, |
317 uint32_t* num_handles, | 169 MojoHandle* handles, |
318 MojoReadMessageFlags flags) { | 170 uint32_t* num_handles, |
171 MojoReadMessageFlags flags, | |
172 bool ignore_num_bytes) { | |
173 | |
319 { | 174 { |
320 base::AutoLock lock(signal_lock_); | 175 base::AutoLock lock(signal_lock_); |
321 // We can't read from a port that's closed or in transit! | 176 // We can't read from a port that's closed or in transit! |
322 if (port_closed_ || in_transit_) | 177 if (port_closed_ || in_transit_) |
323 return MOJO_RESULT_INVALID_ARGUMENT; | 178 return MOJO_RESULT_INVALID_ARGUMENT; |
324 } | 179 } |
325 | 180 |
326 bool no_space = false; | 181 bool no_space = false; |
327 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 182 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
328 | 183 |
329 // Ensure the provided buffers are large enough to hold the next message. | 184 // Grab a message if the provided handles buffer is large enough. If the input |
330 // GetMessageIf provides an atomic way to test the next message without | 185 // |num_bytes| is provided and |ignore_num_bytes| is false, we also ensure |
331 // committing to removing it from the port's underlying message queue until | 186 // that it specifies a size at least as large as the next available payload. |
332 // we are sure we can consume it. | |
333 | 187 |
334 ports::ScopedMessage ports_message; | 188 ports::ScopedMessage ports_message; |
335 int rv = node_controller_->node()->GetMessageIf( | 189 int rv = node_controller_->node()->GetMessageIf( |
336 port_, | 190 port_, |
337 [num_bytes, num_handles, &no_space, &may_discard]( | 191 [num_handles, num_bytes, &no_space, &may_discard, &ignore_num_bytes]( |
Anand Mistry (off Chromium)
2016/04/19 12:57:35
why did you swap bytes and handles? And why is ign
Ken Rockot(use gerrit already)
2016/04/19 18:11:25
Didn't notice I had done the swap - probably from
| |
338 const ports::Message& next_message) { | 192 const ports::Message& next_message) { |
339 const PortsMessage& message = | 193 const PortsMessage& message = |
340 static_cast<const PortsMessage&>(next_message); | 194 static_cast<const PortsMessage&>(next_message); |
341 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); | 195 DCHECK_GE(message.num_payload_bytes(), |
196 sizeof(MessageForTransit::MessageHeader)); | |
Anand Mistry (off Chromium)
2016/04/19 12:57:36
You could stick a "using MessageForTransit::Messag
Ken Rockot(use gerrit already)
2016/04/19 18:11:25
Done
| |
342 | 197 |
343 const MessageHeader* header = | 198 const MessageForTransit::MessageHeader* header = |
344 static_cast<const MessageHeader*>(message.payload_bytes()); | 199 static_cast<const MessageForTransit::MessageHeader*>( |
200 message.payload_bytes()); | |
345 DCHECK_LE(header->header_size, message.num_payload_bytes()); | 201 DCHECK_LE(header->header_size, message.num_payload_bytes()); |
346 | 202 |
347 uint32_t bytes_to_read = 0; | 203 uint32_t bytes_to_read = 0; |
348 uint32_t bytes_available = | 204 uint32_t bytes_available = |
349 static_cast<uint32_t>(message.num_payload_bytes()) - | 205 static_cast<uint32_t>(message.num_payload_bytes()) - |
350 header->header_size; | 206 header->header_size; |
351 if (num_bytes) { | 207 if (num_bytes) { |
352 bytes_to_read = std::min(*num_bytes, bytes_available); | 208 bytes_to_read = std::min(*num_bytes, bytes_available); |
353 *num_bytes = bytes_available; | 209 *num_bytes = bytes_available; |
354 } | 210 } |
355 | 211 |
356 uint32_t handles_to_read = 0; | 212 uint32_t handles_to_read = 0; |
357 uint32_t handles_available = header->num_dispatchers; | 213 uint32_t handles_available = header->num_dispatchers; |
358 if (num_handles) { | 214 if (num_handles) { |
359 handles_to_read = std::min(*num_handles, handles_available); | 215 handles_to_read = std::min(*num_handles, handles_available); |
360 *num_handles = handles_available; | 216 *num_handles = handles_available; |
361 } | 217 } |
362 | 218 |
363 if (bytes_to_read < bytes_available || | 219 if (handles_to_read < handles_available || |
364 handles_to_read < handles_available) { | 220 (!ignore_num_bytes && bytes_to_read < bytes_available)) { |
365 no_space = true; | 221 no_space = true; |
366 return may_discard; | 222 return may_discard; |
367 } | 223 } |
368 | 224 |
369 return true; | 225 return true; |
370 }, | 226 }, |
371 &ports_message); | 227 &ports_message); |
372 | 228 |
373 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 229 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
374 if (rv == ports::ERROR_PORT_UNKNOWN || | 230 if (rv == ports::ERROR_PORT_UNKNOWN || |
375 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 231 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
376 return MOJO_RESULT_INVALID_ARGUMENT; | 232 return MOJO_RESULT_INVALID_ARGUMENT; |
377 | 233 |
378 NOTREACHED(); | 234 NOTREACHED(); |
379 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 235 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
380 } | 236 } |
381 | 237 |
382 if (no_space) { | 238 if (no_space) { |
383 // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this | 239 // |*num_handles| wasn't sufficient to hold this message's data. The message |
384 // message's data. The message will still be in queue unless | 240 // will still be in queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
385 // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | |
386 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 241 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
387 } | 242 } |
388 | 243 |
389 if (!ports_message) { | 244 if (!ports_message) { |
390 // No message was available in queue. | 245 // No message was available in queue. |
391 | 246 |
392 if (rv == ports::OK) | 247 if (rv == ports::OK) |
393 return MOJO_RESULT_SHOULD_WAIT; | 248 return MOJO_RESULT_SHOULD_WAIT; |
394 | 249 |
395 // Peer is closed and there are no more messages to read. | 250 // Peer is closed and there are no more messages to read. |
396 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 251 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
397 base::AutoLock lock(signal_lock_); | 252 base::AutoLock lock(signal_lock_); |
398 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 253 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
399 return MOJO_RESULT_FAILED_PRECONDITION; | 254 return MOJO_RESULT_FAILED_PRECONDITION; |
400 } | 255 } |
401 | 256 |
402 // Alright! We have a message and the caller has provided sufficient storage | 257 // Alright! We have a message and the caller has provided sufficient storage |
403 // in which to receive it. | 258 // in which to receive it. |
404 | 259 |
405 scoped_ptr<PortsMessage> message( | 260 scoped_ptr<PortsMessage> msg( |
406 static_cast<PortsMessage*>(ports_message.release())); | 261 static_cast<PortsMessage*>(ports_message.release())); |
407 | 262 |
408 const MessageHeader* header = | 263 const MessageForTransit::MessageHeader* header = |
409 static_cast<const MessageHeader*>(message->payload_bytes()); | 264 static_cast<const MessageForTransit::MessageHeader*>( |
410 const DispatcherHeader* dispatcher_headers = | 265 msg->payload_bytes()); |
411 reinterpret_cast<const DispatcherHeader*>(header + 1); | 266 const MessageForTransit::DispatcherHeader* dispatcher_headers = |
267 reinterpret_cast<const MessageForTransit::DispatcherHeader*>(header + 1); | |
412 | 268 |
413 const char* dispatcher_data = reinterpret_cast<const char*>( | 269 const char* dispatcher_data = reinterpret_cast<const char*>( |
414 dispatcher_headers + header->num_dispatchers); | 270 dispatcher_headers + header->num_dispatchers); |
415 | 271 |
416 // Deserialize dispatchers. | 272 // Deserialize dispatchers. |
417 if (header->num_dispatchers > 0) { | 273 if (header->num_dispatchers > 0) { |
418 CHECK(handles); | 274 CHECK(handles); |
419 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); | 275 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
420 size_t data_payload_index = sizeof(MessageHeader) + | 276 size_t data_payload_index = sizeof(MessageForTransit::MessageHeader) + |
421 header->num_dispatchers * sizeof(DispatcherHeader); | 277 header->num_dispatchers * sizeof(MessageForTransit::DispatcherHeader); |
422 size_t port_index = 0; | 278 size_t port_index = 0; |
423 size_t platform_handle_index = 0; | 279 size_t platform_handle_index = 0; |
424 for (size_t i = 0; i < header->num_dispatchers; ++i) { | 280 for (size_t i = 0; i < header->num_dispatchers; ++i) { |
425 const DispatcherHeader& dh = dispatcher_headers[i]; | 281 const MessageForTransit::DispatcherHeader& dh = dispatcher_headers[i]; |
426 Type type = static_cast<Type>(dh.type); | 282 Type type = static_cast<Type>(dh.type); |
427 | 283 |
428 DCHECK_GE(message->num_payload_bytes(), | 284 DCHECK_GE(msg->num_payload_bytes(), |
429 data_payload_index + dh.num_bytes); | 285 data_payload_index + dh.num_bytes); |
430 DCHECK_GE(message->num_ports(), | 286 DCHECK_GE(msg->num_ports(), |
431 port_index + dh.num_ports); | 287 port_index + dh.num_ports); |
432 DCHECK_GE(message->num_handles(), | 288 DCHECK_GE(msg->num_handles(), |
433 platform_handle_index + dh.num_platform_handles); | 289 platform_handle_index + dh.num_platform_handles); |
434 | 290 |
435 PlatformHandle* out_handles = | 291 PlatformHandle* out_handles = |
436 message->num_handles() ? message->handles() + platform_handle_index | 292 msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; |
437 : nullptr; | |
438 dispatchers[i].dispatcher = Dispatcher::Deserialize( | 293 dispatchers[i].dispatcher = Dispatcher::Deserialize( |
439 type, dispatcher_data, dh.num_bytes, message->ports() + port_index, | 294 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, |
440 dh.num_ports, out_handles, dh.num_platform_handles); | 295 dh.num_ports, out_handles, dh.num_platform_handles); |
441 if (!dispatchers[i].dispatcher) | 296 if (!dispatchers[i].dispatcher) |
442 return MOJO_RESULT_UNKNOWN; | 297 return MOJO_RESULT_UNKNOWN; |
443 | 298 |
444 dispatcher_data += dh.num_bytes; | 299 dispatcher_data += dh.num_bytes; |
445 data_payload_index += dh.num_bytes; | 300 data_payload_index += dh.num_bytes; |
446 port_index += dh.num_ports; | 301 port_index += dh.num_ports; |
447 platform_handle_index += dh.num_platform_handles; | 302 platform_handle_index += dh.num_platform_handles; |
448 } | 303 } |
449 | 304 |
450 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, | 305 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, |
451 handles)) | 306 handles)) |
452 return MOJO_RESULT_UNKNOWN; | 307 return MOJO_RESULT_UNKNOWN; |
453 } | 308 } |
454 | 309 |
455 // Copy message bytes. | 310 CHECK(msg); |
456 DCHECK_GE(message->num_payload_bytes(), header->header_size); | 311 message->reset(MessageForTransit::WrapPortsMessage(std::move(msg))); |
457 const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); | |
458 memcpy(bytes, payload + header->header_size, | |
459 message->num_payload_bytes() - header->header_size); | |
460 | |
461 return MOJO_RESULT_OK; | 312 return MOJO_RESULT_OK; |
462 } | 313 } |
463 | 314 |
464 HandleSignalsState | 315 HandleSignalsState |
465 MessagePipeDispatcher::GetHandleSignalsState() const { | 316 MessagePipeDispatcher::GetHandleSignalsState() const { |
466 base::AutoLock lock(signal_lock_); | 317 base::AutoLock lock(signal_lock_); |
467 return GetHandleSignalsStateNoLock(); | 318 return GetHandleSignalsStateNoLock(); |
468 } | 319 } |
469 | 320 |
470 MojoResult MessagePipeDispatcher::AddAwakable( | 321 MojoResult MessagePipeDispatcher::AddAwakable( |
(...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
668 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 519 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
669 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 520 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
670 } | 521 } |
671 #endif | 522 #endif |
672 | 523 |
673 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 524 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
674 } | 525 } |
675 | 526 |
676 } // namespace edk | 527 } // namespace edk |
677 } // namespace mojo | 528 } // namespace mojo |
OLD | NEW |