Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2131)

Side by Side Diff: mojo/edk/system/message_pipe.cc

Issue 1396783004: Convert mojo::system::ChannelEndpointClient to use our new refcounting stuff (instead of base's). (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe.h" 5 #include "mojo/edk/system/message_pipe.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "mojo/edk/system/channel.h" 11 #include "mojo/edk/system/channel.h"
12 #include "mojo/edk/system/channel_endpoint.h" 12 #include "mojo/edk/system/channel_endpoint.h"
13 #include "mojo/edk/system/channel_endpoint_id.h" 13 #include "mojo/edk/system/channel_endpoint_id.h"
14 #include "mojo/edk/system/incoming_endpoint.h" 14 #include "mojo/edk/system/incoming_endpoint.h"
15 #include "mojo/edk/system/local_message_pipe_endpoint.h" 15 #include "mojo/edk/system/local_message_pipe_endpoint.h"
16 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/message_pipe_dispatcher.h" 17 #include "mojo/edk/system/message_pipe_dispatcher.h"
18 #include "mojo/edk/system/message_pipe_endpoint.h" 18 #include "mojo/edk/system/message_pipe_endpoint.h"
19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" 19 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
20 #include "mojo/edk/util/make_unique.h" 20 #include "mojo/edk/util/make_unique.h"
21 21
22 namespace mojo { 22 namespace mojo {
23 namespace system { 23 namespace system {
24 24
25 // static 25 // static
26 MessagePipe* MessagePipe::CreateLocalLocal() MOJO_NO_THREAD_SAFETY_ANALYSIS { 26 RefPtr<MessagePipe> MessagePipe::CreateLocalLocal()
27 MessagePipe* message_pipe = new MessagePipe(); 27 MOJO_NO_THREAD_SAFETY_ANALYSIS {
28 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
28 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 29 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
29 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 30 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
30 return message_pipe; 31 return message_pipe;
31 } 32 }
32 33
33 // static 34 // static
34 MessagePipe* MessagePipe::CreateLocalProxy( 35 RefPtr<MessagePipe> MessagePipe::CreateLocalProxy(
35 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { 36 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
36 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. 37 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
37 MessagePipe* message_pipe = new MessagePipe(); 38 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
38 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 39 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
39 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 1); 40 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe.Clone(), 1);
40 message_pipe->endpoints_[1].reset( 41 message_pipe->endpoints_[1].reset(
41 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); 42 new ProxyMessagePipeEndpoint(channel_endpoint->Clone()));
42 return message_pipe; 43 return message_pipe;
43 } 44 }
44 45
45 // static 46 // static
46 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( 47 RefPtr<MessagePipe> MessagePipe::CreateLocalProxyFromExisting(
47 MessageInTransitQueue* message_queue, 48 MessageInTransitQueue* message_queue,
48 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { 49 RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
49 DCHECK(message_queue); 50 DCHECK(message_queue);
50 MessagePipe* message_pipe = new MessagePipe(); 51 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
51 message_pipe->endpoints_[0].reset( 52 message_pipe->endpoints_[0].reset(
52 new LocalMessagePipeEndpoint(message_queue)); 53 new LocalMessagePipeEndpoint(message_queue));
53 if (channel_endpoint) { 54 if (channel_endpoint) {
54 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); 55 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
55 message_pipe->endpoints_[1].reset( 56 message_pipe->endpoints_[1].reset(
56 new ProxyMessagePipeEndpoint(std::move(channel_endpoint))); 57 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)));
57 if (!attached_to_channel) 58 if (!attached_to_channel)
58 message_pipe->OnDetachFromChannel(1); 59 message_pipe->OnDetachFromChannel(1);
59 } else { 60 } else {
60 // This means that the proxy side was already closed; we only need to inform 61 // This means that the proxy side was already closed; we only need to inform
61 // the local side of this. 62 // the local side of this.
62 // TODO(vtl): This is safe to do without locking (but perhaps slightly 63 // TODO(vtl): This is safe to do without locking (but perhaps slightly
63 // dubious), since no other thread has access to |message_pipe| yet. 64 // dubious), since no other thread has access to |message_pipe| yet.
64 message_pipe->endpoints_[0]->OnPeerClose(); 65 message_pipe->endpoints_[0]->OnPeerClose();
65 } 66 }
66 return message_pipe; 67 return message_pipe;
67 } 68 }
68 69
69 // static 70 // static
70 MessagePipe* MessagePipe::CreateProxyLocal( 71 RefPtr<MessagePipe> MessagePipe::CreateProxyLocal(
71 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS { 72 RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
72 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. 73 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
73 MessagePipe* message_pipe = new MessagePipe(); 74 RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
74 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0); 75 *channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe, 0);
75 message_pipe->endpoints_[0].reset( 76 message_pipe->endpoints_[0].reset(
76 new ProxyMessagePipeEndpoint(channel_endpoint->Clone())); 77 new ProxyMessagePipeEndpoint(channel_endpoint->Clone()));
77 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 78 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
78 return message_pipe; 79 return message_pipe;
79 } 80 }
80 81
81 // static 82 // static
82 unsigned MessagePipe::GetPeerPort(unsigned port) { 83 unsigned MessagePipe::GetPeerPort(unsigned port) {
83 DCHECK(port == 0 || port == 1); 84 DCHECK(port == 0 || port == 1);
84 return port ^ 1; 85 return port ^ 1;
85 } 86 }
86 87
87 // static 88 // static
88 bool MessagePipe::Deserialize(Channel* channel, 89 bool MessagePipe::Deserialize(Channel* channel,
89 const void* source, 90 const void* source,
90 size_t size, 91 size_t size,
91 scoped_refptr<MessagePipe>* message_pipe, 92 RefPtr<MessagePipe>* message_pipe,
92 unsigned* port) { 93 unsigned* port) {
93 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. 94 DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
94 95
95 if (size != channel->GetSerializedEndpointSize()) { 96 if (size != channel->GetSerializedEndpointSize()) {
96 LOG(ERROR) << "Invalid serialized message pipe"; 97 LOG(ERROR) << "Invalid serialized message pipe";
97 return false; 98 return false;
98 } 99 }
99 100
100 scoped_refptr<IncomingEndpoint> incoming_endpoint = 101 RefPtr<IncomingEndpoint> incoming_endpoint =
101 channel->DeserializeEndpoint(source); 102 channel->DeserializeEndpoint(source);
102 if (!incoming_endpoint) 103 if (!incoming_endpoint)
103 return false; 104 return false;
104 105
105 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); 106 *message_pipe = incoming_endpoint->ConvertToMessagePipe();
106 DCHECK(*message_pipe); 107 DCHECK(*message_pipe);
107 *port = 0; 108 *port = 0;
108 return true; 109 return true;
109 } 110 }
110 111
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 if (!endpoints_[peer_port]) { 245 if (!endpoints_[peer_port]) {
245 // Case 1: (known-)closed peer port. There's no reason for us to continue to 246 // Case 1: (known-)closed peer port. There's no reason for us to continue to
246 // exist afterwards. 247 // exist afterwards.
247 channel->SerializeEndpointWithClosedPeer(destination, message_queue); 248 channel->SerializeEndpointWithClosedPeer(destination, message_queue);
248 } else if (endpoints_[peer_port]->GetType() == 249 } else if (endpoints_[peer_port]->GetType() ==
249 MessagePipeEndpoint::kTypeLocal) { 250 MessagePipeEndpoint::kTypeLocal) {
250 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| 251 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
251 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that 252 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that
252 // the |Channel| returns to us. 253 // the |Channel| returns to us.
253 RefPtr<ChannelEndpoint> channel_endpoint = 254 RefPtr<ChannelEndpoint> channel_endpoint =
254 channel->SerializeEndpointWithLocalPeer(destination, message_queue, 255 channel->SerializeEndpointWithLocalPeer(
255 this, port); 256 destination, message_queue, RefPtr<ChannelEndpointClient>(this),
257 port);
256 replacement_endpoint = 258 replacement_endpoint =
257 new ProxyMessagePipeEndpoint(std::move(channel_endpoint)); 259 new ProxyMessagePipeEndpoint(std::move(channel_endpoint));
258 } else { 260 } else {
259 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and 261 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and
260 // pass it to the |Channel|. There's no reason for us to continue to exist 262 // pass it to the |Channel|. There's no reason for us to continue to exist
261 // afterwards. 263 // afterwards.
262 DCHECK_EQ(endpoints_[peer_port]->GetType(), 264 DCHECK_EQ(endpoints_[peer_port]->GetType(),
263 MessagePipeEndpoint::kTypeProxy); 265 MessagePipeEndpoint::kTypeProxy);
264 ProxyMessagePipeEndpoint* peer_endpoint = 266 ProxyMessagePipeEndpoint* peer_endpoint =
265 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); 267 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
380 LOG(WARNING) << "Enqueueing null dispatcher"; 382 LOG(WARNING) << "Enqueueing null dispatcher";
381 dispatchers->push_back(nullptr); 383 dispatchers->push_back(nullptr);
382 } 384 }
383 } 385 }
384 message->SetDispatchers(std::move(dispatchers)); 386 message->SetDispatchers(std::move(dispatchers));
385 return MOJO_RESULT_OK; 387 return MOJO_RESULT_OK;
386 } 388 }
387 389
388 } // namespace system 390 } // namespace system
389 } // namespace mojo 391 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698