OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
11 #include "mojo/edk/system/endpoint_relayer.h" | |
12 #include "mojo/edk/system/incoming_endpoint.h" | 11 #include "mojo/edk/system/incoming_endpoint.h" |
13 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
14 #include "mojo/edk/system/message_in_transit.h" | 13 #include "mojo/edk/system/message_in_transit.h" |
15 #include "mojo/edk/system/message_pipe_dispatcher.h" | 14 #include "mojo/edk/system/message_pipe_dispatcher.h" |
16 #include "mojo/edk/system/message_pipe_endpoint.h" | 15 #include "mojo/edk/system/message_pipe_endpoint.h" |
17 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
18 | 17 |
19 namespace mojo { | 18 namespace mojo { |
20 namespace system { | 19 namespace system { |
21 | 20 |
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
216 } | 215 } |
217 | 216 |
218 bool MessagePipe::EndSerialize( | 217 bool MessagePipe::EndSerialize( |
219 unsigned port, | 218 unsigned port, |
220 Channel* channel, | 219 Channel* channel, |
221 void* destination, | 220 void* destination, |
222 size_t* actual_size, | 221 size_t* actual_size, |
223 embedder::PlatformHandleVector* /*platform_handles*/) { | 222 embedder::PlatformHandleVector* /*platform_handles*/) { |
224 DCHECK(port == 0 || port == 1); | 223 DCHECK(port == 0 || port == 1); |
225 | 224 |
226 scoped_refptr<ChannelEndpoint> channel_endpoint; | 225 base::AutoLock locker(lock_); |
227 { | 226 DCHECK(endpoints_[port]); |
228 base::AutoLock locker(lock_); | |
229 DCHECK(endpoints_[port]); | |
230 | 227 |
231 // The port being serialized must be local. | 228 // The port being serialized must be local. |
232 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); |
233 | 230 |
234 // There are three possibilities for the peer port (below). In all cases, we | 231 unsigned peer_port = GetPeerPort(port); |
235 // pass the contents of |port|'s message queue to the channel, and it'll | 232 MessageInTransitQueue* message_queue = |
236 // (presumably) make a |ChannelEndpoint| from it. | 233 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) |
237 // | 234 ->message_queue(); |
238 // 1. The peer port is (known to be) closed. | 235 // The replacement for |endpoints_[port]|, if any. |
239 // | 236 MessagePipeEndpoint* replacement_endpoint = nullptr; |
240 // There's no reason for us to continue to exist and no need for the | |
241 // channel to give us the |ChannelEndpoint|. It only remains for us to | |
242 // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for | |
243 // destruction. | |
244 // | |
245 // 2. The peer port is local (the typical case). | |
246 // | |
247 // The channel gives us back a |ChannelEndpoint|, which we hook up to a | |
248 // |ProxyMessagePipeEndpoint| to replace |port|'s | |
249 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer | |
250 // port's message pipe dispatcher will continue to hold a reference to | |
251 // us. | |
252 // | |
253 // 3. The peer port is remote. | |
254 // | |
255 // We also pass its |ChannelEndpoint| to the channel, which then decides | |
256 // what to do. We have no reason to continue to exist. | |
257 // | |
258 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). | |
259 | 237 |
260 // The replacement for |endpoints_[port]|, if any. | 238 // The three cases below correspond to the ones described above |
261 MessagePipeEndpoint* replacement_endpoint = nullptr; | 239 // |Channel::SerializeEndpoint...()| (in channel.h). |
262 | 240 if (!endpoints_[peer_port]) { |
263 unsigned peer_port = GetPeerPort(port); | 241 // Case 1: (known-)closed peer port. There's no reason for us to continue to |
264 if (!endpoints_[peer_port]) { // Case 1. | 242 // exist afterwards. |
265 channel_endpoint = new ChannelEndpoint( | 243 channel->SerializeEndpointWithClosedPeer(destination, message_queue); |
266 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 244 } else if (endpoints_[peer_port]->GetType() == |
267 endpoints_[port].get())->message_queue()); | 245 MessagePipeEndpoint::kTypeLocal) { |
268 } else if (endpoints_[peer_port]->GetType() == | 246 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| |
269 MessagePipeEndpoint::kTypeLocal) { // Case 2. | 247 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that |
270 channel_endpoint = new ChannelEndpoint( | 248 // the |Channel| returns to us. |
271 this, port, static_cast<LocalMessagePipeEndpoint*>( | 249 scoped_refptr<ChannelEndpoint> channel_endpoint = |
272 endpoints_[port].get())->message_queue()); | 250 channel->SerializeEndpointWithLocalPeer(destination, message_queue, |
273 replacement_endpoint = | 251 this, port); |
274 new ProxyMessagePipeEndpoint(channel_endpoint.get()); | 252 replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get()); |
275 } else { // Case 3. | 253 } else { |
276 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 254 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and |
277 "not yet implemented; will proxy"; | 255 // pass it to the |Channel|. There's no reason for us to continue to exist |
278 | 256 // afterwards. |
279 // Create an |EndpointRelayer| to replace ourselves (rather than having a | 257 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
280 // |MessagePipe| object that exists solely to relay messages between two | 258 MessagePipeEndpoint::kTypeProxy); |
281 // |ChannelEndpoint|s, owned by the |Channel| through them. | 259 ProxyMessagePipeEndpoint* peer_endpoint = |
282 // | 260 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
283 // This reduces overhead somewhat, and more importantly restores some | 261 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = |
284 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. | 262 peer_endpoint->ReleaseChannelEndpoint(); |
285 // | 263 channel->SerializeEndpointWithRemotePeer(destination, message_queue, |
286 // TODO(vtl): If we get the |Channel| to own/track the relayer directly, | 264 peer_channel_endpoint); |
287 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw | 265 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
288 // pointer (and not have the |Channel| owning the relayer via its | 266 endpoints_[peer_port].reset(); |
289 // |ChannelEndpoint|s. | |
290 // | |
291 // TODO(vtl): This is not obviously the right place for (all of) this | |
292 // logic, nor is it obviously factored correctly. | |
293 | |
294 DCHECK_EQ(endpoints_[peer_port]->GetType(), | |
295 MessagePipeEndpoint::kTypeProxy); | |
296 ProxyMessagePipeEndpoint* peer_endpoint = | |
297 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); | |
298 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = | |
299 peer_endpoint->ReleaseChannelEndpoint(); | |
300 | |
301 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); | |
302 // We'll assign our peer port's endpoint to the relayer's port 1, and this | |
303 // port's endpoint to the relayer's port 0. | |
304 channel_endpoint = new ChannelEndpoint( | |
305 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( | |
306 endpoints_[port].get())->message_queue()); | |
307 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); | |
308 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); | |
309 | |
310 // No need to call |Close()| after |ReleaseChannelEndpoint()|. | |
311 endpoints_[peer_port].reset(); | |
312 } | |
313 | |
314 endpoints_[port]->Close(); | |
315 endpoints_[port].reset(replacement_endpoint); | |
316 } | 267 } |
317 | 268 |
318 // TODO(vtl): More/most of the above should be moved into (some variant of) | 269 endpoints_[port]->Close(); |
319 // |Channel::SerializeEndpoint()|. | 270 endpoints_[port].reset(replacement_endpoint); |
320 channel->SerializeEndpoint(channel_endpoint, destination); | 271 |
321 *actual_size = channel->GetSerializedEndpointSize(); | 272 *actual_size = channel->GetSerializedEndpointSize(); |
322 return true; | 273 return true; |
323 } | 274 } |
324 | 275 |
325 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | 276 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
326 base::AutoLock locker(lock_); | 277 base::AutoLock locker(lock_); |
327 | 278 |
328 if (!endpoints_[port]) { | 279 if (!endpoints_[port]) { |
329 // This will happen only on the rare occasion that the call to | 280 // This will happen only on the rare occasion that the call to |
330 // |OnReadMessage()| is racing with us calling | 281 // |OnReadMessage()| is racing with us calling |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
424 LOG(WARNING) << "Enqueueing null dispatcher"; | 375 LOG(WARNING) << "Enqueueing null dispatcher"; |
425 dispatchers->push_back(nullptr); | 376 dispatchers->push_back(nullptr); |
426 } | 377 } |
427 } | 378 } |
428 message->SetDispatchers(dispatchers.Pass()); | 379 message->SetDispatchers(dispatchers.Pass()); |
429 return MOJO_RESULT_OK; | 380 return MOJO_RESULT_OK; |
430 } | 381 } |
431 | 382 |
432 } // namespace system | 383 } // namespace system |
433 } // namespace mojo | 384 } // namespace mojo |
OLD | NEW |