Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 664763002: Mojo: Change the way message pipes are passed over channels. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git/+/master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_in_transit.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h" 8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h" 9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h" 10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/constants.h" 11 #include "mojo/edk/system/constants.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h" 12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/memory.h" 13 #include "mojo/edk/system/memory.h"
14 #include "mojo/edk/system/message_pipe.h" 14 #include "mojo/edk/system/message_pipe.h"
15 #include "mojo/edk/system/options_validation.h" 15 #include "mojo/edk/system/options_validation.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
17 17
18 namespace mojo { 18 namespace mojo {
19 namespace system { 19 namespace system {
20 20
21 namespace { 21 namespace {
22 22
23 const unsigned kInvalidPort = static_cast<unsigned>(-1); 23 const unsigned kInvalidPort = static_cast<unsigned>(-1);
24 24
25 struct SerializedMessagePipeDispatcher { 25 struct SerializedMessagePipeDispatcher {
26 ChannelEndpointId endpoint_id; 26 // This is the endpoint ID on the receiving side, and should be a "remote ID".
27 // (The receiving side should have already have an endpoint attached and run
28 // via the |Channel|s. This endpoint will have both IDs assigned, so this ID
29 // is only needed to associated that endpoint with a particular dispatcher.)
30 ChannelEndpointId receiver_endpoint_id;
27 }; 31 };
28 32
29 } // namespace 33 } // namespace
30 34
31 // MessagePipeDispatcher ------------------------------------------------------- 35 // MessagePipeDispatcher -------------------------------------------------------
32 36
33 // static 37 // static
34 const MojoCreateMessagePipeOptions 38 const MojoCreateMessagePipeOptions
35 MessagePipeDispatcher::kDefaultCreateOptions = { 39 MessagePipeDispatcher::kDefaultCreateOptions = {
36 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 40 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 // static 101 // static
98 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( 102 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
99 Channel* channel, 103 Channel* channel,
100 const void* source, 104 const void* source,
101 size_t size) { 105 size_t size) {
102 if (size != sizeof(SerializedMessagePipeDispatcher)) { 106 if (size != sizeof(SerializedMessagePipeDispatcher)) {
103 LOG(ERROR) << "Invalid serialized message pipe dispatcher"; 107 LOG(ERROR) << "Invalid serialized message pipe dispatcher";
104 return scoped_refptr<MessagePipeDispatcher>(); 108 return scoped_refptr<MessagePipeDispatcher>();
105 } 109 }
106 110
107 scoped_refptr<ChannelEndpoint> channel_endpoint; 111 const SerializedMessagePipeDispatcher* s =
108 scoped_refptr<MessagePipeDispatcher> dispatcher = 112 static_cast<const SerializedMessagePipeDispatcher*>(source);
109 CreateRemoteMessagePipe(&channel_endpoint); 113 scoped_refptr<MessagePipe> message_pipe =
110 114 channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
111 ChannelEndpointId remote_id = 115 if (!message_pipe.get()) {
112 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; 116 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (ID = "
113 if (!remote_id.is_valid()) { 117 << s->receiver_endpoint_id << ")";
114 // This means that the other end was closed, and there were no messages
115 // enqueued for us.
116 // TODO(vtl): This is wrong. We should produce a "dead" message pipe
117 // dispatcher.
118 NOTIMPLEMENTED();
119 return scoped_refptr<MessagePipeDispatcher>(); 118 return scoped_refptr<MessagePipeDispatcher>();
120 } 119 }
121 ChannelEndpointId local_id = channel->AttachEndpoint(channel_endpoint);
122 if (!local_id.is_valid()) {
123 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
124 "attach; remote ID = " << remote_id << ")";
125 return scoped_refptr<MessagePipeDispatcher>();
126 }
127 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
128 << ", new local ID = " << local_id << ")";
129 120
130 channel->RunEndpoint(channel_endpoint, remote_id); 121 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
131 122 << s->receiver_endpoint_id << ")";
132 // TODO(vtl): FIXME -- Need some error handling here. 123 scoped_refptr<MessagePipeDispatcher> dispatcher(
133 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); 124 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
125 dispatcher->Init(message_pipe, 0);
134 return dispatcher; 126 return dispatcher;
135 } 127 }
136 128
137 MessagePipeDispatcher::~MessagePipeDispatcher() { 129 MessagePipeDispatcher::~MessagePipeDispatcher() {
138 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. 130 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
139 DCHECK(!message_pipe_.get()); 131 DCHECK(!message_pipe_.get());
140 } 132 }
141 133
142 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const { 134 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
143 lock().AssertAcquired(); 135 lock().AssertAcquired();
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
236 *max_platform_handles = 0; 228 *max_platform_handles = 0;
237 } 229 }
238 230
239 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 231 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
240 Channel* channel, 232 Channel* channel,
241 void* destination, 233 void* destination,
242 size_t* actual_size, 234 size_t* actual_size,
243 embedder::PlatformHandleVector* /*platform_handles*/) { 235 embedder::PlatformHandleVector* /*platform_handles*/) {
244 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. 236 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
245 237
238 SerializedMessagePipeDispatcher* s =
239 static_cast<SerializedMessagePipeDispatcher*>(destination);
240
246 // Convert the local endpoint to a proxy endpoint (moving the message queue) 241 // Convert the local endpoint to a proxy endpoint (moving the message queue)
247 // and attach it to the channel. 242 // and attach it to the channel.
248 ChannelEndpointId endpoint_id = 243 s->receiver_endpoint_id = channel->AttachAndRunEndpoint(
249 channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_)); 244 message_pipe_->ConvertLocalToProxy(port_), false);
250 // Note: It's okay to get an invalid endpoint ID. (It's possible that the 245 DVLOG(2) << "Serializing message pipe dispatcher (remote ID = "
251 // other endpoint -- the one that we're not sending -- was closed in the 246 << s->receiver_endpoint_id << ")";
252 // intervening time.) In that case, we need to deserialize a "dead" message
253 // pipe dispatcher on the other end. (Note that this is different from just
254 // producing |MOJO_HANDLE_INVALID|.)
255 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
256 << ")";
257
258 // We now have a local ID. Before we can run the proxy endpoint, we need to
259 // get an ack back from the other side with the remote ID.
260 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
261 endpoint_id;
262 247
263 message_pipe_ = nullptr; 248 message_pipe_ = nullptr;
264 port_ = kInvalidPort; 249 port_ = kInvalidPort;
265 250
266 *actual_size = sizeof(SerializedMessagePipeDispatcher); 251 *actual_size = sizeof(SerializedMessagePipeDispatcher);
267 return true; 252 return true;
268 } 253 }
269 254
270 // MessagePipeDispatcherTransport ---------------------------------------------- 255 // MessagePipeDispatcherTransport ----------------------------------------------
271 256
272 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport( 257 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
273 DispatcherTransport transport) 258 DispatcherTransport transport)
274 : DispatcherTransport(transport) { 259 : DispatcherTransport(transport) {
275 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe); 260 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
276 } 261 }
277 262
278 } // namespace system 263 } // namespace system
279 } // namespace mojo 264 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_in_transit.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698