| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/macros.h" | 9 #include "base/macros.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| 11 #include "base/memory/scoped_ptr.h" | 11 #include "base/memory/scoped_ptr.h" |
| 12 #include "mojo/edk/embedder/embedder_internal.h" | 12 #include "mojo/edk/embedder/embedder_internal.h" |
| 13 #include "mojo/edk/system/core.h" | 13 #include "mojo/edk/system/core.h" |
| 14 #include "mojo/edk/system/message_for_transit.h" |
| 14 #include "mojo/edk/system/node_controller.h" | 15 #include "mojo/edk/system/node_controller.h" |
| 15 #include "mojo/edk/system/ports_message.h" | 16 #include "mojo/edk/system/ports_message.h" |
| 16 #include "mojo/edk/system/request_context.h" | 17 #include "mojo/edk/system/request_context.h" |
| 17 | 18 |
| 18 namespace mojo { | 19 namespace mojo { |
| 19 namespace edk { | 20 namespace edk { |
| 20 | 21 |
| 21 namespace { | 22 namespace { |
| 22 | 23 |
| 24 using DispatcherHeader = MessageForTransit::DispatcherHeader; |
| 25 using MessageHeader = MessageForTransit::MessageHeader; |
| 26 |
| 23 #pragma pack(push, 1) | 27 #pragma pack(push, 1) |
| 24 | 28 |
| 25 // Header attached to every message sent over a message pipe. | |
| 26 struct MessageHeader { | |
| 27 // The number of serialized dispatchers included in this header. | |
| 28 uint32_t num_dispatchers; | |
| 29 | |
| 30 // Total size of the header, including serialized dispatcher data. | |
| 31 uint32_t header_size; | |
| 32 }; | |
| 33 | |
| 34 static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); | |
| 35 | |
| 36 // Header for each dispatcher, immediately following the message header. | |
| 37 struct DispatcherHeader { | |
| 38 // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. | |
| 39 int32_t type; | |
| 40 | |
| 41 // The size of the serialized dispatcher, not including this header. | |
| 42 uint32_t num_bytes; | |
| 43 | |
| 44 // The number of ports needed to deserialize this dispatcher. | |
| 45 uint32_t num_ports; | |
| 46 | |
| 47 // The number of platform handles needed to deserialize this dispatcher. | |
| 48 uint32_t num_platform_handles; | |
| 49 }; | |
| 50 | |
| 51 static_assert(sizeof(DispatcherHeader) % 8 == 0, | |
| 52 "Invalid DispatcherHeader size."); | |
| 53 | |
| 54 struct SerializedState { | 29 struct SerializedState { |
| 55 uint64_t pipe_id; | 30 uint64_t pipe_id; |
| 56 int8_t endpoint; | 31 int8_t endpoint; |
| 57 char padding[7]; | 32 char padding[7]; |
| 58 }; | 33 }; |
| 59 | 34 |
| 60 static_assert(sizeof(SerializedState) % 8 == 0, | 35 static_assert(sizeof(SerializedState) % 8 == 0, |
| 61 "Invalid SerializedState size."); | 36 "Invalid SerializedState size."); |
| 62 | 37 |
| 63 #pragma pack(pop) | 38 #pragma pack(pop) |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 150 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 125 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 151 base::AutoLock lock(signal_lock_); | 126 base::AutoLock lock(signal_lock_); |
| 152 | 127 |
| 153 if (port_closed_ || in_transit_) | 128 if (port_closed_ || in_transit_) |
| 154 return MOJO_RESULT_INVALID_ARGUMENT; | 129 return MOJO_RESULT_INVALID_ARGUMENT; |
| 155 | 130 |
| 156 return awakables_.RemoveWatcher(context); | 131 return awakables_.RemoveWatcher(context); |
| 157 } | 132 } |
| 158 | 133 |
| 159 MojoResult MessagePipeDispatcher::WriteMessage( | 134 MojoResult MessagePipeDispatcher::WriteMessage( |
| 160 const void* bytes, | 135 std::unique_ptr<MessageForTransit> message, |
| 161 uint32_t num_bytes, | |
| 162 const DispatcherInTransit* dispatchers, | |
| 163 uint32_t num_dispatchers, | |
| 164 MojoWriteMessageFlags flags) { | 136 MojoWriteMessageFlags flags) { |
| 165 | 137 |
| 166 | 138 |
| 167 if (port_closed_ || in_transit_) | 139 if (port_closed_ || in_transit_) |
| 168 return MOJO_RESULT_INVALID_ARGUMENT; | 140 return MOJO_RESULT_INVALID_ARGUMENT; |
| 169 | 141 |
| 170 // A structure for retaining information about every Dispatcher we're about | 142 size_t num_bytes = message->num_bytes(); |
| 171 // to send. This information is collected by calling StartSerialize() on | 143 std::unique_ptr<PortsMessage> msg = message->TakePortsMessage(); |
| 172 // each dispatcher in sequence. | 144 int rv = node_controller_->SendMessage(port_, &msg); |
| 173 struct DispatcherInfo { | |
| 174 uint32_t num_bytes; | |
| 175 uint32_t num_ports; | |
| 176 uint32_t num_handles; | |
| 177 }; | |
| 178 | 145 |
| 179 // This is only the base header size. It will grow as we accumulate the | 146 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 180 // size of serialized state for each dispatcher. | 147 << " [port=" << port_.name() << "; rv=" << rv |
| 181 size_t header_size = sizeof(MessageHeader) + | 148 << "; num_bytes=" << num_bytes << "]"; |
| 182 num_dispatchers * sizeof(DispatcherHeader); | |
| 183 | 149 |
| 184 size_t num_ports = 0; | 150 if (rv != ports::OK) { |
| 185 size_t num_handles = 0; | 151 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 152 rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| 153 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| 154 return MOJO_RESULT_INVALID_ARGUMENT; |
| 155 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| 156 base::AutoLock lock(signal_lock_); |
| 157 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 158 return MOJO_RESULT_FAILED_PRECONDITION; |
| 159 } |
| 186 | 160 |
| 187 std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); | 161 NOTREACHED(); |
| 188 for (size_t i = 0; i < num_dispatchers; ++i) { | 162 return MOJO_RESULT_UNKNOWN; |
| 189 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
| 190 d->StartSerialize(&dispatcher_info[i].num_bytes, | |
| 191 &dispatcher_info[i].num_ports, | |
| 192 &dispatcher_info[i].num_handles); | |
| 193 header_size += dispatcher_info[i].num_bytes; | |
| 194 num_ports += dispatcher_info[i].num_ports; | |
| 195 num_handles += dispatcher_info[i].num_handles; | |
| 196 } | 163 } |
| 197 | 164 |
| 198 // We now have enough information to fully allocate the message storage. | 165 return MOJO_RESULT_OK; |
| 199 scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( | |
| 200 header_size + num_bytes, num_ports, num_handles); | |
| 201 DCHECK(message); | |
| 202 | |
| 203 // Populate the message header with information about serialized dispatchers. | |
| 204 // | |
| 205 // The front of the message is always a MessageHeader followed by a | |
| 206 // DispatcherHeader for each dispatcher to be sent. | |
| 207 MessageHeader* header = | |
| 208 static_cast<MessageHeader*>(message->mutable_payload_bytes()); | |
| 209 DispatcherHeader* dispatcher_headers = | |
| 210 reinterpret_cast<DispatcherHeader*>(header + 1); | |
| 211 | |
| 212 // Serialized dispatcher state immediately follows the series of | |
| 213 // DispatcherHeaders. | |
| 214 char* dispatcher_data = | |
| 215 reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); | |
| 216 | |
| 217 header->num_dispatchers = num_dispatchers; | |
| 218 | |
| 219 // |header_size| is the total number of bytes preceding the message payload, | |
| 220 // including all dispatcher headers and serialized dispatcher state. | |
| 221 DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); | |
| 222 header->header_size = static_cast<uint32_t>(header_size); | |
| 223 | |
| 224 bool cancel_transit = false; | |
| 225 if (num_dispatchers > 0) { | |
| 226 ScopedPlatformHandleVectorPtr handles( | |
| 227 new PlatformHandleVector(num_handles)); | |
| 228 size_t port_index = 0; | |
| 229 size_t handle_index = 0; | |
| 230 for (size_t i = 0; i < num_dispatchers; ++i) { | |
| 231 Dispatcher* d = dispatchers[i].dispatcher.get(); | |
| 232 DispatcherHeader* dh = &dispatcher_headers[i]; | |
| 233 const DispatcherInfo& info = dispatcher_info[i]; | |
| 234 | |
| 235 // Fill in the header for this dispatcher. | |
| 236 dh->type = static_cast<int32_t>(d->GetType()); | |
| 237 dh->num_bytes = info.num_bytes; | |
| 238 dh->num_ports = info.num_ports; | |
| 239 dh->num_platform_handles = info.num_handles; | |
| 240 | |
| 241 // Fill in serialized state, ports, and platform handles. We'll cancel | |
| 242 // the send if the dispatcher implementation rejects for some reason. | |
| 243 if (!d->EndSerialize(static_cast<void*>(dispatcher_data), | |
| 244 message->mutable_ports() + port_index, | |
| 245 handles->data() + handle_index)) { | |
| 246 cancel_transit = true; | |
| 247 break; | |
| 248 } | |
| 249 | |
| 250 dispatcher_data += info.num_bytes; | |
| 251 port_index += info.num_ports; | |
| 252 handle_index += info.num_handles; | |
| 253 } | |
| 254 | |
| 255 if (!cancel_transit) { | |
| 256 // Take ownership of all the handles and move them into message storage. | |
| 257 message->SetHandles(std::move(handles)); | |
| 258 } else { | |
| 259 // Release any platform handles we've accumulated. Their dispatchers | |
| 260 // retain ownership when transit is canceled, so these are not actually | |
| 261 // leaking. | |
| 262 handles->clear(); | |
| 263 } | |
| 264 } | |
| 265 | |
| 266 MojoResult result = MOJO_RESULT_OK; | |
| 267 if (!cancel_transit) { | |
| 268 // Copy the message body. | |
| 269 void* message_body = static_cast<void*>( | |
| 270 static_cast<char*>(message->mutable_payload_bytes()) + header_size); | |
| 271 memcpy(message_body, bytes, num_bytes); | |
| 272 | |
| 273 int rv = node_controller_->SendMessage(port_, &message); | |
| 274 | |
| 275 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | |
| 276 << " [port=" << port_.name() << "; rv=" << rv | |
| 277 << "; num_bytes=" << num_bytes << "]"; | |
| 278 | |
| 279 if (rv != ports::OK) { | |
| 280 if (rv == ports::ERROR_PORT_UNKNOWN || | |
| 281 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | |
| 282 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | |
| 283 result = MOJO_RESULT_INVALID_ARGUMENT; | |
| 284 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | |
| 285 base::AutoLock lock(signal_lock_); | |
| 286 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | |
| 287 result = MOJO_RESULT_FAILED_PRECONDITION; | |
| 288 } else { | |
| 289 NOTREACHED(); | |
| 290 result = MOJO_RESULT_UNKNOWN; | |
| 291 } | |
| 292 cancel_transit = true; | |
| 293 } else { | |
| 294 DCHECK(!message); | |
| 295 } | |
| 296 } | |
| 297 | |
| 298 if (cancel_transit) { | |
| 299 // We ended up not sending the message. Release all the platform handles. | |
| 300 // Their dipatchers retain ownership when transit is canceled, so these are | |
| 301 // not actually leaking. | |
| 302 DCHECK(message); | |
| 303 Channel::MessagePtr m = message->TakeChannelMessage(); | |
| 304 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); | |
| 305 if (handles) | |
| 306 handles->clear(); | |
| 307 } | |
| 308 | |
| 309 return result; | |
| 310 } | 166 } |
| 311 | 167 |
| 312 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 168 MojoResult MessagePipeDispatcher::ReadMessage( |
| 313 uint32_t* num_bytes, | 169 std::unique_ptr<MessageForTransit>* message, |
| 314 MojoHandle* handles, | 170 uint32_t* num_bytes, |
| 315 uint32_t* num_handles, | 171 MojoHandle* handles, |
| 316 MojoReadMessageFlags flags) { | 172 uint32_t* num_handles, |
| 173 MojoReadMessageFlags flags, |
| 174 bool read_any_size) { |
| 317 // We can't read from a port that's closed or in transit! | 175 // We can't read from a port that's closed or in transit! |
| 318 if (port_closed_ || in_transit_) | 176 if (port_closed_ || in_transit_) |
| 319 return MOJO_RESULT_INVALID_ARGUMENT; | 177 return MOJO_RESULT_INVALID_ARGUMENT; |
| 320 | 178 |
| 321 bool no_space = false; | 179 bool no_space = false; |
| 322 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 180 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
| 323 | 181 |
| 324 // Ensure the provided buffers are large enough to hold the next message. | 182 // Grab a message if the provided handles buffer is large enough. If the input |
| 325 // GetMessageIf provides an atomic way to test the next message without | 183 // |num_bytes| is provided and |read_any_size| is false, we also ensure |
| 326 // committing to removing it from the port's underlying message queue until | 184 // that it specifies a size at least as large as the next available payload. |
| 327 // we are sure we can consume it. | 185 // |
| 186 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
| 187 // This flag exists to support both new and old API behavior. |
| 328 | 188 |
| 329 ports::ScopedMessage ports_message; | 189 ports::ScopedMessage ports_message; |
| 330 int rv = node_controller_->node()->GetMessageIf( | 190 int rv = node_controller_->node()->GetMessageIf( |
| 331 port_, | 191 port_, |
| 332 [num_bytes, num_handles, &no_space, &may_discard]( | 192 [read_any_size, num_bytes, num_handles, &no_space, &may_discard]( |
| 333 const ports::Message& next_message) { | 193 const ports::Message& next_message) { |
| 334 const PortsMessage& message = | 194 const PortsMessage& message = |
| 335 static_cast<const PortsMessage&>(next_message); | 195 static_cast<const PortsMessage&>(next_message); |
| 336 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); | 196 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); |
| 337 | 197 |
| 338 const MessageHeader* header = | 198 const MessageHeader* header = |
| 339 static_cast<const MessageHeader*>(message.payload_bytes()); | 199 static_cast<const MessageHeader*>(message.payload_bytes()); |
| 340 DCHECK_LE(header->header_size, message.num_payload_bytes()); | 200 DCHECK_LE(header->header_size, message.num_payload_bytes()); |
| 341 | 201 |
| 342 uint32_t bytes_to_read = 0; | 202 uint32_t bytes_to_read = 0; |
| 343 uint32_t bytes_available = | 203 uint32_t bytes_available = |
| 344 static_cast<uint32_t>(message.num_payload_bytes()) - | 204 static_cast<uint32_t>(message.num_payload_bytes()) - |
| 345 header->header_size; | 205 header->header_size; |
| 346 if (num_bytes) { | 206 if (num_bytes) { |
| 347 bytes_to_read = std::min(*num_bytes, bytes_available); | 207 bytes_to_read = std::min(*num_bytes, bytes_available); |
| 348 *num_bytes = bytes_available; | 208 *num_bytes = bytes_available; |
| 349 } | 209 } |
| 350 | 210 |
| 351 uint32_t handles_to_read = 0; | 211 uint32_t handles_to_read = 0; |
| 352 uint32_t handles_available = header->num_dispatchers; | 212 uint32_t handles_available = header->num_dispatchers; |
| 353 if (num_handles) { | 213 if (num_handles) { |
| 354 handles_to_read = std::min(*num_handles, handles_available); | 214 handles_to_read = std::min(*num_handles, handles_available); |
| 355 *num_handles = handles_available; | 215 *num_handles = handles_available; |
| 356 } | 216 } |
| 357 | 217 |
| 358 if (bytes_to_read < bytes_available || | 218 if (handles_to_read < handles_available || |
| 359 handles_to_read < handles_available) { | 219 (!read_any_size && bytes_to_read < bytes_available)) { |
| 360 no_space = true; | 220 no_space = true; |
| 361 return may_discard; | 221 return may_discard; |
| 362 } | 222 } |
| 363 | 223 |
| 364 return true; | 224 return true; |
| 365 }, | 225 }, |
| 366 &ports_message); | 226 &ports_message); |
| 367 | 227 |
| 368 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 228 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
| 369 if (rv == ports::ERROR_PORT_UNKNOWN || | 229 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 370 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 230 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
| 371 return MOJO_RESULT_INVALID_ARGUMENT; | 231 return MOJO_RESULT_INVALID_ARGUMENT; |
| 372 | 232 |
| 373 NOTREACHED(); | 233 NOTREACHED(); |
| 374 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 234 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
| 375 } | 235 } |
| 376 | 236 |
| 377 if (no_space) { | 237 if (no_space) { |
| 378 // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this | 238 // |*num_handles| wasn't sufficient to hold this message's data. The message |
| 379 // message's data. The message will still be in queue unless | 239 // will still be in queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| 380 // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | |
| 381 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 240 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 382 } | 241 } |
| 383 | 242 |
| 384 if (!ports_message) { | 243 if (!ports_message) { |
| 385 // No message was available in queue. | 244 // No message was available in queue. |
| 386 | 245 |
| 387 if (rv == ports::OK) | 246 if (rv == ports::OK) |
| 388 return MOJO_RESULT_SHOULD_WAIT; | 247 return MOJO_RESULT_SHOULD_WAIT; |
| 389 | 248 |
| 390 // Peer is closed and there are no more messages to read. | 249 // Peer is closed and there are no more messages to read. |
| 391 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 250 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
| 392 base::AutoLock lock(signal_lock_); | 251 base::AutoLock lock(signal_lock_); |
| 393 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 252 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 394 return MOJO_RESULT_FAILED_PRECONDITION; | 253 return MOJO_RESULT_FAILED_PRECONDITION; |
| 395 } | 254 } |
| 396 | 255 |
| 397 // Alright! We have a message and the caller has provided sufficient storage | 256 // Alright! We have a message and the caller has provided sufficient storage |
| 398 // in which to receive it. | 257 // in which to receive it. |
| 399 | 258 |
| 400 scoped_ptr<PortsMessage> message( | 259 scoped_ptr<PortsMessage> msg( |
| 401 static_cast<PortsMessage*>(ports_message.release())); | 260 static_cast<PortsMessage*>(ports_message.release())); |
| 402 | 261 |
| 403 const MessageHeader* header = | 262 const MessageHeader* header = |
| 404 static_cast<const MessageHeader*>(message->payload_bytes()); | 263 static_cast<const MessageHeader*>( msg->payload_bytes()); |
| 405 const DispatcherHeader* dispatcher_headers = | 264 const DispatcherHeader* dispatcher_headers = |
| 406 reinterpret_cast<const DispatcherHeader*>(header + 1); | 265 reinterpret_cast<const DispatcherHeader*>(header + 1); |
| 407 | 266 |
| 408 const char* dispatcher_data = reinterpret_cast<const char*>( | 267 const char* dispatcher_data = reinterpret_cast<const char*>( |
| 409 dispatcher_headers + header->num_dispatchers); | 268 dispatcher_headers + header->num_dispatchers); |
| 410 | 269 |
| 411 // Deserialize dispatchers. | 270 // Deserialize dispatchers. |
| 412 if (header->num_dispatchers > 0) { | 271 if (header->num_dispatchers > 0) { |
| 413 CHECK(handles); | 272 CHECK(handles); |
| 414 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); | 273 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
| 415 size_t data_payload_index = sizeof(MessageHeader) + | 274 size_t data_payload_index = sizeof(MessageHeader) + |
| 416 header->num_dispatchers * sizeof(DispatcherHeader); | 275 header->num_dispatchers * sizeof(DispatcherHeader); |
| 417 size_t port_index = 0; | 276 size_t port_index = 0; |
| 418 size_t platform_handle_index = 0; | 277 size_t platform_handle_index = 0; |
| 419 for (size_t i = 0; i < header->num_dispatchers; ++i) { | 278 for (size_t i = 0; i < header->num_dispatchers; ++i) { |
| 420 const DispatcherHeader& dh = dispatcher_headers[i]; | 279 const DispatcherHeader& dh = dispatcher_headers[i]; |
| 421 Type type = static_cast<Type>(dh.type); | 280 Type type = static_cast<Type>(dh.type); |
| 422 | 281 |
| 423 DCHECK_GE(message->num_payload_bytes(), | 282 DCHECK_GE(msg->num_payload_bytes(), |
| 424 data_payload_index + dh.num_bytes); | 283 data_payload_index + dh.num_bytes); |
| 425 DCHECK_GE(message->num_ports(), | 284 DCHECK_GE(msg->num_ports(), |
| 426 port_index + dh.num_ports); | 285 port_index + dh.num_ports); |
| 427 DCHECK_GE(message->num_handles(), | 286 DCHECK_GE(msg->num_handles(), |
| 428 platform_handle_index + dh.num_platform_handles); | 287 platform_handle_index + dh.num_platform_handles); |
| 429 | 288 |
| 430 PlatformHandle* out_handles = | 289 PlatformHandle* out_handles = |
| 431 message->num_handles() ? message->handles() + platform_handle_index | 290 msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; |
| 432 : nullptr; | |
| 433 dispatchers[i].dispatcher = Dispatcher::Deserialize( | 291 dispatchers[i].dispatcher = Dispatcher::Deserialize( |
| 434 type, dispatcher_data, dh.num_bytes, message->ports() + port_index, | 292 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, |
| 435 dh.num_ports, out_handles, dh.num_platform_handles); | 293 dh.num_ports, out_handles, dh.num_platform_handles); |
| 436 if (!dispatchers[i].dispatcher) | 294 if (!dispatchers[i].dispatcher) |
| 437 return MOJO_RESULT_UNKNOWN; | 295 return MOJO_RESULT_UNKNOWN; |
| 438 | 296 |
| 439 dispatcher_data += dh.num_bytes; | 297 dispatcher_data += dh.num_bytes; |
| 440 data_payload_index += dh.num_bytes; | 298 data_payload_index += dh.num_bytes; |
| 441 port_index += dh.num_ports; | 299 port_index += dh.num_ports; |
| 442 platform_handle_index += dh.num_platform_handles; | 300 platform_handle_index += dh.num_platform_handles; |
| 443 } | 301 } |
| 444 | 302 |
| 445 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, | 303 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, |
| 446 handles)) | 304 handles)) |
| 447 return MOJO_RESULT_UNKNOWN; | 305 return MOJO_RESULT_UNKNOWN; |
| 448 } | 306 } |
| 449 | 307 |
| 450 // Copy message bytes. | 308 CHECK(msg); |
| 451 DCHECK_GE(message->num_payload_bytes(), header->header_size); | 309 message->reset(MessageForTransit::WrapPortsMessage(std::move(msg))); |
| 452 const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); | |
| 453 memcpy(bytes, payload + header->header_size, | |
| 454 message->num_payload_bytes() - header->header_size); | |
| 455 | |
| 456 return MOJO_RESULT_OK; | 310 return MOJO_RESULT_OK; |
| 457 } | 311 } |
| 458 | 312 |
| 459 HandleSignalsState | 313 HandleSignalsState |
| 460 MessagePipeDispatcher::GetHandleSignalsState() const { | 314 MessagePipeDispatcher::GetHandleSignalsState() const { |
| 461 base::AutoLock lock(signal_lock_); | 315 base::AutoLock lock(signal_lock_); |
| 462 return GetHandleSignalsStateNoLock(); | 316 return GetHandleSignalsStateNoLock(); |
| 463 } | 317 } |
| 464 | 318 |
| 465 MojoResult MessagePipeDispatcher::AddAwakable( | 319 MojoResult MessagePipeDispatcher::AddAwakable( |
| (...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 663 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 517 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
| 664 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 518 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
| 665 } | 519 } |
| 666 #endif | 520 #endif |
| 667 | 521 |
| 668 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 522 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 669 } | 523 } |
| 670 | 524 |
| 671 } // namespace edk | 525 } // namespace edk |
| 672 } // namespace mojo | 526 } // namespace mojo |
| OLD | NEW |