OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
11 #include "mojo/edk/system/endpoint_relayer.h" | 11 #include "mojo/edk/system/endpoint_relayer.h" |
| 12 #include "mojo/edk/system/incoming_endpoint.h" |
12 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 13 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
13 #include "mojo/edk/system/message_in_transit.h" | 14 #include "mojo/edk/system/message_in_transit.h" |
14 #include "mojo/edk/system/message_pipe_dispatcher.h" | 15 #include "mojo/edk/system/message_pipe_dispatcher.h" |
15 #include "mojo/edk/system/message_pipe_endpoint.h" | 16 #include "mojo/edk/system/message_pipe_endpoint.h" |
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 17 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
17 | 18 |
18 namespace mojo { | 19 namespace mojo { |
19 namespace system { | 20 namespace system { |
20 | 21 |
21 namespace { | |
22 | |
23 // TODO(vtl): Move this into |Channel| (and possible further). | |
24 struct SerializedMessagePipe { | |
25 // This is the endpoint ID on the receiving side, and should be a "remote ID". | |
26 // (The receiving side should already have had an endpoint attached and been | |
27 // run via the |Channel|s. This endpoint will have both IDs assigned, so this | |
28 // ID is only needed to associate that endpoint with a particular dispatcher.) | |
29 ChannelEndpointId receiver_endpoint_id; | |
30 }; | |
31 | |
32 } // namespace | |
33 | |
34 // static | 22 // static |
35 MessagePipe* MessagePipe::CreateLocalLocal() { | 23 MessagePipe* MessagePipe::CreateLocalLocal() { |
36 MessagePipe* message_pipe = new MessagePipe(); | 24 MessagePipe* message_pipe = new MessagePipe(); |
37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 25 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 26 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
39 return message_pipe; | 27 return message_pipe; |
40 } | 28 } |
41 | 29 |
42 // static | 30 // static |
43 MessagePipe* MessagePipe::CreateLocalProxy( | 31 MessagePipe* MessagePipe::CreateLocalProxy( |
44 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 32 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 33 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
46 MessagePipe* message_pipe = new MessagePipe(); | 34 MessagePipe* message_pipe = new MessagePipe(); |
47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 35 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | 36 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
49 message_pipe->endpoints_[1].reset( | 37 message_pipe->endpoints_[1].reset( |
50 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 38 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
51 return message_pipe; | 39 return message_pipe; |
52 } | 40 } |
53 | 41 |
54 // static | 42 // static |
| 43 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( |
| 44 MessageInTransitQueue* message_queue, |
| 45 ChannelEndpoint* channel_endpoint) { |
| 46 DCHECK(message_queue); |
| 47 MessagePipe* message_pipe = new MessagePipe(); |
| 48 message_pipe->endpoints_[0].reset( |
| 49 new LocalMessagePipeEndpoint(message_queue)); |
| 50 if (channel_endpoint) { |
| 51 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); |
| 52 message_pipe->endpoints_[1].reset( |
| 53 new ProxyMessagePipeEndpoint(channel_endpoint)); |
| 54 if (!attached_to_channel) |
| 55 message_pipe->OnDetachFromChannel(1); |
| 56 } else { |
| 57 // This means that the proxy side was already closed; we only need to inform |
| 58 // the local side of this. |
| 59 // TODO(vtl): This is safe to do without locking (but perhaps slightly |
| 60 // dubious), since no other thread has access to |message_pipe| yet. |
| 61 message_pipe->endpoints_[0]->OnPeerClose(); |
| 62 } |
| 63 return message_pipe; |
| 64 } |
| 65 |
| 66 // static |
55 MessagePipe* MessagePipe::CreateProxyLocal( | 67 MessagePipe* MessagePipe::CreateProxyLocal( |
56 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 68 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | 69 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
58 MessagePipe* message_pipe = new MessagePipe(); | 70 MessagePipe* message_pipe = new MessagePipe(); |
59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | 71 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
60 message_pipe->endpoints_[0].reset( | 72 message_pipe->endpoints_[0].reset( |
61 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 73 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 74 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
63 return message_pipe; | 75 return message_pipe; |
64 } | 76 } |
65 | 77 |
66 // static | 78 // static |
67 unsigned MessagePipe::GetPeerPort(unsigned port) { | 79 unsigned MessagePipe::GetPeerPort(unsigned port) { |
68 DCHECK(port == 0 || port == 1); | 80 DCHECK(port == 0 || port == 1); |
69 return port ^ 1; | 81 return port ^ 1; |
70 } | 82 } |
71 | 83 |
72 // static | 84 // static |
73 bool MessagePipe::Deserialize(Channel* channel, | 85 bool MessagePipe::Deserialize(Channel* channel, |
74 const void* source, | 86 const void* source, |
75 size_t size, | 87 size_t size, |
76 scoped_refptr<MessagePipe>* message_pipe, | 88 scoped_refptr<MessagePipe>* message_pipe, |
77 unsigned* port) { | 89 unsigned* port) { |
78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. | 90 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
79 | 91 |
80 if (size != sizeof(SerializedMessagePipe)) { | 92 if (size != channel->GetSerializedEndpointSize()) { |
81 LOG(ERROR) << "Invalid serialized message pipe"; | 93 LOG(ERROR) << "Invalid serialized message pipe"; |
82 return false; | 94 return false; |
83 } | 95 } |
84 | 96 |
85 const SerializedMessagePipe* s = | 97 scoped_refptr<IncomingEndpoint> incoming_endpoint = |
86 static_cast<const SerializedMessagePipe*>(source); | 98 channel->DeserializeEndpoint(source); |
87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); | 99 if (!incoming_endpoint) |
88 if (!*message_pipe) { | |
89 LOG(ERROR) << "Failed to deserialize message pipe (ID = " | |
90 << s->receiver_endpoint_id << ")"; | |
91 return false; | 100 return false; |
92 } | |
93 | 101 |
94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " | 102 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); |
95 << s->receiver_endpoint_id << ")"; | 103 DCHECK(*message_pipe); |
96 *port = 0; | 104 *port = 0; |
97 return true; | 105 return true; |
98 } | 106 } |
99 | 107 |
100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 108 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
101 DCHECK(port == 0 || port == 1); | 109 DCHECK(port == 0 || port == 1); |
102 base::AutoLock locker(lock_); | 110 base::AutoLock locker(lock_); |
103 DCHECK(endpoints_[port]); | 111 DCHECK(endpoints_[port]); |
104 | 112 |
105 return endpoints_[port]->GetType(); | 113 return endpoints_[port]->GetType(); |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
193 HandleSignalsState* signals_state) { | 201 HandleSignalsState* signals_state) { |
194 DCHECK(port == 0 || port == 1); | 202 DCHECK(port == 0 || port == 1); |
195 | 203 |
196 base::AutoLock locker(lock_); | 204 base::AutoLock locker(lock_); |
197 DCHECK(endpoints_[port]); | 205 DCHECK(endpoints_[port]); |
198 | 206 |
199 endpoints_[port]->RemoveAwakable(awakable, signals_state); | 207 endpoints_[port]->RemoveAwakable(awakable, signals_state); |
200 } | 208 } |
201 | 209 |
202 void MessagePipe::StartSerialize(unsigned /*port*/, | 210 void MessagePipe::StartSerialize(unsigned /*port*/, |
203 Channel* /*channel*/, | 211 Channel* channel, |
204 size_t* max_size, | 212 size_t* max_size, |
205 size_t* max_platform_handles) { | 213 size_t* max_platform_handles) { |
206 *max_size = sizeof(SerializedMessagePipe); | 214 *max_size = channel->GetSerializedEndpointSize(); |
207 *max_platform_handles = 0; | 215 *max_platform_handles = 0; |
208 } | 216 } |
209 | 217 |
210 bool MessagePipe::EndSerialize( | 218 bool MessagePipe::EndSerialize( |
211 unsigned port, | 219 unsigned port, |
212 Channel* channel, | 220 Channel* channel, |
213 void* destination, | 221 void* destination, |
214 size_t* actual_size, | 222 size_t* actual_size, |
215 embedder::PlatformHandleVector* /*platform_handles*/) { | 223 embedder::PlatformHandleVector* /*platform_handles*/) { |
216 DCHECK(port == 0 || port == 1); | 224 DCHECK(port == 0 || port == 1); |
(...skipping 25 matching lines...) Expand all Loading... |
242 // port's message pipe dispatcher will continue to hold a reference to | 250 // port's message pipe dispatcher will continue to hold a reference to |
243 // us. | 251 // us. |
244 // | 252 // |
245 // 3. The peer port is remote. | 253 // 3. The peer port is remote. |
246 // | 254 // |
247 // We also pass its |ChannelEndpoint| to the channel, which then decides | 255 // We also pass its |ChannelEndpoint| to the channel, which then decides |
248 // what to do. We have no reason to continue to exist. | 256 // what to do. We have no reason to continue to exist. |
249 // | 257 // |
250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). | 258 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
251 | 259 |
| 260 // The replacement for |endpoints_[port]|, if any. |
| 261 MessagePipeEndpoint* replacement_endpoint = nullptr; |
| 262 |
252 unsigned peer_port = GetPeerPort(port); | 263 unsigned peer_port = GetPeerPort(port); |
253 if (!endpoints_[peer_port]) { | 264 if (!endpoints_[peer_port]) { // Case 1. |
254 // Case 1. | |
255 channel_endpoint = new ChannelEndpoint( | 265 channel_endpoint = new ChannelEndpoint( |
256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 266 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
257 endpoints_[port].get())->message_queue()); | 267 endpoints_[port].get())->message_queue()); |
258 endpoints_[port]->Close(); | |
259 endpoints_[port].reset(); | |
260 } else if (endpoints_[peer_port]->GetType() == | 268 } else if (endpoints_[peer_port]->GetType() == |
261 MessagePipeEndpoint::kTypeLocal) { | 269 MessagePipeEndpoint::kTypeLocal) { // Case 2. |
262 // Case 2. | |
263 channel_endpoint = new ChannelEndpoint( | 270 channel_endpoint = new ChannelEndpoint( |
264 this, port, static_cast<LocalMessagePipeEndpoint*>( | 271 this, port, static_cast<LocalMessagePipeEndpoint*>( |
265 endpoints_[port].get())->message_queue()); | 272 endpoints_[port].get())->message_queue()); |
266 endpoints_[port]->Close(); | 273 replacement_endpoint = |
267 endpoints_[port].reset( | 274 new ProxyMessagePipeEndpoint(channel_endpoint.get()); |
268 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 275 } else { // Case 3. |
269 } else { | |
270 // Case 3. | |
271 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 276 DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
272 "not yet implemented; will proxy"; | 277 "not yet implemented; will proxy"; |
273 | 278 |
274 // Create an |EndpointRelayer| to replace ourselves (rather than having a | 279 // Create an |EndpointRelayer| to replace ourselves (rather than having a |
275 // |MessagePipe| object that exists solely to relay messages between two | 280 // |MessagePipe| object that exists solely to relay messages between two |
276 // |ChannelEndpoint|s, owned by the |Channel| through them. | 281 // |ChannelEndpoint|s, owned by the |Channel| through them. |
277 // | 282 // |
278 // This reduces overhead somewhat, and more importantly restores some | 283 // This reduces overhead somewhat, and more importantly restores some |
279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. | 284 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. |
280 // | 285 // |
(...skipping 14 matching lines...) Expand all Loading... |
295 | 300 |
296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); | 301 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); |
297 // We'll assign our peer port's endpoint to the relayer's port 1, and this | 302 // We'll assign our peer port's endpoint to the relayer's port 1, and this |
298 // port's endpoint to the relayer's port 0. | 303 // port's endpoint to the relayer's port 0. |
299 channel_endpoint = new ChannelEndpoint( | 304 channel_endpoint = new ChannelEndpoint( |
300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( | 305 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( |
301 endpoints_[port].get())->message_queue()); | 306 endpoints_[port].get())->message_queue()); |
302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); | 307 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); | 308 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
304 | 309 |
305 endpoints_[port]->Close(); | |
306 endpoints_[port].reset(); | |
307 // No need to call |Close()| after |ReleaseChannelEndpoint()|. | 310 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
308 endpoints_[peer_port].reset(); | 311 endpoints_[peer_port].reset(); |
309 } | 312 } |
| 313 |
| 314 endpoints_[port]->Close(); |
| 315 endpoints_[port].reset(replacement_endpoint); |
310 } | 316 } |
311 | 317 |
312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); | 318 // TODO(vtl): More/most of the above should be moved into (some variant of) |
313 | 319 // |Channel::SerializeEndpoint()|. |
314 // Convert the local endpoint to a proxy endpoint (moving the message queue) | 320 channel->SerializeEndpoint(channel_endpoint, destination); |
315 // and attach it to the channel. | 321 *actual_size = channel->GetSerializedEndpointSize(); |
316 s->receiver_endpoint_id = | |
317 channel->AttachAndRunEndpoint(channel_endpoint, false); | |
318 DVLOG(2) << "Serializing message pipe (remote ID = " | |
319 << s->receiver_endpoint_id << ")"; | |
320 *actual_size = sizeof(SerializedMessagePipe); | |
321 return true; | 322 return true; |
322 } | 323 } |
323 | 324 |
324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | 325 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
325 base::AutoLock locker(lock_); | 326 base::AutoLock locker(lock_); |
326 | 327 |
327 if (!endpoints_[port]) { | 328 if (!endpoints_[port]) { |
328 // This will happen only on the rare occasion that the call to | 329 // This will happen only on the rare occasion that the call to |
329 // |OnReadMessage()| is racing with us calling | 330 // |OnReadMessage()| is racing with us calling |
330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 331 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
423 LOG(WARNING) << "Enqueueing null dispatcher"; | 424 LOG(WARNING) << "Enqueueing null dispatcher"; |
424 dispatchers->push_back(nullptr); | 425 dispatchers->push_back(nullptr); |
425 } | 426 } |
426 } | 427 } |
427 message->SetDispatchers(dispatchers.Pass()); | 428 message->SetDispatchers(dispatchers.Pass()); |
428 return MOJO_RESULT_OK; | 429 return MOJO_RESULT_OK; |
429 } | 430 } |
430 | 431 |
431 } // namespace system | 432 } // namespace system |
432 } // namespace mojo | 433 } // namespace mojo |
OLD | NEW |