Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: mojo/edk/system/message_pipe.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/incoming_endpoint.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/message_pipe_dispatcher.h"
15 #include "mojo/edk/system/message_pipe_endpoint.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
17
18 namespace mojo {
19 namespace system {
20
21 // static
22 MessagePipe* MessagePipe::CreateLocalLocal() {
23 MessagePipe* message_pipe = new MessagePipe();
24 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
25 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
26 return message_pipe;
27 }
28
29 // static
30 MessagePipe* MessagePipe::CreateLocalProxy(
31 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
32 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
33 MessagePipe* message_pipe = new MessagePipe();
34 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
35 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
36 message_pipe->endpoints_[1].reset(
37 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
38 return message_pipe;
39 }
40
41 // static
42 MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
43 MessageInTransitQueue* message_queue,
44 ChannelEndpoint* channel_endpoint) {
45 DCHECK(message_queue);
46 MessagePipe* message_pipe = new MessagePipe();
47 message_pipe->endpoints_[0].reset(
48 new LocalMessagePipeEndpoint(message_queue));
49 if (channel_endpoint) {
50 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
51 message_pipe->endpoints_[1].reset(
52 new ProxyMessagePipeEndpoint(channel_endpoint));
53 if (!attached_to_channel)
54 message_pipe->OnDetachFromChannel(1);
55 } else {
56 // This means that the proxy side was already closed; we only need to inform
57 // the local side of this.
58 // TODO(vtl): This is safe to do without locking (but perhaps slightly
59 // dubious), since no other thread has access to |message_pipe| yet.
60 message_pipe->endpoints_[0]->OnPeerClose();
61 }
62 return message_pipe;
63 }
64
65 // static
66 MessagePipe* MessagePipe::CreateProxyLocal(
67 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
68 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
69 MessagePipe* message_pipe = new MessagePipe();
70 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
71 message_pipe->endpoints_[0].reset(
72 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
73 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
74 return message_pipe;
75 }
76
77 // static
78 unsigned MessagePipe::GetPeerPort(unsigned port) {
79 DCHECK(port == 0 || port == 1);
80 return port ^ 1;
81 }
82
83 // static
84 bool MessagePipe::Deserialize(Channel* channel,
85 const void* source,
86 size_t size,
87 scoped_refptr<MessagePipe>* message_pipe,
88 unsigned* port) {
89 DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
90
91 if (size != channel->GetSerializedEndpointSize()) {
92 LOG(ERROR) << "Invalid serialized message pipe";
93 return false;
94 }
95
96 scoped_refptr<IncomingEndpoint> incoming_endpoint =
97 channel->DeserializeEndpoint(source);
98 if (!incoming_endpoint)
99 return false;
100
101 *message_pipe = incoming_endpoint->ConvertToMessagePipe();
102 DCHECK(*message_pipe);
103 *port = 0;
104 return true;
105 }
106
107 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
108 DCHECK(port == 0 || port == 1);
109 base::AutoLock locker(lock_);
110 DCHECK(endpoints_[port]);
111
112 return endpoints_[port]->GetType();
113 }
114
115 void MessagePipe::CancelAllAwakables(unsigned port) {
116 DCHECK(port == 0 || port == 1);
117
118 base::AutoLock locker(lock_);
119 DCHECK(endpoints_[port]);
120 endpoints_[port]->CancelAllAwakables();
121 }
122
123 void MessagePipe::Close(unsigned port) {
124 DCHECK(port == 0 || port == 1);
125
126 unsigned peer_port = GetPeerPort(port);
127
128 base::AutoLock locker(lock_);
129 // The endpoint's |OnPeerClose()| may have been called first and returned
130 // false, which would have resulted in its destruction.
131 if (!endpoints_[port])
132 return;
133
134 endpoints_[port]->Close();
135 if (endpoints_[peer_port]) {
136 if (!endpoints_[peer_port]->OnPeerClose())
137 endpoints_[peer_port].reset();
138 }
139 endpoints_[port].reset();
140 }
141
142 // TODO(vtl): Handle flags.
143 MojoResult MessagePipe::WriteMessage(
144 unsigned port,
145 UserPointer<const void> bytes,
146 uint32_t num_bytes,
147 std::vector<DispatcherTransport>* transports,
148 MojoWriteMessageFlags flags) {
149 DCHECK(port == 0 || port == 1);
150
151 base::AutoLock locker(lock_);
152 return EnqueueMessageNoLock(
153 GetPeerPort(port),
154 make_scoped_ptr(new MessageInTransit(
155 MessageInTransit::kTypeEndpoint,
156 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)),
157 transports);
158 }
159
160 MojoResult MessagePipe::ReadMessage(unsigned port,
161 UserPointer<void> bytes,
162 UserPointer<uint32_t> num_bytes,
163 DispatcherVector* dispatchers,
164 uint32_t* num_dispatchers,
165 MojoReadMessageFlags flags) {
166 DCHECK(port == 0 || port == 1);
167
168 base::AutoLock locker(lock_);
169 DCHECK(endpoints_[port]);
170
171 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
172 num_dispatchers, flags);
173 }
174
175 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
176 DCHECK(port == 0 || port == 1);
177
178 base::AutoLock locker(const_cast<base::Lock&>(lock_));
179 DCHECK(endpoints_[port]);
180
181 return endpoints_[port]->GetHandleSignalsState();
182 }
183
184 MojoResult MessagePipe::AddAwakable(unsigned port,
185 Awakable* awakable,
186 MojoHandleSignals signals,
187 uint32_t context,
188 HandleSignalsState* signals_state) {
189 DCHECK(port == 0 || port == 1);
190
191 base::AutoLock locker(lock_);
192 DCHECK(endpoints_[port]);
193
194 return endpoints_[port]->AddAwakable(awakable, signals, context,
195 signals_state);
196 }
197
198 void MessagePipe::RemoveAwakable(unsigned port,
199 Awakable* awakable,
200 HandleSignalsState* signals_state) {
201 DCHECK(port == 0 || port == 1);
202
203 base::AutoLock locker(lock_);
204 DCHECK(endpoints_[port]);
205
206 endpoints_[port]->RemoveAwakable(awakable, signals_state);
207 }
208
209 void MessagePipe::StartSerialize(unsigned /*port*/,
210 Channel* channel,
211 size_t* max_size,
212 size_t* max_platform_handles) {
213 *max_size = channel->GetSerializedEndpointSize();
214 *max_platform_handles = 0;
215 }
216
217 bool MessagePipe::EndSerialize(
218 unsigned port,
219 Channel* channel,
220 void* destination,
221 size_t* actual_size,
222 embedder::PlatformHandleVector* /*platform_handles*/) {
223 DCHECK(port == 0 || port == 1);
224
225 base::AutoLock locker(lock_);
226 DCHECK(endpoints_[port]);
227
228 // The port being serialized must be local.
229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
230
231 unsigned peer_port = GetPeerPort(port);
232 MessageInTransitQueue* message_queue =
233 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
234 ->message_queue();
235 // The replacement for |endpoints_[port]|, if any.
236 MessagePipeEndpoint* replacement_endpoint = nullptr;
237
238 // The three cases below correspond to the ones described above
239 // |Channel::SerializeEndpoint...()| (in channel.h).
240 if (!endpoints_[peer_port]) {
241 // Case 1: (known-)closed peer port. There's no reason for us to continue to
242 // exist afterwards.
243 channel->SerializeEndpointWithClosedPeer(destination, message_queue);
244 } else if (endpoints_[peer_port]->GetType() ==
245 MessagePipeEndpoint::kTypeLocal) {
246 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
247 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that
248 // the |Channel| returns to us.
249 scoped_refptr<ChannelEndpoint> channel_endpoint =
250 channel->SerializeEndpointWithLocalPeer(destination, message_queue,
251 this, port);
252 replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get());
253 } else {
254 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and
255 // pass it to the |Channel|. There's no reason for us to continue to exist
256 // afterwards.
257 DCHECK_EQ(endpoints_[peer_port]->GetType(),
258 MessagePipeEndpoint::kTypeProxy);
259 ProxyMessagePipeEndpoint* peer_endpoint =
260 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
261 scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
262 peer_endpoint->ReleaseChannelEndpoint();
263 channel->SerializeEndpointWithRemotePeer(destination, message_queue,
264 peer_channel_endpoint);
265 // No need to call |Close()| after |ReleaseChannelEndpoint()|.
266 endpoints_[peer_port].reset();
267 }
268
269 endpoints_[port]->Close();
270 endpoints_[port].reset(replacement_endpoint);
271
272 *actual_size = channel->GetSerializedEndpointSize();
273 return true;
274 }
275
276 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
277 base::AutoLock locker(lock_);
278
279 if (!endpoints_[port]) {
280 // This will happen only on the rare occasion that the call to
281 // |OnReadMessage()| is racing with us calling
282 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
283 // and the |ChannelEndpoint| can retry (calling the new client's
284 // |OnReadMessage()|).
285 return false;
286 }
287
288 // This is called when the |ChannelEndpoint| for the
289 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
290 // We need to pass this message on to its peer port (typically a
291 // |LocalMessagePipeEndpoint|).
292 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
293 make_scoped_ptr(message), nullptr);
294 DLOG_IF(WARNING, result != MOJO_RESULT_OK)
295 << "EnqueueMessageNoLock() failed (result = " << result << ")";
296 return true;
297 }
298
299 void MessagePipe::OnDetachFromChannel(unsigned port) {
300 Close(port);
301 }
302
303 MessagePipe::MessagePipe() {
304 }
305
306 MessagePipe::~MessagePipe() {
307 // Owned by the dispatchers. The owning dispatchers should only release us via
308 // their |Close()| method, which should inform us of being closed via our
309 // |Close()|. Thus these should already be null.
310 DCHECK(!endpoints_[0]);
311 DCHECK(!endpoints_[1]);
312 }
313
314 MojoResult MessagePipe::EnqueueMessageNoLock(
315 unsigned port,
316 scoped_ptr<MessageInTransit> message,
317 std::vector<DispatcherTransport>* transports) {
318 DCHECK(port == 0 || port == 1);
319 DCHECK(message);
320
321 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
322 DCHECK(endpoints_[GetPeerPort(port)]);
323
324 // The destination port need not be open, unlike the source port.
325 if (!endpoints_[port])
326 return MOJO_RESULT_FAILED_PRECONDITION;
327
328 if (transports) {
329 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
330 if (result != MOJO_RESULT_OK)
331 return result;
332 }
333
334 // The endpoint's |EnqueueMessage()| may not report failure.
335 endpoints_[port]->EnqueueMessage(message.Pass());
336 return MOJO_RESULT_OK;
337 }
338
339 MojoResult MessagePipe::AttachTransportsNoLock(
340 unsigned port,
341 MessageInTransit* message,
342 std::vector<DispatcherTransport>* transports) {
343 DCHECK(!message->has_dispatchers());
344
345 // You're not allowed to send either handle to a message pipe over the message
346 // pipe, so check for this. (The case of trying to write a handle to itself is
347 // taken care of by |Core|. That case kind of makes sense, but leads to
348 // complications if, e.g., both sides try to do the same thing with their
349 // respective handles simultaneously. The other case, of trying to write the
350 // peer handle to a handle, doesn't make sense -- since no handle will be
351 // available to read the message from.)
352 for (size_t i = 0; i < transports->size(); i++) {
353 if (!(*transports)[i].is_valid())
354 continue;
355 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
356 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
357 if (mp_transport.GetMessagePipe() == this) {
358 // The other case should have been disallowed by |Core|. (Note: |port|
359 // is the peer port of the handle given to |WriteMessage()|.)
360 DCHECK_EQ(mp_transport.GetPort(), port);
361 return MOJO_RESULT_INVALID_ARGUMENT;
362 }
363 }
364 }
365
366 // Clone the dispatchers and attach them to the message. (This must be done as
367 // a separate loop, since we want to leave the dispatchers alone on failure.)
368 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
369 dispatchers->reserve(transports->size());
370 for (size_t i = 0; i < transports->size(); i++) {
371 if ((*transports)[i].is_valid()) {
372 dispatchers->push_back(
373 (*transports)[i].CreateEquivalentDispatcherAndClose());
374 } else {
375 LOG(WARNING) << "Enqueueing null dispatcher";
376 dispatchers->push_back(nullptr);
377 }
378 }
379 message->SetDispatchers(dispatchers.Pass());
380 return MOJO_RESULT_OK;
381 }
382
383 } // namespace system
384 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698