Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Side by Side Diff: mojo/edk/system/message_pipe.cc

Issue 728133002: Update mojo sdk to rev e01f9a49449381a5eb430c1fd88bf2cae73ec35a (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: android + ios gyp fixes Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe.h" 5 #include "mojo/edk/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
8 #include "mojo/edk/system/channel_endpoint.h" 9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h"
9 #include "mojo/edk/system/local_message_pipe_endpoint.h" 11 #include "mojo/edk/system/local_message_pipe_endpoint.h"
10 #include "mojo/edk/system/message_in_transit.h" 12 #include "mojo/edk/system/message_in_transit.h"
11 #include "mojo/edk/system/message_pipe_dispatcher.h" 13 #include "mojo/edk/system/message_pipe_dispatcher.h"
12 #include "mojo/edk/system/message_pipe_endpoint.h" 14 #include "mojo/edk/system/message_pipe_endpoint.h"
13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" 15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
14 16
15 namespace mojo { 17 namespace mojo {
16 namespace system { 18 namespace system {
17 19
20 namespace {
21
22 // TODO(vtl): Move this into |Channel| (and possible further).
23 struct SerializedMessagePipe {
24 // This is the endpoint ID on the receiving side, and should be a "remote ID".
25 // (The receiving side should already have had an endpoint attached and been
26 // run via the |Channel|s. This endpoint will have both IDs assigned, so this
27 // ID is only needed to associate that endpoint with a particular dispatcher.)
28 ChannelEndpointId receiver_endpoint_id;
29 };
30
31 } // namespace
32
18 // static 33 // static
19 MessagePipe* MessagePipe::CreateLocalLocal() { 34 MessagePipe* MessagePipe::CreateLocalLocal() {
20 MessagePipe* message_pipe = new MessagePipe(); 35 MessagePipe* message_pipe = new MessagePipe();
21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23 return message_pipe; 38 return message_pipe;
24 } 39 }
25 40
26 // static 41 // static
27 MessagePipe* MessagePipe::CreateLocalProxy( 42 MessagePipe* MessagePipe::CreateLocalProxy(
(...skipping 18 matching lines...) Expand all
46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47 return message_pipe; 62 return message_pipe;
48 } 63 }
49 64
50 // static 65 // static
51 unsigned MessagePipe::GetPeerPort(unsigned port) { 66 unsigned MessagePipe::GetPeerPort(unsigned port) {
52 DCHECK(port == 0 || port == 1); 67 DCHECK(port == 0 || port == 1);
53 return port ^ 1; 68 return port ^ 1;
54 } 69 }
55 70
71 // static
72 bool MessagePipe::Deserialize(Channel* channel,
73 const void* source,
74 size_t size,
75 scoped_refptr<MessagePipe>* message_pipe,
76 unsigned* port) {
77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely.
78
79 if (size != sizeof(SerializedMessagePipe)) {
80 LOG(ERROR) << "Invalid serialized message pipe";
81 return false;
82 }
83
84 const SerializedMessagePipe* s =
85 static_cast<const SerializedMessagePipe*>(source);
86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
87 if (!message_pipe->get()) {
88 LOG(ERROR) << "Failed to deserialize message pipe (ID = "
89 << s->receiver_endpoint_id << ")";
90 return false;
91 }
92
93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
94 << s->receiver_endpoint_id << ")";
95 *port = 0;
96 return true;
97 }
98
56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57 DCHECK(port == 0 || port == 1); 100 DCHECK(port == 0 || port == 1);
58 base::AutoLock locker(lock_); 101 base::AutoLock locker(lock_);
59 DCHECK(endpoints_[port]); 102 DCHECK(endpoints_[port]);
60 103
61 return endpoints_[port]->GetType(); 104 return endpoints_[port]->GetType();
62 } 105 }
63 106
64 void MessagePipe::CancelAllWaiters(unsigned port) { 107 void MessagePipe::CancelAllWaiters(unsigned port) {
65 DCHECK(port == 0 || port == 1); 108 DCHECK(port == 0 || port == 1);
(...skipping 27 matching lines...) Expand all
93 unsigned port, 136 unsigned port,
94 UserPointer<const void> bytes, 137 UserPointer<const void> bytes,
95 uint32_t num_bytes, 138 uint32_t num_bytes,
96 std::vector<DispatcherTransport>* transports, 139 std::vector<DispatcherTransport>* transports,
97 MojoWriteMessageFlags flags) { 140 MojoWriteMessageFlags flags) {
98 DCHECK(port == 0 || port == 1); 141 DCHECK(port == 0 || port == 1);
99 return EnqueueMessageInternal( 142 return EnqueueMessageInternal(
100 GetPeerPort(port), 143 GetPeerPort(port),
101 make_scoped_ptr(new MessageInTransit( 144 make_scoped_ptr(new MessageInTransit(
102 MessageInTransit::kTypeMessagePipeEndpoint, 145 MessageInTransit::kTypeMessagePipeEndpoint,
103 MessageInTransit::kSubtypeMessagePipeEndpointData, 146 MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, bytes)),
104 num_bytes,
105 bytes)),
106 transports); 147 transports);
107 } 148 }
108 149
109 MojoResult MessagePipe::ReadMessage(unsigned port, 150 MojoResult MessagePipe::ReadMessage(unsigned port,
110 UserPointer<void> bytes, 151 UserPointer<void> bytes,
111 UserPointer<uint32_t> num_bytes, 152 UserPointer<uint32_t> num_bytes,
112 DispatcherVector* dispatchers, 153 DispatcherVector* dispatchers,
113 uint32_t* num_dispatchers, 154 uint32_t* num_dispatchers,
114 MojoReadMessageFlags flags) { 155 MojoReadMessageFlags flags) {
115 DCHECK(port == 0 || port == 1); 156 DCHECK(port == 0 || port == 1);
116 157
117 base::AutoLock locker(lock_); 158 base::AutoLock locker(lock_);
118 DCHECK(endpoints_[port]); 159 DCHECK(endpoints_[port]);
119 160
120 return endpoints_[port]->ReadMessage( 161 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
121 bytes, num_bytes, dispatchers, num_dispatchers, flags); 162 num_dispatchers, flags);
122 } 163 }
123 164
124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { 165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
125 DCHECK(port == 0 || port == 1); 166 DCHECK(port == 0 || port == 1);
126 167
127 base::AutoLock locker(const_cast<base::Lock&>(lock_)); 168 base::AutoLock locker(const_cast<base::Lock&>(lock_));
128 DCHECK(endpoints_[port]); 169 DCHECK(endpoints_[port]);
129 170
130 return endpoints_[port]->GetHandleSignalsState(); 171 return endpoints_[port]->GetHandleSignalsState();
131 } 172 }
(...skipping 15 matching lines...) Expand all
147 Waiter* waiter, 188 Waiter* waiter,
148 HandleSignalsState* signals_state) { 189 HandleSignalsState* signals_state) {
149 DCHECK(port == 0 || port == 1); 190 DCHECK(port == 0 || port == 1);
150 191
151 base::AutoLock locker(lock_); 192 base::AutoLock locker(lock_);
152 DCHECK(endpoints_[port]); 193 DCHECK(endpoints_[port]);
153 194
154 endpoints_[port]->RemoveWaiter(waiter, signals_state); 195 endpoints_[port]->RemoveWaiter(waiter, signals_state);
155 } 196 }
156 197
198 void MessagePipe::StartSerialize(unsigned /*port*/,
199 Channel* /*channel*/,
200 size_t* max_size,
201 size_t* max_platform_handles) {
202 *max_size = sizeof(SerializedMessagePipe);
203 *max_platform_handles = 0;
204 }
205
206 bool MessagePipe::EndSerialize(
207 unsigned port,
208 Channel* channel,
209 void* destination,
210 size_t* actual_size,
211 embedder::PlatformHandleVector* /*platform_handles*/) {
212 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
213
214 // Convert the local endpoint to a proxy endpoint (moving the message queue)
215 // and attach it to the channel.
216 s->receiver_endpoint_id =
217 channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false);
218 DVLOG(2) << "Serializing message pipe (remote ID = "
219 << s->receiver_endpoint_id << ")";
220 *actual_size = sizeof(SerializedMessagePipe);
221 return true;
222 }
223
157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { 224 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
158 DCHECK(port == 0 || port == 1); 225 DCHECK(port == 0 || port == 1);
159 226
160 base::AutoLock locker(lock_); 227 base::AutoLock locker(lock_);
161 DCHECK(endpoints_[port]); 228 DCHECK(endpoints_[port]);
162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
163 230
164 // The local peer is already closed, so just make a |ChannelEndpoint| that'll 231 // The local peer is already closed, so just make a |ChannelEndpoint| that'll
165 // send the already-queued messages. 232 // send the already-queued messages.
166 if (!endpoints_[GetPeerPort(port)]) { 233 if (!endpoints_[GetPeerPort(port)]) {
167 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( 234 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
168 nullptr, 235 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
169 0, 236 endpoints_[port].get())->message_queue()));
170 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
171 ->message_queue()));
172 endpoints_[port]->Close(); 237 endpoints_[port]->Close();
173 endpoints_[port].reset(); 238 endpoints_[port].reset();
174 return channel_endpoint; 239 return channel_endpoint;
175 } 240 }
176 241
177 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a 242 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
178 // |MessagePipe| with two proxy endpoints, which will then act as a proxy 243 // |MessagePipe| with two proxy endpoints, which will then act as a proxy
179 // (rather than trying to connect the two ends directly). 244 // (rather than trying to connect the two ends directly).
180 DLOG_IF(WARNING, 245 DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() !=
181 endpoints_[GetPeerPort(port)]->GetType() != 246 MessagePipeEndpoint::kTypeLocal)
182 MessagePipeEndpoint::kTypeLocal)
183 << "Direct message pipe passing across multiple channels not yet " 247 << "Direct message pipe passing across multiple channels not yet "
184 "implemented; will proxy"; 248 "implemented; will proxy";
185 249
186 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); 250 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
187 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( 251 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
188 this, 252 this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
189 port, 253 ->message_queue()));
190 static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
191 ->message_queue()));
192 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); 254 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
193 old_endpoint->Close(); 255 old_endpoint->Close();
194 256
195 return channel_endpoint; 257 return channel_endpoint;
196 } 258 }
197 259
198 MojoResult MessagePipe::EnqueueMessage(unsigned port, 260 MojoResult MessagePipe::EnqueueMessage(unsigned port,
199 scoped_ptr<MessageInTransit> message) { 261 scoped_ptr<MessageInTransit> message) {
200 return EnqueueMessageInternal(port, message.Pass(), nullptr); 262 return EnqueueMessageInternal(port, message.Pass(), nullptr);
201 } 263 }
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
273 // Clone the dispatchers and attach them to the message. (This must be done as 335 // Clone the dispatchers and attach them to the message. (This must be done as
274 // a separate loop, since we want to leave the dispatchers alone on failure.) 336 // a separate loop, since we want to leave the dispatchers alone on failure.)
275 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 337 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
276 dispatchers->reserve(transports->size()); 338 dispatchers->reserve(transports->size());
277 for (size_t i = 0; i < transports->size(); i++) { 339 for (size_t i = 0; i < transports->size(); i++) {
278 if ((*transports)[i].is_valid()) { 340 if ((*transports)[i].is_valid()) {
279 dispatchers->push_back( 341 dispatchers->push_back(
280 (*transports)[i].CreateEquivalentDispatcherAndClose()); 342 (*transports)[i].CreateEquivalentDispatcherAndClose());
281 } else { 343 } else {
282 LOG(WARNING) << "Enqueueing null dispatcher"; 344 LOG(WARNING) << "Enqueueing null dispatcher";
283 dispatchers->push_back(scoped_refptr<Dispatcher>()); 345 dispatchers->push_back(nullptr);
284 } 346 }
285 } 347 }
286 message->SetDispatchers(dispatchers.Pass()); 348 message->SetDispatchers(dispatchers.Pass());
287 return MOJO_RESULT_OK; 349 return MOJO_RESULT_OK;
288 } 350 }
289 351
290 MojoResult MessagePipe::HandleControlMessage( 352 MojoResult MessagePipe::HandleControlMessage(
291 unsigned /*port*/, 353 unsigned /*port*/,
292 scoped_ptr<MessageInTransit> message) { 354 scoped_ptr<MessageInTransit> message) {
293 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 355 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
294 << message->subtype(); 356 << message->subtype();
295 return MOJO_RESULT_UNKNOWN; 357 return MOJO_RESULT_UNKNOWN;
296 } 358 }
297 359
298 } // namespace system 360 } // namespace system
299 } // namespace mojo 361 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698