Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(484)

Side by Side Diff: mojo/system/channel.cc

Issue 472603002: Mojo: Add the ability to notify a Mojo Channel that it's going to be destroyed soon. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: comment fix Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/channel.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/channel.h" 5 #include "mojo/system/channel.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
(...skipping 18 matching lines...) Expand all
29 } 29 }
30 30
31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, 31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
32 unsigned port) 32 unsigned port)
33 : state(STATE_NORMAL), message_pipe(message_pipe), port(port) { 33 : state(STATE_NORMAL), message_pipe(message_pipe), port(port) {
34 } 34 }
35 35
36 Channel::EndpointInfo::~EndpointInfo() { 36 Channel::EndpointInfo::~EndpointInfo() {
37 } 37 }
38 38
39 Channel::Channel() : is_running_(false), next_local_id_(kBootstrapEndpointId) { 39 Channel::Channel()
40 : is_running_(false),
41 is_shutting_down_(false),
42 next_local_id_(kBootstrapEndpointId) {
40 } 43 }
41 44
42 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { 45 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
43 DCHECK(creation_thread_checker_.CalledOnValidThread()); 46 DCHECK(creation_thread_checker_.CalledOnValidThread());
44 DCHECK(raw_channel); 47 DCHECK(raw_channel);
45 48
46 // No need to take |lock_|, since this must be called before this object 49 // No need to take |lock_|, since this must be called before this object
47 // becomes thread-safe. 50 // becomes thread-safe.
48 DCHECK(!is_running_no_lock()); 51 DCHECK(!is_running_);
49 raw_channel_ = raw_channel.Pass(); 52 raw_channel_ = raw_channel.Pass();
50 53
51 if (!raw_channel_->Init(this)) { 54 if (!raw_channel_->Init(this)) {
52 raw_channel_.reset(); 55 raw_channel_.reset();
53 return false; 56 return false;
54 } 57 }
55 58
56 is_running_ = true; 59 is_running_ = true;
57 return true; 60 return true;
58 } 61 }
59 62
60 void Channel::Shutdown() { 63 void Channel::Shutdown() {
61 DCHECK(creation_thread_checker_.CalledOnValidThread()); 64 DCHECK(creation_thread_checker_.CalledOnValidThread());
62 65
63 IdToEndpointInfoMap to_destroy; 66 IdToEndpointInfoMap to_destroy;
64 { 67 {
65 base::AutoLock locker(lock_); 68 base::AutoLock locker(lock_);
66 if (!is_running_no_lock()) 69 if (!is_running_)
67 return; 70 return;
68 71
69 // Note: Don't reset |raw_channel_|, in case we're being called from within 72 // Note: Don't reset |raw_channel_|, in case we're being called from within
70 // |OnReadMessage()| or |OnError()|. 73 // |OnReadMessage()| or |OnError()|.
71 raw_channel_->Shutdown(); 74 raw_channel_->Shutdown();
72 is_running_ = false; 75 is_running_ = false;
73 76
74 // We need to deal with it outside the lock. 77 // We need to deal with it outside the lock.
75 std::swap(to_destroy, local_id_to_endpoint_info_map_); 78 std::swap(to_destroy, local_id_to_endpoint_info_map_);
76 } 79 }
77 80
78 size_t num_live = 0; 81 size_t num_live = 0;
79 size_t num_zombies = 0; 82 size_t num_zombies = 0;
80 for (IdToEndpointInfoMap::iterator it = to_destroy.begin(); 83 for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
81 it != to_destroy.end(); 84 it != to_destroy.end();
82 ++it) { 85 ++it) {
83 if (it->second.state == EndpointInfo::STATE_NORMAL) { 86 if (it->second.state == EndpointInfo::STATE_NORMAL) {
84 it->second.message_pipe->OnRemove(it->second.port); 87 it->second.message_pipe->OnRemove(it->second.port);
85 num_live++; 88 num_live++;
86 } else { 89 } else {
87 DCHECK(!it->second.message_pipe); 90 DCHECK(!it->second.message_pipe);
88 num_zombies++; 91 num_zombies++;
89 } 92 }
90 } 93 }
91 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live 94 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
92 << " live endpoints and " << num_zombies 95 << " live endpoints and " << num_zombies
93 << " zombies"; 96 << " zombies";
94 } 97 }
95 98
99 void Channel::WillShutdownSoon() {
100 base::AutoLock locker(lock_);
101 is_shutting_down_ = true;
102 }
103
96 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( 104 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
97 scoped_refptr<MessagePipe> message_pipe, 105 scoped_refptr<MessagePipe> message_pipe,
98 unsigned port) { 106 unsigned port) {
99 DCHECK(message_pipe); 107 DCHECK(message_pipe);
100 DCHECK(port == 0 || port == 1); 108 DCHECK(port == 0 || port == 1);
101 109
102 MessageInTransit::EndpointId local_id; 110 MessageInTransit::EndpointId local_id;
103 { 111 {
104 base::AutoLock locker(lock_); 112 base::AutoLock locker(lock_);
105 113
114 DLOG_IF(WARNING, is_shutting_down_)
115 << "AttachMessagePipeEndpoint() while shutting down";
116
106 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || 117 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
107 local_id_to_endpoint_info_map_.find(next_local_id_) != 118 local_id_to_endpoint_info_map_.find(next_local_id_) !=
108 local_id_to_endpoint_info_map_.end()) 119 local_id_to_endpoint_info_map_.end())
109 next_local_id_++; 120 next_local_id_++;
110 121
111 local_id = next_local_id_; 122 local_id = next_local_id_;
112 next_local_id_++; 123 next_local_id_++;
113 124
114 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid 125 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
115 // some expensive reference count increment/decrements.) Once this is done, 126 // some expensive reference count increment/decrements.) Once this is done,
(...skipping 28 matching lines...) Expand all
144 } 155 }
145 return MessageInTransit::kInvalidEndpointId; 156 return MessageInTransit::kInvalidEndpointId;
146 } 157 }
147 158
148 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, 159 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
149 MessageInTransit::EndpointId remote_id) { 160 MessageInTransit::EndpointId remote_id) {
150 EndpointInfo endpoint_info; 161 EndpointInfo endpoint_info;
151 { 162 {
152 base::AutoLock locker(lock_); 163 base::AutoLock locker(lock_);
153 164
165 DLOG_IF(WARNING, is_shutting_down_)
166 << "RunMessagePipeEndpoint() while shutting down";
167
154 IdToEndpointInfoMap::const_iterator it = 168 IdToEndpointInfoMap::const_iterator it =
155 local_id_to_endpoint_info_map_.find(local_id); 169 local_id_to_endpoint_info_map_.find(local_id);
156 if (it == local_id_to_endpoint_info_map_.end()) 170 if (it == local_id_to_endpoint_info_map_.end())
157 return false; 171 return false;
158 endpoint_info = it->second; 172 endpoint_info = it->second;
159 } 173 }
160 174
161 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| 175 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
162 // and ignore it. 176 // and ignore it.
163 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { 177 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
(...skipping 26 matching lines...) Expand all
190 HandleLocalError(base::StringPrintf( 204 HandleLocalError(base::StringPrintf(
191 "Failed to send message to run remote message pipe endpoint (local ID " 205 "Failed to send message to run remote message pipe endpoint (local ID "
192 "%u, remote ID %u)", 206 "%u, remote ID %u)",
193 static_cast<unsigned>(local_id), 207 static_cast<unsigned>(local_id),
194 static_cast<unsigned>(remote_id))); 208 static_cast<unsigned>(remote_id)));
195 } 209 }
196 } 210 }
197 211
198 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { 212 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
199 base::AutoLock locker(lock_); 213 base::AutoLock locker(lock_);
200 if (!is_running_no_lock()) { 214 if (!is_running_) {
201 // TODO(vtl): I think this is probably not an error condition, but I should 215 // TODO(vtl): I think this is probably not an error condition, but I should
202 // think about it (and the shutdown sequence) more carefully. 216 // think about it (and the shutdown sequence) more carefully.
203 LOG(WARNING) << "WriteMessage() after shutdown"; 217 LOG(WARNING) << "WriteMessage() after shutdown";
204 return false; 218 return false;
205 } 219 }
206 220
221 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
207 return raw_channel_->WriteMessage(message.Pass()); 222 return raw_channel_->WriteMessage(message.Pass());
208 } 223 }
209 224
210 bool Channel::IsWriteBufferEmpty() { 225 bool Channel::IsWriteBufferEmpty() {
211 base::AutoLock locker(lock_); 226 base::AutoLock locker(lock_);
212 if (!is_running_no_lock()) 227 if (!is_running_)
213 return true; 228 return true;
214 return raw_channel_->IsWriteBufferEmpty(); 229 return raw_channel_->IsWriteBufferEmpty();
215 } 230 }
216 231
217 void Channel::DetachMessagePipeEndpoint( 232 void Channel::DetachMessagePipeEndpoint(
218 MessageInTransit::EndpointId local_id, 233 MessageInTransit::EndpointId local_id,
219 MessageInTransit::EndpointId remote_id) { 234 MessageInTransit::EndpointId remote_id) {
220 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 235 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
221 236
222 bool should_send_remove_message = false; 237 bool should_send_remove_message = false;
223 { 238 {
224 base::AutoLock locker_(lock_); 239 base::AutoLock locker_(lock_);
225 if (!is_running_no_lock()) 240 if (!is_running_)
226 return; 241 return;
227 242
228 IdToEndpointInfoMap::iterator it = 243 IdToEndpointInfoMap::iterator it =
229 local_id_to_endpoint_info_map_.find(local_id); 244 local_id_to_endpoint_info_map_.find(local_id);
230 DCHECK(it != local_id_to_endpoint_info_map_.end()); 245 DCHECK(it != local_id_to_endpoint_info_map_.end());
231 246
232 switch (it->second.state) { 247 switch (it->second.state) {
233 case EndpointInfo::STATE_NORMAL: 248 case EndpointInfo::STATE_NORMAL:
234 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; 249 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
235 it->second.message_pipe = NULL; 250 it->second.message_pipe = NULL;
(...skipping 25 matching lines...) Expand all
261 static_cast<unsigned>(remote_id))); 276 static_cast<unsigned>(remote_id)));
262 } 277 }
263 } 278 }
264 279
265 size_t Channel::GetSerializedPlatformHandleSize() const { 280 size_t Channel::GetSerializedPlatformHandleSize() const {
266 return raw_channel_->GetSerializedPlatformHandleSize(); 281 return raw_channel_->GetSerializedPlatformHandleSize();
267 } 282 }
268 283
269 Channel::~Channel() { 284 Channel::~Channel() {
270 // The channel should have been shut down first. 285 // The channel should have been shut down first.
271 DCHECK(!is_running_no_lock()); 286 DCHECK(!is_running_);
272 } 287 }
273 288
274 void Channel::OnReadMessage( 289 void Channel::OnReadMessage(
275 const MessageInTransit::View& message_view, 290 const MessageInTransit::View& message_view,
276 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 291 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
277 switch (message_view.type()) { 292 switch (message_view.type()) {
278 case MessageInTransit::kTypeMessagePipeEndpoint: 293 case MessageInTransit::kTypeMessagePipeEndpoint:
279 case MessageInTransit::kTypeMessagePipe: 294 case MessageInTransit::kTypeMessagePipe:
280 OnReadMessageForDownstream(message_view, platform_handles.Pass()); 295 OnReadMessageForDownstream(message_view, platform_handles.Pass());
281 break; 296 break;
282 case MessageInTransit::kTypeChannel: 297 case MessageInTransit::kTypeChannel:
283 OnReadMessageForChannel(message_view, platform_handles.Pass()); 298 OnReadMessageForChannel(message_view, platform_handles.Pass());
284 break; 299 break;
285 default: 300 default:
286 HandleRemoteError( 301 HandleRemoteError(
287 base::StringPrintf("Received message of invalid type %u", 302 base::StringPrintf("Received message of invalid type %u",
288 static_cast<unsigned>(message_view.type()))); 303 static_cast<unsigned>(message_view.type())));
289 break; 304 break;
290 } 305 }
291 } 306 }
292 307
293 void Channel::OnError(Error error) { 308 void Channel::OnError(Error error) {
294 switch (error) { 309 switch (error) {
295 case ERROR_READ_SHUTDOWN: 310 case ERROR_READ_SHUTDOWN:
296 // The other side was cleanly closed, so this isn't actually an error. 311 // The other side was cleanly closed, so this isn't actually an error.
297 DVLOG(1) << "RawChannel read error (shutdown)"; 312 DVLOG(1) << "RawChannel read error (shutdown)";
298 break; 313 break;
299 case ERROR_READ_BROKEN: 314 case ERROR_READ_BROKEN: {
300 LOG(ERROR) << "RawChannel read error (connection broken)"; 315 base::AutoLock locker(lock_);
316 LOG_IF(ERROR, !is_shutting_down_)
317 << "RawChannel read error (connection broken)";
301 break; 318 break;
319 }
302 case ERROR_READ_BAD_MESSAGE: 320 case ERROR_READ_BAD_MESSAGE:
303 // Receiving a bad message means either a bug, data corruption, or 321 // Receiving a bad message means either a bug, data corruption, or
304 // malicious attack (probably due to some other bug). 322 // malicious attack (probably due to some other bug).
305 LOG(ERROR) << "RawChannel read error (received bad message)"; 323 LOG(ERROR) << "RawChannel read error (received bad message)";
306 break; 324 break;
307 case ERROR_READ_UNKNOWN: 325 case ERROR_READ_UNKNOWN:
308 LOG(ERROR) << "RawChannel read error (unknown)"; 326 LOG(ERROR) << "RawChannel read error (unknown)";
309 break; 327 break;
310 case ERROR_WRITE: 328 case ERROR_WRITE:
311 // Write errors are slightly notable: they probably shouldn't happen under 329 // Write errors are slightly notable: they probably shouldn't happen under
(...skipping 16 matching lines...) Expand all
328 return; 346 return;
329 } 347 }
330 348
331 EndpointInfo endpoint_info; 349 EndpointInfo endpoint_info;
332 { 350 {
333 base::AutoLock locker(lock_); 351 base::AutoLock locker(lock_);
334 352
335 // Since we own |raw_channel_|, and this method and |Shutdown()| should only 353 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
336 // be called from the creation thread, |raw_channel_| should never be null 354 // be called from the creation thread, |raw_channel_| should never be null
337 // here. 355 // here.
338 DCHECK(is_running_no_lock()); 356 DCHECK(is_running_);
339 357
340 IdToEndpointInfoMap::const_iterator it = 358 IdToEndpointInfoMap::const_iterator it =
341 local_id_to_endpoint_info_map_.find(local_id); 359 local_id_to_endpoint_info_map_.find(local_id);
342 if (it == local_id_to_endpoint_info_map_.end()) { 360 if (it == local_id_to_endpoint_info_map_.end()) {
343 HandleRemoteError(base::StringPrintf( 361 HandleRemoteError(base::StringPrintf(
344 "Received a message for nonexistent local destination ID %u", 362 "Received a message for nonexistent local destination ID %u",
345 static_cast<unsigned>(local_id))); 363 static_cast<unsigned>(local_id)));
346 // This is strongly indicative of some problem. However, it's not a fatal 364 // This is strongly indicative of some problem. However, it's not a fatal
347 // error, since it may indicate a bug (or hostile) remote process. Don't 365 // error, since it may indicate a bug (or hostile) remote process. Don't
348 // die even for Debug builds, since handling this properly needs to be 366 // die even for Debug builds, since handling this properly needs to be
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
505 // TODO(vtl): Is this how we really want to handle this? 523 // TODO(vtl): Is this how we really want to handle this?
506 // Sometimes we'll want to propagate the error back to the message pipe 524 // Sometimes we'll want to propagate the error back to the message pipe
507 // (endpoint), and notify it that the remote is (effectively) closed. 525 // (endpoint), and notify it that the remote is (effectively) closed.
508 // Sometimes we'll want to kill the channel (and notify all the endpoints that 526 // Sometimes we'll want to kill the channel (and notify all the endpoints that
509 // their remotes are dead. 527 // their remotes are dead.
510 LOG(WARNING) << error_message; 528 LOG(WARNING) << error_message;
511 } 529 }
512 530
513 } // namespace system 531 } // namespace system
514 } // namespace mojo 532 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/channel.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698