OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/system/channel.h" | |
6 | |
7 #include <algorithm> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/compiler_specific.h" | |
11 #include "base/logging.h" | |
12 #include "base/macros.h" | |
13 #include "base/strings/stringprintf.h" | |
14 #include "mojo/embedder/platform_handle_vector.h" | |
15 #include "mojo/system/message_pipe_endpoint.h" | |
16 #include "mojo/system/transport_data.h" | |
17 | |
18 namespace mojo { | |
19 namespace system { | |
20 | |
21 static_assert(Channel::kBootstrapEndpointId != | |
22 MessageInTransit::kInvalidEndpointId, | |
23 "kBootstrapEndpointId is invalid"); | |
24 | |
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId | |
26 Channel::kBootstrapEndpointId; | |
27 | |
28 Channel::Channel(embedder::PlatformSupport* platform_support) | |
29 : platform_support_(platform_support), | |
30 is_running_(false), | |
31 is_shutting_down_(false), | |
32 next_local_id_(kBootstrapEndpointId) { | |
33 } | |
34 | |
35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { | |
36 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
37 DCHECK(raw_channel); | |
38 | |
39 // No need to take |lock_|, since this must be called before this object | |
40 // becomes thread-safe. | |
41 DCHECK(!is_running_); | |
42 raw_channel_ = raw_channel.Pass(); | |
43 | |
44 if (!raw_channel_->Init(this)) { | |
45 raw_channel_.reset(); | |
46 return false; | |
47 } | |
48 | |
49 is_running_ = true; | |
50 return true; | |
51 } | |
52 | |
53 void Channel::Shutdown() { | |
54 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
55 | |
56 IdToEndpointMap to_destroy; | |
57 { | |
58 base::AutoLock locker(lock_); | |
59 if (!is_running_) | |
60 return; | |
61 | |
62 // Note: Don't reset |raw_channel_|, in case we're being called from within | |
63 // |OnReadMessage()| or |OnError()|. | |
64 raw_channel_->Shutdown(); | |
65 is_running_ = false; | |
66 | |
67 // We need to deal with it outside the lock. | |
68 std::swap(to_destroy, local_id_to_endpoint_map_); | |
69 } | |
70 | |
71 size_t num_live = 0; | |
72 size_t num_zombies = 0; | |
73 for (IdToEndpointMap::iterator it = to_destroy.begin(); | |
74 it != to_destroy.end(); | |
75 ++it) { | |
76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) { | |
77 it->second->OnDisconnect(); | |
78 num_live++; | |
79 } else { | |
80 num_zombies++; | |
81 } | |
82 it->second->DetachFromChannel(); | |
83 } | |
84 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live | |
85 << " live endpoints and " << num_zombies | |
86 << " zombies"; | |
87 } | |
88 | |
89 void Channel::WillShutdownSoon() { | |
90 base::AutoLock locker(lock_); | |
91 is_shutting_down_ = true; | |
92 } | |
93 | |
94 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it | |
95 // keeps the endpoint alive even after the lock is released. Otherwise, there's | |
96 // the temptation to simply pass the result of |new ChannelEndpoint(...)| | |
97 // directly to this function, which wouldn't be sufficient for safety. | |
98 MessageInTransit::EndpointId Channel::AttachEndpoint( | |
99 scoped_refptr<ChannelEndpoint> endpoint) { | |
100 DCHECK(endpoint.get()); | |
101 | |
102 MessageInTransit::EndpointId local_id; | |
103 { | |
104 base::AutoLock locker(lock_); | |
105 | |
106 DLOG_IF(WARNING, is_shutting_down_) | |
107 << "AttachEndpoint() while shutting down"; | |
108 | |
109 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || | |
110 local_id_to_endpoint_map_.find(next_local_id_) != | |
111 local_id_to_endpoint_map_.end()) | |
112 next_local_id_++; | |
113 | |
114 local_id = next_local_id_; | |
115 next_local_id_++; | |
116 local_id_to_endpoint_map_[local_id] = endpoint; | |
117 } | |
118 | |
119 endpoint->AttachToChannel(this, local_id); | |
120 return local_id; | |
121 } | |
122 | |
123 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, | |
124 MessageInTransit::EndpointId remote_id) { | |
125 scoped_refptr<ChannelEndpoint> endpoint; | |
126 ChannelEndpoint::State state; | |
127 { | |
128 base::AutoLock locker(lock_); | |
129 | |
130 DLOG_IF(WARNING, is_shutting_down_) | |
131 << "RunMessagePipeEndpoint() while shutting down"; | |
132 | |
133 IdToEndpointMap::const_iterator it = | |
134 local_id_to_endpoint_map_.find(local_id); | |
135 if (it == local_id_to_endpoint_map_.end()) | |
136 return false; | |
137 endpoint = it->second; | |
138 state = it->second->state_; | |
139 } | |
140 | |
141 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| | |
142 // and ignore it. | |
143 if (state != ChannelEndpoint::STATE_NORMAL) { | |
144 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " | |
145 "(local ID " << local_id << ", remote ID " << remote_id << ")"; | |
146 return true; | |
147 } | |
148 | |
149 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already | |
150 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). | |
151 endpoint->Run(remote_id); | |
152 return true; | |
153 } | |
154 | |
155 void Channel::RunRemoteMessagePipeEndpoint( | |
156 MessageInTransit::EndpointId local_id, | |
157 MessageInTransit::EndpointId remote_id) { | |
158 #if DCHECK_IS_ON | |
159 { | |
160 base::AutoLock locker(lock_); | |
161 DCHECK(local_id_to_endpoint_map_.find(local_id) != | |
162 local_id_to_endpoint_map_.end()); | |
163 } | |
164 #endif | |
165 | |
166 if (!SendControlMessage( | |
167 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, | |
168 local_id, | |
169 remote_id)) { | |
170 HandleLocalError(base::StringPrintf( | |
171 "Failed to send message to run remote message pipe endpoint (local ID " | |
172 "%u, remote ID %u)", | |
173 static_cast<unsigned>(local_id), | |
174 static_cast<unsigned>(remote_id))); | |
175 } | |
176 } | |
177 | |
178 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
179 base::AutoLock locker(lock_); | |
180 if (!is_running_) { | |
181 // TODO(vtl): I think this is probably not an error condition, but I should | |
182 // think about it (and the shutdown sequence) more carefully. | |
183 LOG(WARNING) << "WriteMessage() after shutdown"; | |
184 return false; | |
185 } | |
186 | |
187 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down"; | |
188 return raw_channel_->WriteMessage(message.Pass()); | |
189 } | |
190 | |
191 bool Channel::IsWriteBufferEmpty() { | |
192 base::AutoLock locker(lock_); | |
193 if (!is_running_) | |
194 return true; | |
195 return raw_channel_->IsWriteBufferEmpty(); | |
196 } | |
197 | |
198 void Channel::DetachMessagePipeEndpoint( | |
199 MessageInTransit::EndpointId local_id, | |
200 MessageInTransit::EndpointId remote_id) { | |
201 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | |
202 | |
203 // If this is non-null after the locked block, the endpoint should be detached | |
204 // (and no remove message sent). | |
205 scoped_refptr<ChannelEndpoint> endpoint_to_detach; | |
206 { | |
207 base::AutoLock locker_(lock_); | |
208 if (!is_running_) | |
209 return; | |
210 | |
211 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); | |
212 DCHECK(it != local_id_to_endpoint_map_.end()); | |
213 | |
214 switch (it->second->state_) { | |
215 case ChannelEndpoint::STATE_NORMAL: | |
216 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK; | |
217 if (remote_id == MessageInTransit::kInvalidEndpointId) | |
218 return; | |
219 // We have to send a remove message (outside the lock). | |
220 break; | |
221 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH: | |
222 endpoint_to_detach = it->second; | |
223 local_id_to_endpoint_map_.erase(it); | |
224 // We have to detach (outside the lock). | |
225 break; | |
226 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK: | |
227 NOTREACHED(); | |
228 return; | |
229 } | |
230 } | |
231 if (endpoint_to_detach.get()) { | |
232 endpoint_to_detach->DetachFromChannel(); | |
233 return; | |
234 } | |
235 | |
236 if (!SendControlMessage( | |
237 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, | |
238 local_id, | |
239 remote_id)) { | |
240 HandleLocalError(base::StringPrintf( | |
241 "Failed to send message to remove remote message pipe endpoint (local " | |
242 "ID %u, remote ID %u)", | |
243 static_cast<unsigned>(local_id), | |
244 static_cast<unsigned>(remote_id))); | |
245 } | |
246 } | |
247 | |
248 size_t Channel::GetSerializedPlatformHandleSize() const { | |
249 return raw_channel_->GetSerializedPlatformHandleSize(); | |
250 } | |
251 | |
252 Channel::~Channel() { | |
253 // The channel should have been shut down first. | |
254 DCHECK(!is_running_); | |
255 } | |
256 | |
257 void Channel::OnReadMessage( | |
258 const MessageInTransit::View& message_view, | |
259 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | |
260 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
261 | |
262 switch (message_view.type()) { | |
263 case MessageInTransit::kTypeMessagePipeEndpoint: | |
264 case MessageInTransit::kTypeMessagePipe: | |
265 OnReadMessageForDownstream(message_view, platform_handles.Pass()); | |
266 break; | |
267 case MessageInTransit::kTypeChannel: | |
268 OnReadMessageForChannel(message_view, platform_handles.Pass()); | |
269 break; | |
270 default: | |
271 HandleRemoteError( | |
272 base::StringPrintf("Received message of invalid type %u", | |
273 static_cast<unsigned>(message_view.type()))); | |
274 break; | |
275 } | |
276 } | |
277 | |
278 void Channel::OnError(Error error) { | |
279 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
280 | |
281 switch (error) { | |
282 case ERROR_READ_SHUTDOWN: | |
283 // The other side was cleanly closed, so this isn't actually an error. | |
284 DVLOG(1) << "RawChannel read error (shutdown)"; | |
285 break; | |
286 case ERROR_READ_BROKEN: { | |
287 base::AutoLock locker(lock_); | |
288 LOG_IF(ERROR, !is_shutting_down_) | |
289 << "RawChannel read error (connection broken)"; | |
290 break; | |
291 } | |
292 case ERROR_READ_BAD_MESSAGE: | |
293 // Receiving a bad message means either a bug, data corruption, or | |
294 // malicious attack (probably due to some other bug). | |
295 LOG(ERROR) << "RawChannel read error (received bad message)"; | |
296 break; | |
297 case ERROR_READ_UNKNOWN: | |
298 LOG(ERROR) << "RawChannel read error (unknown)"; | |
299 break; | |
300 case ERROR_WRITE: | |
301 // Write errors are slightly notable: they probably shouldn't happen under | |
302 // normal operation (but maybe the other side crashed). | |
303 LOG(WARNING) << "RawChannel write error"; | |
304 break; | |
305 } | |
306 Shutdown(); | |
307 } | |
308 | |
309 void Channel::OnReadMessageForDownstream( | |
310 const MessageInTransit::View& message_view, | |
311 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | |
312 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
313 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || | |
314 message_view.type() == MessageInTransit::kTypeMessagePipe); | |
315 | |
316 MessageInTransit::EndpointId local_id = message_view.destination_id(); | |
317 if (local_id == MessageInTransit::kInvalidEndpointId) { | |
318 HandleRemoteError("Received message with no destination ID"); | |
319 return; | |
320 } | |
321 | |
322 scoped_refptr<ChannelEndpoint> endpoint; | |
323 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL; | |
324 { | |
325 base::AutoLock locker(lock_); | |
326 | |
327 // Since we own |raw_channel_|, and this method and |Shutdown()| should only | |
328 // be called from the creation thread, |raw_channel_| should never be null | |
329 // here. | |
330 DCHECK(is_running_); | |
331 | |
332 IdToEndpointMap::const_iterator it = | |
333 local_id_to_endpoint_map_.find(local_id); | |
334 if (it != local_id_to_endpoint_map_.end()) { | |
335 endpoint = it->second; | |
336 state = it->second->state_; | |
337 } | |
338 } | |
339 if (!endpoint.get()) { | |
340 HandleRemoteError(base::StringPrintf( | |
341 "Received a message for nonexistent local destination ID %u", | |
342 static_cast<unsigned>(local_id))); | |
343 // This is strongly indicative of some problem. However, it's not a fatal | |
344 // error, since it may indicate a buggy (or hostile) remote process. Don't | |
345 // die even for Debug builds, since handling this properly needs to be | |
346 // tested (TODO(vtl)). | |
347 DLOG(ERROR) << "This should not happen under normal operation."; | |
348 return; | |
349 } | |
350 | |
351 // Ignore messages for zombie endpoints (not an error). | |
352 if (state != ChannelEndpoint::STATE_NORMAL) { | |
353 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " | |
354 << local_id << ", remote ID = " << message_view.source_id() << ")"; | |
355 return; | |
356 } | |
357 | |
358 if (!endpoint->OnReadMessage(message_view, platform_handles.Pass())) { | |
359 HandleLocalError( | |
360 base::StringPrintf("Failed to enqueue message to local ID %u", | |
361 static_cast<unsigned>(local_id))); | |
362 return; | |
363 } | |
364 } | |
365 | |
366 void Channel::OnReadMessageForChannel( | |
367 const MessageInTransit::View& message_view, | |
368 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | |
369 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
370 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel); | |
371 | |
372 // Currently, no channel messages take platform handles. | |
373 if (platform_handles) { | |
374 HandleRemoteError( | |
375 "Received invalid channel message (has platform handles)"); | |
376 NOTREACHED(); | |
377 return; | |
378 } | |
379 | |
380 switch (message_view.subtype()) { | |
381 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: | |
382 DVLOG(2) << "Handling channel message to run message pipe (local ID " | |
383 << message_view.destination_id() << ", remote ID " | |
384 << message_view.source_id() << ")"; | |
385 if (!RunMessagePipeEndpoint(message_view.destination_id(), | |
386 message_view.source_id())) { | |
387 HandleRemoteError( | |
388 "Received invalid channel message to run message pipe"); | |
389 } | |
390 break; | |
391 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: | |
392 DVLOG(2) << "Handling channel message to remove message pipe (local ID " | |
393 << message_view.destination_id() << ", remote ID " | |
394 << message_view.source_id() << ")"; | |
395 if (!OnRemoveMessagePipeEndpoint(message_view.destination_id(), | |
396 message_view.source_id())) { | |
397 HandleRemoteError( | |
398 "Received invalid channel message to remove message pipe"); | |
399 } | |
400 break; | |
401 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: | |
402 DVLOG(2) << "Handling channel message to ack remove message pipe (local " | |
403 "ID " << message_view.destination_id() << ", remote ID " | |
404 << message_view.source_id() << ")"; | |
405 if (!OnRemoveMessagePipeEndpointAck(message_view.destination_id())) { | |
406 HandleRemoteError( | |
407 "Received invalid channel message to ack remove message pipe"); | |
408 } | |
409 break; | |
410 default: | |
411 HandleRemoteError("Received invalid channel message"); | |
412 NOTREACHED(); | |
413 break; | |
414 } | |
415 } | |
416 | |
417 bool Channel::OnRemoveMessagePipeEndpoint( | |
418 MessageInTransit::EndpointId local_id, | |
419 MessageInTransit::EndpointId remote_id) { | |
420 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
421 | |
422 scoped_refptr<ChannelEndpoint> endpoint; | |
423 { | |
424 base::AutoLock locker(lock_); | |
425 | |
426 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); | |
427 if (it == local_id_to_endpoint_map_.end()) { | |
428 DVLOG(2) << "Remove message pipe endpoint error: not found"; | |
429 return false; | |
430 } | |
431 | |
432 switch (it->second->state_) { | |
433 case ChannelEndpoint::STATE_NORMAL: | |
434 // This is the normal case; we'll proceed on to "wait local detach". | |
435 break; | |
436 | |
437 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH: | |
438 // We can only be in this state because we got a "remove" already, so | |
439 // getting another such message is invalid. | |
440 DVLOG(2) << "Remove message pipe endpoint error: wrong state"; | |
441 return false; | |
442 | |
443 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK: | |
444 // Remove messages "crossed"; we have to wait for the ack. | |
445 return true; | |
446 } | |
447 | |
448 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH; | |
449 endpoint = it->second; | |
450 // Send the remove ack message outside the lock. | |
451 } | |
452 | |
453 if (!SendControlMessage( | |
454 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, | |
455 local_id, | |
456 remote_id)) { | |
457 HandleLocalError(base::StringPrintf( | |
458 "Failed to send message to remove remote message pipe endpoint ack " | |
459 "(local ID %u, remote ID %u)", | |
460 static_cast<unsigned>(local_id), | |
461 static_cast<unsigned>(remote_id))); | |
462 } | |
463 | |
464 endpoint->OnDisconnect(); | |
465 return true; | |
466 } | |
467 | |
468 bool Channel::OnRemoveMessagePipeEndpointAck( | |
469 MessageInTransit::EndpointId local_id) { | |
470 DCHECK(creation_thread_checker_.CalledOnValidThread()); | |
471 | |
472 scoped_refptr<ChannelEndpoint> endpoint; | |
473 { | |
474 base::AutoLock locker(lock_); | |
475 | |
476 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); | |
477 if (it == local_id_to_endpoint_map_.end()) { | |
478 DVLOG(2) << "Remove message pipe endpoint ack error: not found"; | |
479 return false; | |
480 } | |
481 | |
482 if (it->second->state_ != ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK) { | |
483 DVLOG(2) << "Remove message pipe endpoint ack error: wrong state"; | |
484 return false; | |
485 } | |
486 | |
487 endpoint = it->second; | |
488 local_id_to_endpoint_map_.erase(it); | |
489 // Detach the endpoint outside the lock. | |
490 } | |
491 | |
492 endpoint->DetachFromChannel(); | |
493 return true; | |
494 } | |
495 | |
496 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, | |
497 MessageInTransit::EndpointId local_id, | |
498 MessageInTransit::EndpointId remote_id) { | |
499 DVLOG(2) << "Sending channel control message: subtype " << subtype | |
500 << ", local ID " << local_id << ", remote ID " << remote_id; | |
501 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
502 MessageInTransit::kTypeChannel, subtype, 0, nullptr)); | |
503 message->set_source_id(local_id); | |
504 message->set_destination_id(remote_id); | |
505 return WriteMessage(message.Pass()); | |
506 } | |
507 | |
508 void Channel::HandleRemoteError(const base::StringPiece& error_message) { | |
509 // TODO(vtl): Is this how we really want to handle this? Probably we want to | |
510 // terminate the connection, since it's spewing invalid stuff. | |
511 LOG(WARNING) << error_message; | |
512 } | |
513 | |
514 void Channel::HandleLocalError(const base::StringPiece& error_message) { | |
515 // TODO(vtl): Is this how we really want to handle this? | |
516 // Sometimes we'll want to propagate the error back to the message pipe | |
517 // (endpoint), and notify it that the remote is (effectively) closed. | |
518 // Sometimes we'll want to kill the channel (and notify all the endpoints that | |
519 // their remotes are dead. | |
520 LOG(WARNING) << error_message; | |
521 } | |
522 | |
523 } // namespace system | |
524 } // namespace mojo | |
OLD | NEW |