Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(251)

Side by Side Diff: mojo/edk/system/message_pipe.cc

Issue 799113004: Update mojo sdk to rev 59145288bae55b0fce4276b017df6a1117bcf00f (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: add mojo's ply to checklicenses whitelist Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_test_utils.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe.h" 5 #include "mojo/edk/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h" 8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h" 9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h" 10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/endpoint_relayer.h" 11 #include "mojo/edk/system/endpoint_relayer.h"
12 #include "mojo/edk/system/incoming_endpoint.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h" 13 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/message_in_transit.h" 14 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/message_pipe_dispatcher.h" 15 #include "mojo/edk/system/message_pipe_dispatcher.h"
15 #include "mojo/edk/system/message_pipe_endpoint.h" 16 #include "mojo/edk/system/message_pipe_endpoint.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" 17 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace system { 20 namespace system {
20 21
21 namespace {
22
23 // TODO(vtl): Move this into |Channel| (and possible further).
24 struct SerializedMessagePipe {
25 // This is the endpoint ID on the receiving side, and should be a "remote ID".
26 // (The receiving side should already have had an endpoint attached and been
27 // run via the |Channel|s. This endpoint will have both IDs assigned, so this
28 // ID is only needed to associate that endpoint with a particular dispatcher.)
29 ChannelEndpointId receiver_endpoint_id;
30 };
31
32 } // namespace
33
34 // static 22 // static
35 MessagePipe* MessagePipe::CreateLocalLocal() { 23 MessagePipe* MessagePipe::CreateLocalLocal() {
36 MessagePipe* message_pipe = new MessagePipe(); 24 MessagePipe* message_pipe = new MessagePipe();
37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 25 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 26 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
39 return message_pipe; 27 return message_pipe;
40 } 28 }
41 29
42 // static 30 // static
43 MessagePipe* MessagePipe::CreateLocalProxy( 31 MessagePipe* MessagePipe::CreateLocalProxy(
44 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 32 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. 33 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
46 MessagePipe* message_pipe = new MessagePipe(); 34 MessagePipe* message_pipe = new MessagePipe();
47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 35 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); 36 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
49 message_pipe->endpoints_[1].reset( 37 message_pipe->endpoints_[1].reset(
50 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 38 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
51 return message_pipe; 39 return message_pipe;
52 } 40 }
53 41
54 // static 42 // static
43 MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
44 MessageInTransitQueue* message_queue,
45 ChannelEndpoint* channel_endpoint) {
46 DCHECK(message_queue);
47 MessagePipe* message_pipe = new MessagePipe();
48 message_pipe->endpoints_[0].reset(
49 new LocalMessagePipeEndpoint(message_queue));
50 if (channel_endpoint) {
51 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
52 message_pipe->endpoints_[1].reset(
53 new ProxyMessagePipeEndpoint(channel_endpoint));
54 if (!attached_to_channel)
55 message_pipe->OnDetachFromChannel(1);
56 } else {
57 // This means that the proxy side was already closed; we only need to inform
58 // the local side of this.
59 // TODO(vtl): This is safe to do without locking (but perhaps slightly
60 // dubious), since no other thread has access to |message_pipe| yet.
61 message_pipe->endpoints_[0]->OnPeerClose();
62 }
63 return message_pipe;
64 }
65
66 // static
55 MessagePipe* MessagePipe::CreateProxyLocal( 67 MessagePipe* MessagePipe::CreateProxyLocal(
56 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 68 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. 69 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
58 MessagePipe* message_pipe = new MessagePipe(); 70 MessagePipe* message_pipe = new MessagePipe();
59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); 71 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
60 message_pipe->endpoints_[0].reset( 72 message_pipe->endpoints_[0].reset(
61 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 73 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 74 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
63 return message_pipe; 75 return message_pipe;
64 } 76 }
65 77
66 // static 78 // static
67 unsigned MessagePipe::GetPeerPort(unsigned port) { 79 unsigned MessagePipe::GetPeerPort(unsigned port) {
68 DCHECK(port == 0 || port == 1); 80 DCHECK(port == 0 || port == 1);
69 return port ^ 1; 81 return port ^ 1;
70 } 82 }
71 83
72 // static 84 // static
73 bool MessagePipe::Deserialize(Channel* channel, 85 bool MessagePipe::Deserialize(Channel* channel,
74 const void* source, 86 const void* source,
75 size_t size, 87 size_t size,
76 scoped_refptr<MessagePipe>* message_pipe, 88 scoped_refptr<MessagePipe>* message_pipe,
77 unsigned* port) { 89 unsigned* port) {
78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. 90 DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
79 91
80 if (size != sizeof(SerializedMessagePipe)) { 92 if (size != channel->GetSerializedEndpointSize()) {
81 LOG(ERROR) << "Invalid serialized message pipe"; 93 LOG(ERROR) << "Invalid serialized message pipe";
82 return false; 94 return false;
83 } 95 }
84 96
85 const SerializedMessagePipe* s = 97 scoped_refptr<IncomingEndpoint> incoming_endpoint =
86 static_cast<const SerializedMessagePipe*>(source); 98 channel->DeserializeEndpoint(source);
87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); 99 if (!incoming_endpoint)
88 if (!*message_pipe) {
89 LOG(ERROR) << "Failed to deserialize message pipe (ID = "
90 << s->receiver_endpoint_id << ")";
91 return false; 100 return false;
92 }
93 101
94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " 102 *message_pipe = incoming_endpoint->ConvertToMessagePipe();
95 << s->receiver_endpoint_id << ")"; 103 DCHECK(*message_pipe);
96 *port = 0; 104 *port = 0;
97 return true; 105 return true;
98 } 106 }
99 107
100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 108 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
101 DCHECK(port == 0 || port == 1); 109 DCHECK(port == 0 || port == 1);
102 base::AutoLock locker(lock_); 110 base::AutoLock locker(lock_);
103 DCHECK(endpoints_[port]); 111 DCHECK(endpoints_[port]);
104 112
105 return endpoints_[port]->GetType(); 113 return endpoints_[port]->GetType();
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
193 HandleSignalsState* signals_state) { 201 HandleSignalsState* signals_state) {
194 DCHECK(port == 0 || port == 1); 202 DCHECK(port == 0 || port == 1);
195 203
196 base::AutoLock locker(lock_); 204 base::AutoLock locker(lock_);
197 DCHECK(endpoints_[port]); 205 DCHECK(endpoints_[port]);
198 206
199 endpoints_[port]->RemoveAwakable(awakable, signals_state); 207 endpoints_[port]->RemoveAwakable(awakable, signals_state);
200 } 208 }
201 209
202 void MessagePipe::StartSerialize(unsigned /*port*/, 210 void MessagePipe::StartSerialize(unsigned /*port*/,
203 Channel* /*channel*/, 211 Channel* channel,
204 size_t* max_size, 212 size_t* max_size,
205 size_t* max_platform_handles) { 213 size_t* max_platform_handles) {
206 *max_size = sizeof(SerializedMessagePipe); 214 *max_size = channel->GetSerializedEndpointSize();
207 *max_platform_handles = 0; 215 *max_platform_handles = 0;
208 } 216 }
209 217
210 bool MessagePipe::EndSerialize( 218 bool MessagePipe::EndSerialize(
211 unsigned port, 219 unsigned port,
212 Channel* channel, 220 Channel* channel,
213 void* destination, 221 void* destination,
214 size_t* actual_size, 222 size_t* actual_size,
215 embedder::PlatformHandleVector* /*platform_handles*/) { 223 embedder::PlatformHandleVector* /*platform_handles*/) {
216 DCHECK(port == 0 || port == 1); 224 DCHECK(port == 0 || port == 1);
(...skipping 25 matching lines...) Expand all
242 // port's message pipe dispatcher will continue to hold a reference to 250 // port's message pipe dispatcher will continue to hold a reference to
243 // us. 251 // us.
244 // 252 //
245 // 3. The peer port is remote. 253 // 3. The peer port is remote.
246 // 254 //
247 // We also pass its |ChannelEndpoint| to the channel, which then decides 255 // We also pass its |ChannelEndpoint| to the channel, which then decides
248 // what to do. We have no reason to continue to exist. 256 // what to do. We have no reason to continue to exist.
249 // 257 //
250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). 258 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
251 259
260 // The replacement for |endpoints_[port]|, if any.
261 MessagePipeEndpoint* replacement_endpoint = nullptr;
262
252 unsigned peer_port = GetPeerPort(port); 263 unsigned peer_port = GetPeerPort(port);
253 if (!endpoints_[peer_port]) { 264 if (!endpoints_[peer_port]) { // Case 1.
254 // Case 1.
255 channel_endpoint = new ChannelEndpoint( 265 channel_endpoint = new ChannelEndpoint(
256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( 266 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
257 endpoints_[port].get())->message_queue()); 267 endpoints_[port].get())->message_queue());
258 endpoints_[port]->Close();
259 endpoints_[port].reset();
260 } else if (endpoints_[peer_port]->GetType() == 268 } else if (endpoints_[peer_port]->GetType() ==
261 MessagePipeEndpoint::kTypeLocal) { 269 MessagePipeEndpoint::kTypeLocal) { // Case 2.
262 // Case 2.
263 channel_endpoint = new ChannelEndpoint( 270 channel_endpoint = new ChannelEndpoint(
264 this, port, static_cast<LocalMessagePipeEndpoint*>( 271 this, port, static_cast<LocalMessagePipeEndpoint*>(
265 endpoints_[port].get())->message_queue()); 272 endpoints_[port].get())->message_queue());
266 endpoints_[port]->Close(); 273 replacement_endpoint =
267 endpoints_[port].reset( 274 new ProxyMessagePipeEndpoint(channel_endpoint.get());
268 new ProxyMessagePipeEndpoint(channel_endpoint.get())); 275 } else { // Case 3.
269 } else {
270 // Case 3.
271 DLOG(WARNING) << "Direct message pipe passing across multiple channels " 276 DLOG(WARNING) << "Direct message pipe passing across multiple channels "
272 "not yet implemented; will proxy"; 277 "not yet implemented; will proxy";
273 278
274 // Create an |EndpointRelayer| to replace ourselves (rather than having a 279 // Create an |EndpointRelayer| to replace ourselves (rather than having a
275 // |MessagePipe| object that exists solely to relay messages between two 280 // |MessagePipe| object that exists solely to relay messages between two
276 // |ChannelEndpoint|s, owned by the |Channel| through them. 281 // |ChannelEndpoint|s, owned by the |Channel| through them.
277 // 282 //
278 // This reduces overhead somewhat, and more importantly restores some 283 // This reduces overhead somewhat, and more importantly restores some
279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. 284 // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
280 // 285 //
(...skipping 14 matching lines...) Expand all
295 300
296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); 301 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
297 // We'll assign our peer port's endpoint to the relayer's port 1, and this 302 // We'll assign our peer port's endpoint to the relayer's port 1, and this
298 // port's endpoint to the relayer's port 0. 303 // port's endpoint to the relayer's port 0.
299 channel_endpoint = new ChannelEndpoint( 304 channel_endpoint = new ChannelEndpoint(
300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( 305 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
301 endpoints_[port].get())->message_queue()); 306 endpoints_[port].get())->message_queue());
302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); 307 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); 308 peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
304 309
305 endpoints_[port]->Close();
306 endpoints_[port].reset();
307 // No need to call |Close()| after |ReleaseChannelEndpoint()|. 310 // No need to call |Close()| after |ReleaseChannelEndpoint()|.
308 endpoints_[peer_port].reset(); 311 endpoints_[peer_port].reset();
309 } 312 }
313
314 endpoints_[port]->Close();
315 endpoints_[port].reset(replacement_endpoint);
310 } 316 }
311 317
312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); 318 // TODO(vtl): More/most of the above should be moved into (some variant of)
313 319 // |Channel::SerializeEndpoint()|.
314 // Convert the local endpoint to a proxy endpoint (moving the message queue) 320 channel->SerializeEndpoint(channel_endpoint, destination);
315 // and attach it to the channel. 321 *actual_size = channel->GetSerializedEndpointSize();
316 s->receiver_endpoint_id =
317 channel->AttachAndRunEndpoint(channel_endpoint, false);
318 DVLOG(2) << "Serializing message pipe (remote ID = "
319 << s->receiver_endpoint_id << ")";
320 *actual_size = sizeof(SerializedMessagePipe);
321 return true; 322 return true;
322 } 323 }
323 324
324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { 325 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
325 base::AutoLock locker(lock_); 326 base::AutoLock locker(lock_);
326 327
327 if (!endpoints_[port]) { 328 if (!endpoints_[port]) {
328 // This will happen only on the rare occasion that the call to 329 // This will happen only on the rare occasion that the call to
329 // |OnReadMessage()| is racing with us calling 330 // |OnReadMessage()| is racing with us calling
330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, 331 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
423 LOG(WARNING) << "Enqueueing null dispatcher"; 424 LOG(WARNING) << "Enqueueing null dispatcher";
424 dispatchers->push_back(nullptr); 425 dispatchers->push_back(nullptr);
425 } 426 }
426 } 427 }
427 message->SetDispatchers(dispatchers.Pass()); 428 message->SetDispatchers(dispatchers.Pass());
428 return MOJO_RESULT_OK; 429 return MOJO_RESULT_OK;
429 } 430 }
430 431
431 } // namespace system 432 } // namespace system
432 } // namespace mojo 433 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_test_utils.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698