| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
| 9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| 11 #include "mojo/edk/system/endpoint_relayer.h" | 11 #include "mojo/edk/system/endpoint_relayer.h" |
| 12 #include "mojo/edk/system/incoming_endpoint.h" |
| 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 13 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
| 13 #include "mojo/edk/system/message_in_transit.h" | 14 #include "mojo/edk/system/message_in_transit.h" |
| 14 #include "mojo/edk/system/message_pipe_dispatcher.h" | 15 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 15 #include "mojo/edk/system/message_pipe_endpoint.h" | 16 #include "mojo/edk/system/message_pipe_endpoint.h" |
| 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 17 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
| 17 | 18 |
| 18 namespace mojo { | 19 namespace mojo { |
| 19 namespace system { | 20 namespace system { |
| 20 | 21 |
| 21 namespace { | |
| 22 | |
| 23 // TODO(vtl): Move this into |Channel| (and possible further). | |
| 24 struct SerializedMessagePipe { | |
| 25 // This is the endpoint ID on the receiving side, and should be a "remote ID". | |
| 26 // (The receiving side should already have had an endpoint attached and been | |
| 27 // run via the |Channel|s. This endpoint will have both IDs assigned, so this | |
| 28 // ID is only needed to associate that endpoint with a particular dispatcher.) | |
| 29 ChannelEndpointId receiver_endpoint_id; | |
| 30 }; | |
| 31 | |
| 32 } // namespace | |
| 33 | |
| 34 // static | 22 // static |
| 35 MessagePipe* MessagePipe::CreateLocalLocal() { | 23 MessagePipe* MessagePipe::CreateLocalLocal() { |
| 36 MessagePipe* message_pipe = new MessagePipe(); | 24 MessagePipe* message_pipe = new MessagePipe(); |
| 37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 25 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 26 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 39 return message_pipe; | 27 return message_pipe; |
| 40 } | 28 } |
| 41 | 29 |
| 42 // static | 30 // static |
| 43 MessagePipe* MessagePipe::CreateLocalProxy( | 31 MessagePipe* MessagePipe::CreateLocalProxy( |
| 44 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 32 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
| 45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 33 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 46 MessagePipe* message_pipe = new MessagePipe(); | 34 MessagePipe* message_pipe = new MessagePipe(); |
| 47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 35 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | 36 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
| 49 message_pipe->endpoints_[1].reset( | 37 message_pipe->endpoints_[1].reset( |
| 50 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 38 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 51 return message_pipe; | 39 return message_pipe; |
| 52 } | 40 } |
| 53 | 41 |
| 54 // static | 42 // static |
| 43 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( |
| 44 MessageInTransitQueue* message_queue, |
| 45 ChannelEndpoint* channel_endpoint) { |
| 46 DCHECK(message_queue); |
| 47 MessagePipe* message_pipe = new MessagePipe(); |
| 48 message_pipe->endpoints_[0].reset( |
| 49 new LocalMessagePipeEndpoint(message_queue)); |
| 50 if (channel_endpoint) { |
| 51 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); |
| 52 message_pipe->endpoints_[1].reset( |
| 53 new ProxyMessagePipeEndpoint(channel_endpoint)); |
| 54 if (!attached_to_channel) |
| 55 message_pipe->OnDetachFromChannel(1); |
| 56 } else { |
| 57 // This means that the proxy side was already closed; we only need to inform |
| 58 // the local side of this. |
| 59 // TODO(vtl): This is safe to do without locking (but perhaps slightly |
| 60 // dubious), since no other thread has access to |message_pipe| yet. |
| 61 message_pipe->endpoints_[0]->OnPeerClose(); |
| 62 } |
| 63 return message_pipe; |
| 64 } |
| 65 |
| 66 // static |
| 55 MessagePipe* MessagePipe::CreateProxyLocal( | 67 MessagePipe* MessagePipe::CreateProxyLocal( |
| 56 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 68 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
| 57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 69 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 58 MessagePipe* message_pipe = new MessagePipe(); | 70 MessagePipe* message_pipe = new MessagePipe(); |
| 59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | 71 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
| 60 message_pipe->endpoints_[0].reset( | 72 message_pipe->endpoints_[0].reset( |
| 61 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 73 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 74 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 63 return message_pipe; | 75 return message_pipe; |
| 64 } | 76 } |
| 65 | 77 |
| 66 // static | 78 // static |
| 67 unsigned MessagePipe::GetPeerPort(unsigned port) { | 79 unsigned MessagePipe::GetPeerPort(unsigned port) { |
| 68 DCHECK(port == 0 || port == 1); | 80 DCHECK(port == 0 || port == 1); |
| 69 return port ^ 1; | 81 return port ^ 1; |
| 70 } | 82 } |
| 71 | 83 |
| 72 // static | 84 // static |
| 73 bool MessagePipe::Deserialize(Channel* channel, | 85 bool MessagePipe::Deserialize(Channel* channel, |
| 74 const void* source, | 86 const void* source, |
| 75 size_t size, | 87 size_t size, |
| 76 scoped_refptr<MessagePipe>* message_pipe, | 88 scoped_refptr<MessagePipe>* message_pipe, |
| 77 unsigned* port) { | 89 unsigned* port) { |
| 78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. | 90 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
| 79 | 91 |
| 80 if (size != sizeof(SerializedMessagePipe)) { | 92 if (size != channel->GetSerializedEndpointSize()) { |
| 81 LOG(ERROR) << "Invalid serialized message pipe"; | 93 LOG(ERROR) << "Invalid serialized message pipe"; |
| 82 return false; | 94 return false; |
| 83 } | 95 } |
| 84 | 96 |
| 85 const SerializedMessagePipe* s = | 97 scoped_refptr<IncomingEndpoint> incoming_endpoint = |
| 86 static_cast<const SerializedMessagePipe*>(source); | 98 channel->DeserializeEndpoint(source); |
| 87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); | 99 if (!incoming_endpoint) |
| 88 if (!*message_pipe) { | |
| 89 LOG(ERROR) << "Failed to deserialize message pipe (ID = " | |
| 90 << s->receiver_endpoint_id << ")"; | |
| 91 return false; | 100 return false; |
| 92 } | |
| 93 | 101 |
| 94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " | 102 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); |
| 95 << s->receiver_endpoint_id << ")"; | 103 DCHECK(*message_pipe); |
| 96 *port = 0; | 104 *port = 0; |
| 97 return true; | 105 return true; |
| 98 } | 106 } |
| 99 | 107 |
| 100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 108 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
| 101 DCHECK(port == 0 || port == 1); | 109 DCHECK(port == 0 || port == 1); |
| 102 base::AutoLock locker(lock_); | 110 base::AutoLock locker(lock_); |
| 103 DCHECK(endpoints_[port]); | 111 DCHECK(endpoints_[port]); |
| 104 | 112 |
| 105 return endpoints_[port]->GetType(); | 113 return endpoints_[port]->GetType(); |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 193 HandleSignalsState* signals_state) { | 201 HandleSignalsState* signals_state) { |
| 194 DCHECK(port == 0 || port == 1); | 202 DCHECK(port == 0 || port == 1); |
| 195 | 203 |
| 196 base::AutoLock locker(lock_); | 204 base::AutoLock locker(lock_); |
| 197 DCHECK(endpoints_[port]); | 205 DCHECK(endpoints_[port]); |
| 198 | 206 |
| 199 endpoints_[port]->RemoveAwakable(awakable, signals_state); | 207 endpoints_[port]->RemoveAwakable(awakable, signals_state); |
| 200 } | 208 } |
| 201 | 209 |
| 202 void MessagePipe::StartSerialize(unsigned /*port*/, | 210 void MessagePipe::StartSerialize(unsigned /*port*/, |
| 203 Channel* /*channel*/, | 211 Channel* channel, |
| 204 size_t* max_size, | 212 size_t* max_size, |
| 205 size_t* max_platform_handles) { | 213 size_t* max_platform_handles) { |
| 206 *max_size = sizeof(SerializedMessagePipe); | 214 *max_size = channel->GetSerializedEndpointSize(); |
| 207 *max_platform_handles = 0; | 215 *max_platform_handles = 0; |
| 208 } | 216 } |
| 209 | 217 |
| 210 bool MessagePipe::EndSerialize( | 218 bool MessagePipe::EndSerialize( |
| 211 unsigned port, | 219 unsigned port, |
| 212 Channel* channel, | 220 Channel* channel, |
| 213 void* destination, | 221 void* destination, |
| 214 size_t* actual_size, | 222 size_t* actual_size, |
| 215 embedder::PlatformHandleVector* /*platform_handles*/) { | 223 embedder::PlatformHandleVector* /*platform_handles*/) { |
| 216 DCHECK(port == 0 || port == 1); | 224 DCHECK(port == 0 || port == 1); |
| (...skipping 25 matching lines...) Expand all Loading... |
| 242 // port's message pipe dispatcher will continue to hold a reference to | 250 // port's message pipe dispatcher will continue to hold a reference to |
| 243 // us. | 251 // us. |
| 244 // | 252 // |
| 245 // 3. The peer port is remote. | 253 // 3. The peer port is remote. |
| 246 // | 254 // |
| 247 // We also pass its |ChannelEndpoint| to the channel, which then decides | 255 // We also pass its |ChannelEndpoint| to the channel, which then decides |
| 248 // what to do. We have no reason to continue to exist. | 256 // what to do. We have no reason to continue to exist. |
| 249 // | 257 // |
| 250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). | 258 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
| 251 | 259 |
| 260 // The replacement for |endpoints_[port]|, if any. |
| 261 MessagePipeEndpoint* replacement_endpoint = nullptr; |
| 262 |
| 252 unsigned peer_port = GetPeerPort(port); | 263 unsigned peer_port = GetPeerPort(port); |
| 253 if (!endpoints_[peer_port]) { | 264 if (!endpoints_[peer_port]) { // Case 1. |
| 254 // Case 1. | |
| 255 channel_endpoint = new ChannelEndpoint( | 265 channel_endpoint = new ChannelEndpoint( |
| 256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 266 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
| 257 endpoints_[port].get())->message_queue()); | 267 endpoints_[port].get())->message_queue()); |
| 258 endpoints_[port]->Close(); | |
| 259 endpoints_[port].reset(); | |
| 260 } else if (endpoints_[peer_port]->GetType() == | 268 } else if (endpoints_[peer_port]->GetType() == |
| 261 MessagePipeEndpoint::kTypeLocal) { | 269 MessagePipeEndpoint::kTypeLocal) { // Case 2. |
| 262 // Case 2. | |
| 263 channel_endpoint = new ChannelEndpoint( | 270 channel_endpoint = new ChannelEndpoint( |
| 264 this, port, static_cast<LocalMessagePipeEndpoint*>( | 271 this, port, static_cast<LocalMessagePipeEndpoint*>( |
| 265 endpoints_[port].get())->message_queue()); | 272 endpoints_[port].get())->message_queue()); |
| 266 endpoints_[port]->Close(); | 273 replacement_endpoint = |
| 267 endpoints_[port].reset( | 274 new ProxyMessagePipeEndpoint(channel_endpoint.get()); |
| 268 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 275 } else { // Case 3. |
| 269 } else { | |
| 270 // Case 3. | |
| 271 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 276 DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
| 272 "not yet implemented; will proxy"; | 277 "not yet implemented; will proxy"; |
| 273 | 278 |
| 274 // Create an |EndpointRelayer| to replace ourselves (rather than having a | 279 // Create an |EndpointRelayer| to replace ourselves (rather than having a |
| 275 // |MessagePipe| object that exists solely to relay messages between two | 280 // |MessagePipe| object that exists solely to relay messages between two |
| 276 // |ChannelEndpoint|s, owned by the |Channel| through them. | 281 // |ChannelEndpoint|s, owned by the |Channel| through them. |
| 277 // | 282 // |
| 278 // This reduces overhead somewhat, and more importantly restores some | 283 // This reduces overhead somewhat, and more importantly restores some |
| 279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. | 284 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. |
| 280 // | 285 // |
| (...skipping 14 matching lines...) Expand all Loading... |
| 295 | 300 |
| 296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); | 301 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); |
| 297 // We'll assign our peer port's endpoint to the relayer's port 1, and this | 302 // We'll assign our peer port's endpoint to the relayer's port 1, and this |
| 298 // port's endpoint to the relayer's port 0. | 303 // port's endpoint to the relayer's port 0. |
| 299 channel_endpoint = new ChannelEndpoint( | 304 channel_endpoint = new ChannelEndpoint( |
| 300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( | 305 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( |
| 301 endpoints_[port].get())->message_queue()); | 306 endpoints_[port].get())->message_queue()); |
| 302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); | 307 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
| 303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); | 308 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
| 304 | 309 |
| 305 endpoints_[port]->Close(); | |
| 306 endpoints_[port].reset(); | |
| 307 // No need to call |Close()| after |ReleaseChannelEndpoint()|. | 310 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
| 308 endpoints_[peer_port].reset(); | 311 endpoints_[peer_port].reset(); |
| 309 } | 312 } |
| 313 |
| 314 endpoints_[port]->Close(); |
| 315 endpoints_[port].reset(replacement_endpoint); |
| 310 } | 316 } |
| 311 | 317 |
| 312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); | 318 // TODO(vtl): More/most of the above should be moved into (some variant of) |
| 313 | 319 // |Channel::SerializeEndpoint()|. |
| 314 // Convert the local endpoint to a proxy endpoint (moving the message queue) | 320 channel->SerializeEndpoint(channel_endpoint, destination); |
| 315 // and attach it to the channel. | 321 *actual_size = channel->GetSerializedEndpointSize(); |
| 316 s->receiver_endpoint_id = | |
| 317 channel->AttachAndRunEndpoint(channel_endpoint, false); | |
| 318 DVLOG(2) << "Serializing message pipe (remote ID = " | |
| 319 << s->receiver_endpoint_id << ")"; | |
| 320 *actual_size = sizeof(SerializedMessagePipe); | |
| 321 return true; | 322 return true; |
| 322 } | 323 } |
| 323 | 324 |
| 324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | 325 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
| 325 base::AutoLock locker(lock_); | 326 base::AutoLock locker(lock_); |
| 326 | 327 |
| 327 if (!endpoints_[port]) { | 328 if (!endpoints_[port]) { |
| 328 // This will happen only on the rare occasion that the call to | 329 // This will happen only on the rare occasion that the call to |
| 329 // |OnReadMessage()| is racing with us calling | 330 // |OnReadMessage()| is racing with us calling |
| 330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 331 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 423 LOG(WARNING) << "Enqueueing null dispatcher"; | 424 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 424 dispatchers->push_back(nullptr); | 425 dispatchers->push_back(nullptr); |
| 425 } | 426 } |
| 426 } | 427 } |
| 427 message->SetDispatchers(dispatchers.Pass()); | 428 message->SetDispatchers(dispatchers.Pass()); |
| 428 return MOJO_RESULT_OK; | 429 return MOJO_RESULT_OK; |
| 429 } | 430 } |
| 430 | 431 |
| 431 } // namespace system | 432 } // namespace system |
| 432 } // namespace mojo | 433 } // namespace mojo |
| OLD | NEW |