| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
| 9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 132 } | 132 } |
| 133 | 133 |
| 134 // TODO(vtl): Handle flags. | 134 // TODO(vtl): Handle flags. |
| 135 MojoResult MessagePipe::WriteMessage( | 135 MojoResult MessagePipe::WriteMessage( |
| 136 unsigned port, | 136 unsigned port, |
| 137 UserPointer<const void> bytes, | 137 UserPointer<const void> bytes, |
| 138 uint32_t num_bytes, | 138 uint32_t num_bytes, |
| 139 std::vector<DispatcherTransport>* transports, | 139 std::vector<DispatcherTransport>* transports, |
| 140 MojoWriteMessageFlags flags) { | 140 MojoWriteMessageFlags flags) { |
| 141 DCHECK(port == 0 || port == 1); | 141 DCHECK(port == 0 || port == 1); |
| 142 return EnqueueMessage( | 142 |
| 143 base::AutoLock locker(lock_); |
| 144 return EnqueueMessageNoLock( |
| 143 GetPeerPort(port), | 145 GetPeerPort(port), |
| 144 make_scoped_ptr(new MessageInTransit( | 146 make_scoped_ptr(new MessageInTransit( |
| 145 MessageInTransit::kTypeEndpoint, | 147 MessageInTransit::kTypeEndpoint, |
| 146 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), | 148 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), |
| 147 transports); | 149 transports); |
| 148 } | 150 } |
| 149 | 151 |
| 150 MojoResult MessagePipe::ReadMessage(unsigned port, | 152 MojoResult MessagePipe::ReadMessage(unsigned port, |
| 151 UserPointer<void> bytes, | 153 UserPointer<void> bytes, |
| 152 UserPointer<uint32_t> num_bytes, | 154 UserPointer<uint32_t> num_bytes, |
| (...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 280 // Convert the local endpoint to a proxy endpoint (moving the message queue) | 282 // Convert the local endpoint to a proxy endpoint (moving the message queue) |
| 281 // and attach it to the channel. | 283 // and attach it to the channel. |
| 282 s->receiver_endpoint_id = | 284 s->receiver_endpoint_id = |
| 283 channel->AttachAndRunEndpoint(channel_endpoint, false); | 285 channel->AttachAndRunEndpoint(channel_endpoint, false); |
| 284 DVLOG(2) << "Serializing message pipe (remote ID = " | 286 DVLOG(2) << "Serializing message pipe (remote ID = " |
| 285 << s->receiver_endpoint_id << ")"; | 287 << s->receiver_endpoint_id << ")"; |
| 286 *actual_size = sizeof(SerializedMessagePipe); | 288 *actual_size = sizeof(SerializedMessagePipe); |
| 287 return true; | 289 return true; |
| 288 } | 290 } |
| 289 | 291 |
| 290 void MessagePipe::OnReadMessage(unsigned port, | 292 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
| 291 scoped_ptr<MessageInTransit> message) { | 293 base::AutoLock locker(lock_); |
| 294 |
| 295 if (!endpoints_[port]) { |
| 296 // This will happen only on the rare occasion that the call to |
| 297 // |OnReadMessage()| is racing with us calling |
| 298 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
| 299 // and the |ChannelEndpoint| can retry (calling the new client's |
| 300 // |OnReadMessage()|). |
| 301 return false; |
| 302 } |
| 303 |
| 292 // This is called when the |ChannelEndpoint| for the | 304 // This is called when the |ChannelEndpoint| for the |
| 293 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). | 305 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). |
| 294 // We need to pass this message on to its peer port (typically a | 306 // We need to pass this message on to its peer port (typically a |
| 295 // |LocalMessagePipeEndpoint|). | 307 // |LocalMessagePipeEndpoint|). |
| 296 MojoResult result = | 308 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), |
| 297 EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr); | 309 make_scoped_ptr(message), nullptr); |
| 298 DLOG_IF(WARNING, result != MOJO_RESULT_OK) | 310 DLOG_IF(WARNING, result != MOJO_RESULT_OK) |
| 299 << "EnqueueMessage() failed (result = " << result << ")"; | 311 << "EnqueueMessageNoLock() failed (result = " << result << ")"; |
| 312 return true; |
| 300 } | 313 } |
| 301 | 314 |
| 302 void MessagePipe::OnDetachFromChannel(unsigned port) { | 315 void MessagePipe::OnDetachFromChannel(unsigned port) { |
| 303 Close(port); | 316 Close(port); |
| 304 } | 317 } |
| 305 | 318 |
| 306 MessagePipe::MessagePipe() { | 319 MessagePipe::MessagePipe() { |
| 307 } | 320 } |
| 308 | 321 |
| 309 MessagePipe::~MessagePipe() { | 322 MessagePipe::~MessagePipe() { |
| 310 // Owned by the dispatchers. The owning dispatchers should only release us via | 323 // Owned by the dispatchers. The owning dispatchers should only release us via |
| 311 // their |Close()| method, which should inform us of being closed via our | 324 // their |Close()| method, which should inform us of being closed via our |
| 312 // |Close()|. Thus these should already be null. | 325 // |Close()|. Thus these should already be null. |
| 313 DCHECK(!endpoints_[0]); | 326 DCHECK(!endpoints_[0]); |
| 314 DCHECK(!endpoints_[1]); | 327 DCHECK(!endpoints_[1]); |
| 315 } | 328 } |
| 316 | 329 |
| 317 MojoResult MessagePipe::EnqueueMessage( | 330 MojoResult MessagePipe::EnqueueMessageNoLock( |
| 318 unsigned port, | 331 unsigned port, |
| 319 scoped_ptr<MessageInTransit> message, | 332 scoped_ptr<MessageInTransit> message, |
| 320 std::vector<DispatcherTransport>* transports) { | 333 std::vector<DispatcherTransport>* transports) { |
| 321 DCHECK(port == 0 || port == 1); | 334 DCHECK(port == 0 || port == 1); |
| 322 DCHECK(message); | 335 DCHECK(message); |
| 323 | 336 |
| 324 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); | 337 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
| 325 | |
| 326 base::AutoLock locker(lock_); | |
| 327 DCHECK(endpoints_[GetPeerPort(port)]); | 338 DCHECK(endpoints_[GetPeerPort(port)]); |
| 328 | 339 |
| 329 // The destination port need not be open, unlike the source port. | 340 // The destination port need not be open, unlike the source port. |
| 330 if (!endpoints_[port]) | 341 if (!endpoints_[port]) |
| 331 return MOJO_RESULT_FAILED_PRECONDITION; | 342 return MOJO_RESULT_FAILED_PRECONDITION; |
| 332 | 343 |
| 333 if (transports) { | 344 if (transports) { |
| 334 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); | 345 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); |
| 335 if (result != MOJO_RESULT_OK) | 346 if (result != MOJO_RESULT_OK) |
| 336 return result; | 347 return result; |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 380 LOG(WARNING) << "Enqueueing null dispatcher"; | 391 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 381 dispatchers->push_back(nullptr); | 392 dispatchers->push_back(nullptr); |
| 382 } | 393 } |
| 383 } | 394 } |
| 384 message->SetDispatchers(dispatchers.Pass()); | 395 message->SetDispatchers(dispatchers.Pass()); |
| 385 return MOJO_RESULT_OK; | 396 return MOJO_RESULT_OK; |
| 386 } | 397 } |
| 387 | 398 |
| 388 } // namespace system | 399 } // namespace system |
| 389 } // namespace mojo | 400 } // namespace mojo |
| OLD | NEW |