// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/message_pipe.h"

#include "base/logging.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/local_message_pipe_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/message_pipe_endpoint.h"
#include "mojo/edk/system/proxy_message_pipe_endpoint.h"

namespace mojo {
namespace system {

// static
MessagePipe* MessagePipe::CreateLocalLocal() {
  MessagePipe* message_pipe = new MessagePipe();
  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
  return message_pipe;
}

// static
MessagePipe* MessagePipe::CreateLocalProxy(
    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
  DCHECK(!*channel_endpoint);  // Not technically wrong, but unlikely.
  MessagePipe* message_pipe = new MessagePipe();
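  // Port 0 stays local in this process; port 1 becomes a proxy wired to the
  // newly created |ChannelEndpoint|.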
  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
  *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
  message_pipe->endpoints_[1].reset(
      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
  return message_pipe;
}

// static
MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
    MessageInTransitQueue* message_queue,
    ChannelEndpoint* channel_endpoint) {
  DCHECK(message_queue);
  MessagePipe* message_pipe = new MessagePipe();
  message_pipe->endpoints_[0].reset(
      new LocalMessagePipeEndpoint(message_queue));
  if (channel_endpoint) {
    bool attached_to_channel =
        channel_endpoint->ReplaceClient(message_pipe, 1);
    message_pipe->endpoints_[1].reset(
        new ProxyMessagePipeEndpoint(channel_endpoint));
    if (!attached_to_channel)
      message_pipe->OnDetachFromChannel(1);
  } else {
    // This means that the proxy side was already closed; we only need to
    // inform the local side of this.
    // TODO(vtl): This is safe to do without locking (but perhaps slightly
    // dubious), since no other thread has access to |message_pipe| yet.
    message_pipe->endpoints_[0]->OnPeerClose();
  }
  return message_pipe;
}

// static
MessagePipe* MessagePipe::CreateProxyLocal(
    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
  DCHECK(!*channel_endpoint);  // Not technically wrong, but unlikely.
  MessagePipe* message_pipe = new MessagePipe();
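  // The mirror image of |CreateLocalProxy()|: port 0 becomes the proxy and
  // port 1 stays local.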
  *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
  message_pipe->endpoints_[0].reset(
      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
  return message_pipe;
}

// static
unsigned MessagePipe::GetPeerPort(unsigned port) {
  DCHECK(port == 0 || port == 1);
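  // Ports 0 and 1 are each other's peers, so XOR-ing with 1 maps one to the
  // other.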
  return port ^ 1;
}

// static
bool MessagePipe::Deserialize(Channel* channel,
                              const void* source,
                              size_t size,
                              scoped_refptr<MessagePipe>* message_pipe,
                              unsigned* port) {
  DCHECK(!*message_pipe);  // Not technically wrong, but unlikely.

  if (size != channel->GetSerializedEndpointSize()) {
    LOG(ERROR) << "Invalid serialized message pipe";
    return false;
  }

  scoped_refptr<IncomingEndpoint> incoming_endpoint =
      channel->DeserializeEndpoint(source);
  if (!incoming_endpoint)
    return false;

  *message_pipe = incoming_endpoint->ConvertToMessagePipe();
  DCHECK(*message_pipe);
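  // The local (receiving) end of a deserialized message pipe is always port 0.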
  *port = 0;
  return true;
}

MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
  DCHECK(port == 0 || port == 1);
  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);

  return endpoints_[port]->GetType();
}

void MessagePipe::CancelAllAwakables(unsigned port) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);
  endpoints_[port]->CancelAllAwakables();
}

void MessagePipe::Close(unsigned port) {
  DCHECK(port == 0 || port == 1);

  unsigned peer_port = GetPeerPort(port);

  base::AutoLock locker(lock_);
  // The endpoint's |OnPeerClose()| may have been called first and returned
  // false, which would have resulted in its destruction.
  if (!endpoints_[port])
    return;

  endpoints_[port]->Close();
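  // Notify the peer, if it still exists; if |OnPeerClose()| returns false,
  // the peer endpoint no longer needs to exist and is destroyed immediately.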
  if (endpoints_[peer_port]) {
    if (!endpoints_[peer_port]->OnPeerClose())
      endpoints_[peer_port].reset();
  }
  endpoints_[port].reset();
}

// TODO(vtl): Handle flags.
MojoResult MessagePipe::WriteMessage(
    unsigned port,
    UserPointer<const void> bytes,
    uint32_t num_bytes,
    std::vector<DispatcherTransport>* transports,
    MojoWriteMessageFlags flags) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
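  // A message written to |port| is enqueued on the *peer* port, which is the
  // side that will later read it.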
  return EnqueueMessageNoLock(
      GetPeerPort(port),
      make_scoped_ptr(new MessageInTransit(
          MessageInTransit::kTypeEndpoint,
          MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)),
      transports);
}

MojoResult MessagePipe::ReadMessage(unsigned port,
                                    UserPointer<void> bytes,
                                    UserPointer<uint32_t> num_bytes,
                                    DispatcherVector* dispatchers,
                                    uint32_t* num_dispatchers,
                                    MojoReadMessageFlags flags) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);

  return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
                                       num_dispatchers, flags);
}

HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
  DCHECK(port == 0 || port == 1);

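  // This method is logically const, but |lock_| must still be acquired, hence
  // the const_cast below.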
  base::AutoLock locker(const_cast<base::Lock&>(lock_));
  DCHECK(endpoints_[port]);

  return endpoints_[port]->GetHandleSignalsState();
}

MojoResult MessagePipe::AddAwakable(unsigned port,
                                    Awakable* awakable,
                                    MojoHandleSignals signals,
                                    uint32_t context,
                                    HandleSignalsState* signals_state) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);

  return endpoints_[port]->AddAwakable(awakable, signals, context,
                                       signals_state);
}

void MessagePipe::RemoveAwakable(unsigned port,
                                 Awakable* awakable,
                                 HandleSignalsState* signals_state) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);

  endpoints_[port]->RemoveAwakable(awakable, signals_state);
}

void MessagePipe::StartSerialize(unsigned /*port*/,
                                 Channel* channel,
                                 size_t* max_size,
                                 size_t* max_platform_handles) {
  *max_size = channel->GetSerializedEndpointSize();
  *max_platform_handles = 0;
}

bool MessagePipe::EndSerialize(
    unsigned port,
    Channel* channel,
    void* destination,
    size_t* actual_size,
    embedder::PlatformHandleVector* /*platform_handles*/) {
  DCHECK(port == 0 || port == 1);

  base::AutoLock locker(lock_);
  DCHECK(endpoints_[port]);

  // The port being serialized must be local.
  DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);

  unsigned peer_port = GetPeerPort(port);
  MessageInTransitQueue* message_queue =
      static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
          ->message_queue();
  // The replacement for |endpoints_[port]|, if any.
  MessagePipeEndpoint* replacement_endpoint = nullptr;

  // The three cases below correspond to the ones described above
  // |Channel::SerializeEndpoint...()| (in channel.h).
  if (!endpoints_[peer_port]) {
    // Case 1: (known-)closed peer port. There's no reason for us to continue
    // to exist afterwards.
    channel->SerializeEndpointWithClosedPeer(destination, message_queue);
  } else if (endpoints_[peer_port]->GetType() ==
             MessagePipeEndpoint::kTypeLocal) {
    // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
    // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint|
    // that the |Channel| returns to us.
    scoped_refptr<ChannelEndpoint> channel_endpoint =
        channel->SerializeEndpointWithLocalPeer(destination, message_queue,
                                                this, port);
    replacement_endpoint =
        new ProxyMessagePipeEndpoint(channel_endpoint.get());
  } else {
    // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint|
    // and pass it to the |Channel|. There's no reason for us to continue to
    // exist afterwards.
    DCHECK_EQ(endpoints_[peer_port]->GetType(),
              MessagePipeEndpoint::kTypeProxy);
    ProxyMessagePipeEndpoint* peer_endpoint =
        static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
    scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
        peer_endpoint->ReleaseChannelEndpoint();
    channel->SerializeEndpointWithRemotePeer(destination, message_queue,
                                             peer_channel_endpoint);
    // No need to call |Close()| after |ReleaseChannelEndpoint()|.
    endpoints_[peer_port].reset();
  }

  endpoints_[port]->Close();
  endpoints_[port].reset(replacement_endpoint);

  *actual_size = channel->GetSerializedEndpointSize();
  return true;
}

bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
  base::AutoLock locker(lock_);

  if (!endpoints_[port]) {
    // This will happen only on the rare occasion that the call to
    // |OnReadMessage()| is racing with us calling
    // |ChannelEndpoint::ReplaceClient()|, in which case we reject the
    // message, and the |ChannelEndpoint| can retry (calling the new client's
    // |OnReadMessage()|).
    return false;
  }

  // This is called when the |ChannelEndpoint| for the
  // |ProxyMessagePipeEndpoint| |port| receives a message (from the
  // |Channel|). We need to pass this message on to its peer port (typically a
  // |LocalMessagePipeEndpoint|).
  MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
                                           make_scoped_ptr(message), nullptr);
  DLOG_IF(WARNING, result != MOJO_RESULT_OK)
      << "EnqueueMessageNoLock() failed (result = " << result << ")";
  return true;
}

void MessagePipe::OnDetachFromChannel(unsigned port) {
  Close(port);
}

MessagePipe::MessagePipe() {
}

MessagePipe::~MessagePipe() {
  // Owned by the dispatchers. The owning dispatchers should only release us
  // via their |Close()| method, which should inform us of being closed via
  // our |Close()|. Thus these should already be null.
  DCHECK(!endpoints_[0]);
  DCHECK(!endpoints_[1]);
}

MojoResult MessagePipe::EnqueueMessageNoLock(
    unsigned port,
    scoped_ptr<MessageInTransit> message,
    std::vector<DispatcherTransport>* transports) {
  DCHECK(port == 0 || port == 1);
  DCHECK(message);

  DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
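  // The source port (the peer of the destination |port|) must still be open,
  // since it is the side doing the writing.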
  DCHECK(endpoints_[GetPeerPort(port)]);

  // The destination port need not be open, unlike the source port.
  if (!endpoints_[port])
    return MOJO_RESULT_FAILED_PRECONDITION;

  if (transports) {
    MojoResult result =
        AttachTransportsNoLock(port, message.get(), transports);
    if (result != MOJO_RESULT_OK)
      return result;
  }

  // The endpoint's |EnqueueMessage()| may not report failure.
  endpoints_[port]->EnqueueMessage(message.Pass());
  return MOJO_RESULT_OK;
}

MojoResult MessagePipe::AttachTransportsNoLock(
    unsigned port,
    MessageInTransit* message,
    std::vector<DispatcherTransport>* transports) {
  DCHECK(!message->has_dispatchers());

  // You're not allowed to send either handle to a message pipe over the
  // message pipe, so check for this. (The case of trying to write a handle to
  // itself is taken care of by |Core|. That case kind of makes sense, but
  // leads to complications if, e.g., both sides try to do the same thing with
  // their respective handles simultaneously. The other case, of trying to
  // write the peer handle to a handle, doesn't make sense -- since no handle
  // will be available to read the message from.)
  for (size_t i = 0; i < transports->size(); i++) {
    if (!(*transports)[i].is_valid())
      continue;
    if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
      MessagePipeDispatcherTransport mp_transport((*transports)[i]);
      if (mp_transport.GetMessagePipe() == this) {
        // The other case should have been disallowed by |Core|. (Note: |port|
        // is the peer port of the handle given to |WriteMessage()|.)
        DCHECK_EQ(mp_transport.GetPort(), port);
        return MOJO_RESULT_INVALID_ARGUMENT;
      }
    }
  }

  // Clone the dispatchers and attach them to the message. (This must be done
  // as a separate loop, since we want to leave the dispatchers alone on
  // failure.)
  scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
  dispatchers->reserve(transports->size());
  for (size_t i = 0; i < transports->size(); i++) {
    if ((*transports)[i].is_valid()) {
      dispatchers->push_back(
          (*transports)[i].CreateEquivalentDispatcherAndClose());
    } else {
      LOG(WARNING) << "Enqueueing null dispatcher";
      dispatchers->push_back(nullptr);
    }
  }
  message->SetDispatchers(dispatchers.Pass());
  return MOJO_RESULT_OK;
}

}  // namespace system
}  // namespace mojo