OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include <memory> | 7 #include <memory> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 #include "mojo/edk/system/channel.h" | 11 #include "mojo/edk/system/channel.h" |
12 #include "mojo/edk/system/channel_endpoint.h" | 12 #include "mojo/edk/system/channel_endpoint.h" |
13 #include "mojo/edk/system/channel_endpoint_id.h" | 13 #include "mojo/edk/system/channel_endpoint_id.h" |
14 #include "mojo/edk/system/incoming_endpoint.h" | 14 #include "mojo/edk/system/incoming_endpoint.h" |
15 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 15 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
17 #include "mojo/edk/system/message_pipe_dispatcher.h" | 17 #include "mojo/edk/system/message_pipe_dispatcher.h" |
18 #include "mojo/edk/system/message_pipe_endpoint.h" | 18 #include "mojo/edk/system/message_pipe_endpoint.h" |
19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
20 #include "mojo/edk/util/make_unique.h" | 20 #include "mojo/edk/util/make_unique.h" |
21 | 21 |
22 namespace mojo { | 22 namespace mojo { |
23 namespace system { | 23 namespace system { |
24 | 24 |
25 // static | 25 // static |
26 MessagePipe* MessagePipe::CreateLocalLocal() MOJO_NO_THREAD_SAFETY_ANALYSIS { | 26 RefPtr<MessagePipe> MessagePipe::CreateLocalLocal() |
27 MessagePipe* message_pipe = new MessagePipe(); | 27 MOJO_NO_THREAD_SAFETY_ANALYSIS { |
| 28 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
28 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 29 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
29 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 30 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
30 return message_pipe; | 31 return message_pipe; |
31 } | 32 } |
32 | 33 |
33 // static | 34 // static |
34 MessagePipe* MessagePipe::CreateLocalProxy( | 35 RefPtr<MessagePipe> MessagePipe::CreateLocalProxy( |
35 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 36 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
36 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 37 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
37 MessagePipe* message_pipe = new MessagePipe(); | 38 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
38 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 39 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
39 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 1); | 40 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe.Clone(), 1); |
40 message_pipe->endpoints_[1].reset( | 41 message_pipe->endpoints_[1].reset( |
41 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); | 42 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); |
42 return message_pipe; | 43 return message_pipe; |
43 } | 44 } |
44 | 45 |
45 // static | 46 // static |
46 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( | 47 RefPtr<MessagePipe> MessagePipe::CreateLocalProxyFromExisting( |
47 MessageInTransitQueue* message_queue, | 48 MessageInTransitQueue* message_queue, |
48 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 49 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
49 DCHECK(message_queue); | 50 DCHECK(message_queue); |
50 MessagePipe* message_pipe = new MessagePipe(); | 51 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
51 message_pipe->endpoints_[0].reset( | 52 message_pipe->endpoints_[0].reset( |
52 new LocalMessagePipeEndpoint(message_queue)); | 53 new LocalMessagePipeEndpoint(message_queue)); |
53 if (channel_endpoint) { | 54 if (channel_endpoint) { |
54 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); | 55 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); |
55 message_pipe->endpoints_[1].reset( | 56 message_pipe->endpoints_[1].reset( |
56 new ProxyMessagePipeEndpoint(std::move(channel_endpoint))); | 57 new ProxyMessagePipeEndpoint(std::move(channel_endpoint))); |
57 if (!attached_to_channel) | 58 if (!attached_to_channel) |
58 message_pipe->OnDetachFromChannel(1); | 59 message_pipe->OnDetachFromChannel(1); |
59 } else { | 60 } else { |
60 // This means that the proxy side was already closed; we only need to inform | 61 // This means that the proxy side was already closed; we only need to inform |
61 // the local side of this. | 62 // the local side of this. |
62 // TODO(vtl): This is safe to do without locking (but perhaps slightly | 63 // TODO(vtl): This is safe to do without locking (but perhaps slightly |
63 // dubious), since no other thread has access to |message_pipe| yet. | 64 // dubious), since no other thread has access to |message_pipe| yet. |
64 message_pipe->endpoints_[0]->OnPeerClose(); | 65 message_pipe->endpoints_[0]->OnPeerClose(); |
65 } | 66 } |
66 return message_pipe; | 67 return message_pipe; |
67 } | 68 } |
68 | 69 |
69 // static | 70 // static |
70 MessagePipe* MessagePipe::CreateProxyLocal( | 71 RefPtr<MessagePipe> MessagePipe::CreateProxyLocal( |
71 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { | 72 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { |
72 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 73 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
73 MessagePipe* message_pipe = new MessagePipe(); | 74 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe()); |
74 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0); | 75 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0); |
75 message_pipe->endpoints_[0].reset( | 76 message_pipe->endpoints_[0].reset( |
76 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); | 77 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); |
77 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 78 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
78 return message_pipe; | 79 return message_pipe; |
79 } | 80 } |
80 | 81 |
81 // static | 82 // static |
82 unsigned MessagePipe::GetPeerPort(unsigned port) { | 83 unsigned MessagePipe::GetPeerPort(unsigned port) { |
83 DCHECK(port == 0 || port == 1); | 84 DCHECK(port == 0 || port == 1); |
84 return port ^ 1; | 85 return port ^ 1; |
85 } | 86 } |
86 | 87 |
87 // static | 88 // static |
88 bool MessagePipe::Deserialize(Channel* channel, | 89 bool MessagePipe::Deserialize(Channel* channel, |
89 const void* source, | 90 const void* source, |
90 size_t size, | 91 size_t size, |
91 scoped_refptr<MessagePipe>* message_pipe, | 92 RefPtr<MessagePipe>* message_pipe, |
92 unsigned* port) { | 93 unsigned* port) { |
93 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. | 94 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
94 | 95 |
95 if (size != channel->GetSerializedEndpointSize()) { | 96 if (size != channel->GetSerializedEndpointSize()) { |
96 LOG(ERROR) << "Invalid serialized message pipe"; | 97 LOG(ERROR) << "Invalid serialized message pipe"; |
97 return false; | 98 return false; |
98 } | 99 } |
99 | 100 |
100 scoped_refptr<IncomingEndpoint> incoming_endpoint = | 101 RefPtr<IncomingEndpoint> incoming_endpoint = |
101 channel->DeserializeEndpoint(source); | 102 channel->DeserializeEndpoint(source); |
102 if (!incoming_endpoint) | 103 if (!incoming_endpoint) |
103 return false; | 104 return false; |
104 | 105 |
105 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); | 106 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); |
106 DCHECK(*message_pipe); | 107 DCHECK(*message_pipe); |
107 *port = 0; | 108 *port = 0; |
108 return true; | 109 return true; |
109 } | 110 } |
110 | 111 |
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
244 if (!endpoints_[peer_port]) { | 245 if (!endpoints_[peer_port]) { |
245 // Case 1: (known-)closed peer port. There's no reason for us to continue to | 246 // Case 1: (known-)closed peer port. There's no reason for us to continue to |
246 // exist afterwards. | 247 // exist afterwards. |
247 channel->SerializeEndpointWithClosedPeer(destination, message_queue); | 248 channel->SerializeEndpointWithClosedPeer(destination, message_queue); |
248 } else if (endpoints_[peer_port]->GetType() == | 249 } else if (endpoints_[peer_port]->GetType() == |
249 MessagePipeEndpoint::kTypeLocal) { | 250 MessagePipeEndpoint::kTypeLocal) { |
250 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| | 251 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| |
251 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that | 252 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that |
252 // the |Channel| returns to us. | 253 // the |Channel| returns to us. |
253 RefPtr<ChannelEndpoint> channel_endpoint = | 254 RefPtr<ChannelEndpoint> channel_endpoint = |
254 channel->SerializeEndpointWithLocalPeer(destination, message_queue, | 255 channel->SerializeEndpointWithLocalPeer( |
255 this, port); | 256 destination, message_queue, RefPtr<ChannelEndpointClient>(this), |
| 257 port); |
256 replacement_endpoint = | 258 replacement_endpoint = |
257 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)); | 259 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)); |
258 } else { | 260 } else { |
259 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and | 261 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and |
260 // pass it to the |Channel|. There's no reason for us to continue to exist | 262 // pass it to the |Channel|. There's no reason for us to continue to exist |
261 // afterwards. | 263 // afterwards. |
262 DCHECK_EQ(endpoints_[peer_port]->GetType(), | 264 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
263 MessagePipeEndpoint::kTypeProxy); | 265 MessagePipeEndpoint::kTypeProxy); |
264 ProxyMessagePipeEndpoint* peer_endpoint = | 266 ProxyMessagePipeEndpoint* peer_endpoint = |
265 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); | 267 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
380 LOG(WARNING) << "Enqueueing null dispatcher"; | 382 LOG(WARNING) << "Enqueueing null dispatcher"; |
381 dispatchers->push_back(nullptr); | 383 dispatchers->push_back(nullptr); |
382 } | 384 } |
383 } | 385 } |
384 message->SetDispatchers(std::move(dispatchers)); | 386 message->SetDispatchers(std::move(dispatchers)); |
385 return MOJO_RESULT_OK; | 387 return MOJO_RESULT_OK; |
386 } | 388 } |
387 | 389 |
388 } // namespace system | 390 } // namespace system |
389 } // namespace mojo | 391 } // namespace mojo |
OLD | NEW |