| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include <memory> | 7 #include <memory> |
| 8 #include <utility> | 8 #include <utility> |
| 9 | 9 |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| 11 #include "mojo/edk/system/channel.h" | 11 #include "mojo/edk/system/channel.h" |
| 12 #include "mojo/edk/system/channel_endpoint.h" | 12 #include "mojo/edk/system/channel_endpoint.h" |
| 13 #include "mojo/edk/system/channel_endpoint_id.h" | 13 #include "mojo/edk/system/channel_endpoint_id.h" |
| 14 #include "mojo/edk/system/incoming_endpoint.h" | 14 #include "mojo/edk/system/incoming_endpoint.h" |
| 15 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 15 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
| 16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
| 17 #include "mojo/edk/system/message_pipe_dispatcher.h" | 17 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 18 #include "mojo/edk/system/message_pipe_endpoint.h" | 18 #include "mojo/edk/system/message_pipe_endpoint.h" |
| 19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
| 20 #include "mojo/edk/util/make_unique.h" | 20 #include "mojo/edk/util/make_unique.h" |
| 21 | 21 |
| 22 namespace mojo { | 22 namespace mojo { |
| 23 namespace system { | 23 namespace system { |
| 24 | 24 |
| 25 // static | 25 // static |
| 26 MessagePipe* MessagePipe::CreateLocalLocal() MOJO_NO_THREAD_SAFETY_ANALYSIS { | 26 RefPtr<MessagePipe> MessagePipe::CreateLocalLocal() |
| 27 MessagePipe* message_pipe = new MessagePipe(); | 27 MOJO_NO_THREAD_SAFETY_ANALYSIS { |
| 28 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
| 28 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 29 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 29 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 30 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 30 return message_pipe; | 31 return message_pipe; |
| 31 } | 32 } |
| 32 | 33 |
| 33 // static | 34 // static |
| 34 MessagePipe* MessagePipe::CreateLocalProxy( | 35 RefPtr<MessagePipe> MessagePipe::CreateLocalProxy( |
| 35 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 36 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
| 36 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 37 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 37 MessagePipe* message_pipe = new MessagePipe(); | 38 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
| 38 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 39 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 39 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 1); | 40 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe.Clone(), 1); |
| 40 message_pipe->endpoints_[1].reset( | 41 message_pipe->endpoints_[1].reset( |
| 41 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); | 42 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); |
| 42 return message_pipe; | 43 return message_pipe; |
| 43 } | 44 } |
| 44 | 45 |
| 45 // static | 46 // static |
| 46 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( | 47 RefPtr<MessagePipe> MessagePipe::CreateLocalProxyFromExisting( |
| 47 MessageInTransitQueue* message_queue, | 48 MessageInTransitQueue* message_queue, |
| 48 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 49 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
| 49 DCHECK(message_queue); | 50 DCHECK(message_queue); |
| 50 MessagePipe* message_pipe = new MessagePipe(); | 51 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
| 51 message_pipe->endpoints_[0].reset( | 52 message_pipe->endpoints_[0].reset( |
| 52 new LocalMessagePipeEndpoint(message_queue)); | 53 new LocalMessagePipeEndpoint(message_queue)); |
| 53 if (channel_endpoint) { | 54 if (channel_endpoint) { |
| 54 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); | 55 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); |
| 55 message_pipe->endpoints_[1].reset( | 56 message_pipe->endpoints_[1].reset( |
| 56 new ProxyMessagePipeEndpoint(std::move(channel_endpoint))); | 57 new ProxyMessagePipeEndpoint(std::move(channel_endpoint))); |
| 57 if (!attached_to_channel) | 58 if (!attached_to_channel) |
| 58 message_pipe->OnDetachFromChannel(1); | 59 message_pipe->OnDetachFromChannel(1); |
| 59 } else { | 60 } else { |
| 60 // This means that the proxy side was already closed; we only need to inform | 61 // This means that the proxy side was already closed; we only need to inform |
| 61 // the local side of this. | 62 // the local side of this. |
| 62 // TODO(vtl): This is safe to do without locking (but perhaps slightly | 63 // TODO(vtl): This is safe to do without locking (but perhaps slightly |
| 63 // dubious), since no other thread has access to |message_pipe| yet. | 64 // dubious), since no other thread has access to |message_pipe| yet. |
| 64 message_pipe->endpoints_[0]->OnPeerClose(); | 65 message_pipe->endpoints_[0]->OnPeerClose(); |
| 65 } | 66 } |
| 66 return message_pipe; | 67 return message_pipe; |
| 67 } | 68 } |
| 68 | 69 |
| 69 // static | 70 // static |
| 70 MessagePipe* MessagePipe::CreateProxyLocal( | 71 RefPtr<MessagePipe> MessagePipe::CreateProxyLocal( |
| 71 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 72 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
| 72 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 73 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 73 MessagePipe* message_pipe = new MessagePipe(); | 74 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
| 74 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0); | 75 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0); |
| 75 message_pipe->endpoints_[0].reset( | 76 message_pipe->endpoints_[0].reset( |
| 76 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); | 77 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); |
| 77 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 78 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 78 return message_pipe; | 79 return message_pipe; |
| 79 } | 80 } |
| 80 | 81 |
| 81 // static | 82 // static |
| 82 unsigned MessagePipe::GetPeerPort(unsigned port) { | 83 unsigned MessagePipe::GetPeerPort(unsigned port) { |
| 83 DCHECK(port == 0 || port == 1); | 84 DCHECK(port == 0 || port == 1); |
| 84 return port ^ 1; | 85 return port ^ 1; |
| 85 } | 86 } |
| 86 | 87 |
| 87 // static | 88 // static |
| 88 bool MessagePipe::Deserialize(Channel* channel, | 89 bool MessagePipe::Deserialize(Channel* channel, |
| 89 const void* source, | 90 const void* source, |
| 90 size_t size, | 91 size_t size, |
| 91 scoped_refptr<MessagePipe>* message_pipe, | 92 RefPtr<MessagePipe>* message_pipe, |
| 92 unsigned* port) { | 93 unsigned* port) { |
| 93 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. | 94 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
| 94 | 95 |
| 95 if (size != channel->GetSerializedEndpointSize()) { | 96 if (size != channel->GetSerializedEndpointSize()) { |
| 96 LOG(ERROR) << "Invalid serialized message pipe"; | 97 LOG(ERROR) << "Invalid serialized message pipe"; |
| 97 return false; | 98 return false; |
| 98 } | 99 } |
| 99 | 100 |
| 100 scoped_refptr<IncomingEndpoint> incoming_endpoint = | 101 RefPtr<IncomingEndpoint> incoming_endpoint = |
| 101 channel->DeserializeEndpoint(source); | 102 channel->DeserializeEndpoint(source); |
| 102 if (!incoming_endpoint) | 103 if (!incoming_endpoint) |
| 103 return false; | 104 return false; |
| 104 | 105 |
| 105 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); | 106 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); |
| 106 DCHECK(*message_pipe); | 107 DCHECK(*message_pipe); |
| 107 *port = 0; | 108 *port = 0; |
| 108 return true; | 109 return true; |
| 109 } | 110 } |
| 110 | 111 |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 244 if (!endpoints_[peer_port]) { | 245 if (!endpoints_[peer_port]) { |
| 245 // Case 1: (known-)closed peer port. There's no reason for us to continue to | 246 // Case 1: (known-)closed peer port. There's no reason for us to continue to |
| 246 // exist afterwards. | 247 // exist afterwards. |
| 247 channel->SerializeEndpointWithClosedPeer(destination, message_queue); | 248 channel->SerializeEndpointWithClosedPeer(destination, message_queue); |
| 248 } else if (endpoints_[peer_port]->GetType() == | 249 } else if (endpoints_[peer_port]->GetType() == |
| 249 MessagePipeEndpoint::kTypeLocal) { | 250 MessagePipeEndpoint::kTypeLocal) { |
| 250 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| | 251 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| |
| 251 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that | 252 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that |
| 252 // the |Channel| returns to us. | 253 // the |Channel| returns to us. |
| 253 RefPtr<ChannelEndpoint> channel_endpoint = | 254 RefPtr<ChannelEndpoint> channel_endpoint = |
| 254 channel->SerializeEndpointWithLocalPeer(destination, message_queue, | 255 channel->SerializeEndpointWithLocalPeer( |
| 255 this, port); | 256 destination, message_queue, RefPtr<ChannelEndpointClient>(this), |
| 257 port); |
| 256 replacement_endpoint = | 258 replacement_endpoint = |
| 257 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)); | 259 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)); |
| 258 } else { | 260 } else { |
| 259 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and | 261 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and |
| 260 // pass it to the |Channel|. There's no reason for us to continue to exist | 262 // pass it to the |Channel|. There's no reason for us to continue to exist |
| 261 // afterwards. | 263 // afterwards. |
| 262 DCHECK_EQ(endpoints_[peer_port]->GetType(), | 264 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
| 263 MessagePipeEndpoint::kTypeProxy); | 265 MessagePipeEndpoint::kTypeProxy); |
| 264 ProxyMessagePipeEndpoint* peer_endpoint = | 266 ProxyMessagePipeEndpoint* peer_endpoint = |
| 265 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); | 267 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
| (...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 380 LOG(WARNING) << "Enqueueing null dispatcher"; | 382 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 381 dispatchers->push_back(nullptr); | 383 dispatchers->push_back(nullptr); |
| 382 } | 384 } |
| 383 } | 385 } |
| 384 message->SetDispatchers(std::move(dispatchers)); | 386 message->SetDispatchers(std::move(dispatchers)); |
| 385 return MOJO_RESULT_OK; | 387 return MOJO_RESULT_OK; |
| 386 } | 388 } |
| 387 | 389 |
| 388 } // namespace system | 390 } // namespace system |
| 389 } // namespace mojo | 391 } // namespace mojo |
| OLD | NEW |