| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
| 9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| 11 #include "mojo/edk/system/endpoint_relayer.h" | |
| 12 #include "mojo/edk/system/incoming_endpoint.h" | 11 #include "mojo/edk/system/incoming_endpoint.h" |
| 13 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
| 14 #include "mojo/edk/system/message_in_transit.h" | 13 #include "mojo/edk/system/message_in_transit.h" |
| 15 #include "mojo/edk/system/message_pipe_dispatcher.h" | 14 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 16 #include "mojo/edk/system/message_pipe_endpoint.h" | 15 #include "mojo/edk/system/message_pipe_endpoint.h" |
| 17 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
| 18 | 17 |
| 19 namespace mojo { | 18 namespace mojo { |
| 20 namespace system { | 19 namespace system { |
| 21 | 20 |
| (...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 216 } | 215 } |
| 217 | 216 |
| 218 bool MessagePipe::EndSerialize( | 217 bool MessagePipe::EndSerialize( |
| 219 unsigned port, | 218 unsigned port, |
| 220 Channel* channel, | 219 Channel* channel, |
| 221 void* destination, | 220 void* destination, |
| 222 size_t* actual_size, | 221 size_t* actual_size, |
| 223 embedder::PlatformHandleVector* /*platform_handles*/) { | 222 embedder::PlatformHandleVector* /*platform_handles*/) { |
| 224 DCHECK(port == 0 || port == 1); | 223 DCHECK(port == 0 || port == 1); |
| 225 | 224 |
| 226 scoped_refptr<ChannelEndpoint> channel_endpoint; | 225 base::AutoLock locker(lock_); |
| 227 { | 226 DCHECK(endpoints_[port]); |
| 228 base::AutoLock locker(lock_); | |
| 229 DCHECK(endpoints_[port]); | |
| 230 | 227 |
| 231 // The port being serialized must be local. | 228 // The port being serialized must be local. |
| 232 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); |
| 233 | 230 |
| 234 // There are three possibilities for the peer port (below). In all cases, we | 231 unsigned peer_port = GetPeerPort(port); |
| 235 // pass the contents of |port|'s message queue to the channel, and it'll | 232 MessageInTransitQueue* message_queue = |
| 236 // (presumably) make a |ChannelEndpoint| from it. | 233 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) |
| 237 // | 234 ->message_queue(); |
| 238 // 1. The peer port is (known to be) closed. | 235 // The replacement for |endpoints_[port]|, if any. |
| 239 // | 236 MessagePipeEndpoint* replacement_endpoint = nullptr; |
| 240 // There's no reason for us to continue to exist and no need for the | |
| 241 // channel to give us the |ChannelEndpoint|. It only remains for us to | |
| 242 // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for | |
| 243 // destruction. | |
| 244 // | |
| 245 // 2. The peer port is local (the typical case). | |
| 246 // | |
| 247 // The channel gives us back a |ChannelEndpoint|, which we hook up to a | |
| 248 // |ProxyMessagePipeEndpoint| to replace |port|'s | |
| 249 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer | |
| 250 // port's message pipe dispatcher will continue to hold a reference to | |
| 251 // us. | |
| 252 // | |
| 253 // 3. The peer port is remote. | |
| 254 // | |
| 255 // We also pass its |ChannelEndpoint| to the channel, which then decides | |
| 256 // what to do. We have no reason to continue to exist. | |
| 257 // | |
| 258 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). | |
| 259 | 237 |
| 260 // The replacement for |endpoints_[port]|, if any. | 238 // The three cases below correspond to the ones described above |
| 261 MessagePipeEndpoint* replacement_endpoint = nullptr; | 239 // |Channel::SerializeEndpoint...()| (in channel.h). |
| 262 | 240 if (!endpoints_[peer_port]) { |
| 263 unsigned peer_port = GetPeerPort(port); | 241 // Case 1: (known-)closed peer port. There's no reason for us to continue to |
| 264 if (!endpoints_[peer_port]) { // Case 1. | 242 // exist afterwards. |
| 265 channel_endpoint = new ChannelEndpoint( | 243 channel->SerializeEndpointWithClosedPeer(destination, message_queue); |
| 266 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 244 } else if (endpoints_[peer_port]->GetType() == |
| 267 endpoints_[port].get())->message_queue()); | 245 MessagePipeEndpoint::kTypeLocal) { |
| 268 } else if (endpoints_[peer_port]->GetType() == | 246 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| |
| 269 MessagePipeEndpoint::kTypeLocal) { // Case 2. | 247 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that |
| 270 channel_endpoint = new ChannelEndpoint( | 248 // the |Channel| returns to us. |
| 271 this, port, static_cast<LocalMessagePipeEndpoint*>( | 249 scoped_refptr<ChannelEndpoint> channel_endpoint = |
| 272 endpoints_[port].get())->message_queue()); | 250 channel->SerializeEndpointWithLocalPeer(destination, message_queue, |
| 273 replacement_endpoint = | 251 this, port); |
| 274 new ProxyMessagePipeEndpoint(channel_endpoint.get()); | 252 replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get()); |
| 275 } else { // Case 3. | 253 } else { |
| 276 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 254 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and |
| 277 "not yet implemented; will proxy"; | 255 // pass it to the |Channel|. There's no reason for us to continue to exist |
| 278 | 256 // afterwards. |
| 279 // Create an |EndpointRelayer| to replace ourselves (rather than having a | 257 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
| 280 // |MessagePipe| object that exists solely to relay messages between two | 258 MessagePipeEndpoint::kTypeProxy); |
| 281 // |ChannelEndpoint|s, owned by the |Channel| through them. | 259 ProxyMessagePipeEndpoint* peer_endpoint = |
| 282 // | 260 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
| 283 // This reduces overhead somewhat, and more importantly restores some | 261 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = |
| 284 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. | 262 peer_endpoint->ReleaseChannelEndpoint(); |
| 285 // | 263 channel->SerializeEndpointWithRemotePeer(destination, message_queue, |
| 286 // TODO(vtl): If we get the |Channel| to own/track the relayer directly, | 264 peer_channel_endpoint); |
| 287 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw | 265 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
| 288 // pointer (and not have the |Channel| owning the relayer via its | 266 endpoints_[peer_port].reset(); |
| 289 // |ChannelEndpoint|s. | |
| 290 // | |
| 291 // TODO(vtl): This is not obviously the right place for (all of) this | |
| 292 // logic, nor is it obviously factored correctly. | |
| 293 | |
| 294 DCHECK_EQ(endpoints_[peer_port]->GetType(), | |
| 295 MessagePipeEndpoint::kTypeProxy); | |
| 296 ProxyMessagePipeEndpoint* peer_endpoint = | |
| 297 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); | |
| 298 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = | |
| 299 peer_endpoint->ReleaseChannelEndpoint(); | |
| 300 | |
| 301 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); | |
| 302 // We'll assign our peer port's endpoint to the relayer's port 1, and this | |
| 303 // port's endpoint to the relayer's port 0. | |
| 304 channel_endpoint = new ChannelEndpoint( | |
| 305 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( | |
| 306 endpoints_[port].get())->message_queue()); | |
| 307 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); | |
| 308 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); | |
| 309 | |
| 310 // No need to call |Close()| after |ReleaseChannelEndpoint()|. | |
| 311 endpoints_[peer_port].reset(); | |
| 312 } | |
| 313 | |
| 314 endpoints_[port]->Close(); | |
| 315 endpoints_[port].reset(replacement_endpoint); | |
| 316 } | 267 } |
| 317 | 268 |
| 318 // TODO(vtl): More/most of the above should be moved into (some variant of) | 269 endpoints_[port]->Close(); |
| 319 // |Channel::SerializeEndpoint()|. | 270 endpoints_[port].reset(replacement_endpoint); |
| 320 channel->SerializeEndpoint(channel_endpoint, destination); | 271 |
| 321 *actual_size = channel->GetSerializedEndpointSize(); | 272 *actual_size = channel->GetSerializedEndpointSize(); |
| 322 return true; | 273 return true; |
| 323 } | 274 } |
| 324 | 275 |
| 325 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | 276 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
| 326 base::AutoLock locker(lock_); | 277 base::AutoLock locker(lock_); |
| 327 | 278 |
| 328 if (!endpoints_[port]) { | 279 if (!endpoints_[port]) { |
| 329 // This will happen only on the rare occasion that the call to | 280 // This will happen only on the rare occasion that the call to |
| 330 // |OnReadMessage()| is racing with us calling | 281 // |OnReadMessage()| is racing with us calling |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 424 LOG(WARNING) << "Enqueueing null dispatcher"; | 375 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 425 dispatchers->push_back(nullptr); | 376 dispatchers->push_back(nullptr); |
| 426 } | 377 } |
| 427 } | 378 } |
| 428 message->SetDispatchers(dispatchers.Pass()); | 379 message->SetDispatchers(dispatchers.Pass()); |
| 429 return MOJO_RESULT_OK; | 380 return MOJO_RESULT_OK; |
| 430 } | 381 } |
| 431 | 382 |
| 432 } // namespace system | 383 } // namespace system |
| 433 } // namespace mojo | 384 } // namespace mojo |
| OLD | NEW |