Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(347)

Side by Side Diff: mojo/system/channel.cc

Issue 541233002: Mojo: Factor Channel::EndpointInfo out to ChannelEndpoint. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: stupid msvs Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/channel.h ('k') | mojo/system/channel_endpoint.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/channel.h" 5 #include "mojo/system/channel.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h" 13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h" 14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h" 15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h" 16 #include "mojo/system/transport_data.h"
17 17
18 namespace mojo { 18 namespace mojo {
19 namespace system { 19 namespace system {
20 20
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId != 21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
22 MessageInTransit::kInvalidEndpointId, 22 MessageInTransit::kInvalidEndpointId,
23 kBootstrapEndpointId_is_invalid); 23 kBootstrapEndpointId_is_invalid);
24 24
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId 25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId; 26 Channel::kBootstrapEndpointId;
27 27
28 Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() {
29 }
30
31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
32 unsigned port)
33 : state(STATE_NORMAL), message_pipe(message_pipe), port(port) {
34 }
35
36 Channel::EndpointInfo::~EndpointInfo() {
37 }
38
39 Channel::Channel(embedder::PlatformSupport* platform_support) 28 Channel::Channel(embedder::PlatformSupport* platform_support)
40 : platform_support_(platform_support), 29 : platform_support_(platform_support),
41 is_running_(false), 30 is_running_(false),
42 is_shutting_down_(false), 31 is_shutting_down_(false),
43 next_local_id_(kBootstrapEndpointId) { 32 next_local_id_(kBootstrapEndpointId) {
44 } 33 }
45 34
46 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { 35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
47 DCHECK(creation_thread_checker_.CalledOnValidThread()); 36 DCHECK(creation_thread_checker_.CalledOnValidThread());
48 DCHECK(raw_channel); 37 DCHECK(raw_channel);
49 38
50 // No need to take |lock_|, since this must be called before this object 39 // No need to take |lock_|, since this must be called before this object
51 // becomes thread-safe. 40 // becomes thread-safe.
52 DCHECK(!is_running_); 41 DCHECK(!is_running_);
53 raw_channel_ = raw_channel.Pass(); 42 raw_channel_ = raw_channel.Pass();
54 43
55 if (!raw_channel_->Init(this)) { 44 if (!raw_channel_->Init(this)) {
56 raw_channel_.reset(); 45 raw_channel_.reset();
57 return false; 46 return false;
58 } 47 }
59 48
60 is_running_ = true; 49 is_running_ = true;
61 return true; 50 return true;
62 } 51 }
63 52
64 void Channel::Shutdown() { 53 void Channel::Shutdown() {
65 DCHECK(creation_thread_checker_.CalledOnValidThread()); 54 DCHECK(creation_thread_checker_.CalledOnValidThread());
66 55
67 IdToEndpointInfoMap to_destroy; 56 IdToEndpointMap to_destroy;
68 { 57 {
69 base::AutoLock locker(lock_); 58 base::AutoLock locker(lock_);
70 if (!is_running_) 59 if (!is_running_)
71 return; 60 return;
72 61
73 // Note: Don't reset |raw_channel_|, in case we're being called from within 62 // Note: Don't reset |raw_channel_|, in case we're being called from within
74 // |OnReadMessage()| or |OnError()|. 63 // |OnReadMessage()| or |OnError()|.
75 raw_channel_->Shutdown(); 64 raw_channel_->Shutdown();
76 is_running_ = false; 65 is_running_ = false;
77 66
78 // We need to deal with it outside the lock. 67 // We need to deal with it outside the lock.
79 std::swap(to_destroy, local_id_to_endpoint_info_map_); 68 std::swap(to_destroy, local_id_to_endpoint_map_);
80 } 69 }
81 70
82 size_t num_live = 0; 71 size_t num_live = 0;
83 size_t num_zombies = 0; 72 size_t num_zombies = 0;
84 for (IdToEndpointInfoMap::iterator it = to_destroy.begin(); 73 for (IdToEndpointMap::iterator it = to_destroy.begin();
85 it != to_destroy.end(); 74 it != to_destroy.end();
86 ++it) { 75 ++it) {
87 if (it->second.state == EndpointInfo::STATE_NORMAL) { 76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
88 it->second.message_pipe->OnRemove(it->second.port); 77 it->second->message_pipe_->OnRemove(it->second->port_);
89 num_live++; 78 num_live++;
90 } else { 79 } else {
91 DCHECK(!it->second.message_pipe.get()); 80 DCHECK(!it->second->message_pipe_.get());
92 num_zombies++; 81 num_zombies++;
93 } 82 }
94 } 83 }
95 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live 84 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
96 << " live endpoints and " << num_zombies 85 << " live endpoints and " << num_zombies
97 << " zombies"; 86 << " zombies";
98 } 87 }
99 88
100 void Channel::WillShutdownSoon() { 89 void Channel::WillShutdownSoon() {
101 base::AutoLock locker(lock_); 90 base::AutoLock locker(lock_);
102 is_shutting_down_ = true; 91 is_shutting_down_ = true;
103 } 92 }
104 93
105 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( 94 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
106 scoped_refptr<MessagePipe> message_pipe, 95 scoped_refptr<MessagePipe> message_pipe,
107 unsigned port) { 96 unsigned port) {
108 DCHECK(message_pipe.get()); 97 DCHECK(message_pipe.get());
109 DCHECK(port == 0 || port == 1); 98 DCHECK(port == 0 || port == 1);
110 99
111 MessageInTransit::EndpointId local_id; 100 MessageInTransit::EndpointId local_id;
112 { 101 {
113 base::AutoLock locker(lock_); 102 base::AutoLock locker(lock_);
114 103
115 DLOG_IF(WARNING, is_shutting_down_) 104 DLOG_IF(WARNING, is_shutting_down_)
116 << "AttachMessagePipeEndpoint() while shutting down"; 105 << "AttachMessagePipeEndpoint() while shutting down";
117 106
118 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || 107 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
119 local_id_to_endpoint_info_map_.find(next_local_id_) != 108 local_id_to_endpoint_map_.find(next_local_id_) !=
120 local_id_to_endpoint_info_map_.end()) 109 local_id_to_endpoint_map_.end())
121 next_local_id_++; 110 next_local_id_++;
122 111
123 local_id = next_local_id_; 112 local_id = next_local_id_;
124 next_local_id_++; 113 next_local_id_++;
125 114 local_id_to_endpoint_map_[local_id] =
126 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid 115 new ChannelEndpoint(message_pipe, port);
127 // some expensive reference count increment/decrements.) Once this is done,
128 // we should be able to delete |EndpointInfo|'s default constructor.
129 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
130 } 116 }
131 117
132 // This might fail if that port got an |OnPeerClose()| before attaching. 118 // This might fail if that port got an |OnPeerClose()| before attaching.
133 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) 119 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
134 return local_id; 120 return local_id;
135 121
136 // Note: If it failed, quite possibly the endpoint info was removed from that 122 // Note: If it failed, quite possibly the endpoint info was removed from that
137 // map (there's a race between us adding it to the map above and calling 123 // map (there's a race between us adding it to the map above and calling
138 // |Attach()|). And even if an entry exists for |local_id|, we need to check 124 // |Attach()|). And even if an entry exists for |local_id|, we need to check
139 // that it's the one we added (and not some other one that was added since). 125 // that it's the one we added (and not some other one that was added since).
140 { 126 {
141 base::AutoLock locker(lock_); 127 base::AutoLock locker(lock_);
142 IdToEndpointInfoMap::iterator it = 128 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
143 local_id_to_endpoint_info_map_.find(local_id); 129 if (it != local_id_to_endpoint_map_.end() &&
144 if (it != local_id_to_endpoint_info_map_.end() && 130 it->second->message_pipe_.get() == message_pipe.get() &&
145 it->second.message_pipe.get() == message_pipe.get() && 131 it->second->port_ == port) {
146 it->second.port == port) { 132 DCHECK_EQ(it->second->state_, ChannelEndpoint::STATE_NORMAL);
147 DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
148 // TODO(vtl): FIXME -- This is wrong. We need to specify (to 133 // TODO(vtl): FIXME -- This is wrong. We need to specify (to
149 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling 134 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
150 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a 135 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
151 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to 136 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
152 // run, then we'll get messages to an "invalid" local ID (for running, for 137 // run, then we'll get messages to an "invalid" local ID (for running, for
153 // removal). 138 // removal).
154 local_id_to_endpoint_info_map_.erase(it); 139 local_id_to_endpoint_map_.erase(it);
155 } 140 }
156 } 141 }
157 return MessageInTransit::kInvalidEndpointId; 142 return MessageInTransit::kInvalidEndpointId;
158 } 143 }
159 144
160 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, 145 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
161 MessageInTransit::EndpointId remote_id) { 146 MessageInTransit::EndpointId remote_id) {
162 EndpointInfo endpoint_info; 147 ChannelEndpoint::State state;
148 scoped_refptr<MessagePipe> message_pipe;
149 unsigned port;
163 { 150 {
164 base::AutoLock locker(lock_); 151 base::AutoLock locker(lock_);
165 152
166 DLOG_IF(WARNING, is_shutting_down_) 153 DLOG_IF(WARNING, is_shutting_down_)
167 << "RunMessagePipeEndpoint() while shutting down"; 154 << "RunMessagePipeEndpoint() while shutting down";
168 155
169 IdToEndpointInfoMap::const_iterator it = 156 IdToEndpointMap::const_iterator it =
170 local_id_to_endpoint_info_map_.find(local_id); 157 local_id_to_endpoint_map_.find(local_id);
171 if (it == local_id_to_endpoint_info_map_.end()) 158 if (it == local_id_to_endpoint_map_.end())
172 return false; 159 return false;
173 endpoint_info = it->second; 160 state = it->second->state_;
161 message_pipe = it->second->message_pipe_;
162 port = it->second->port_;
174 } 163 }
175 164
176 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| 165 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
177 // and ignore it. 166 // and ignore it.
178 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { 167 if (state != ChannelEndpoint::STATE_NORMAL) {
179 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " 168 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
180 "(local ID " << local_id << ", remote ID " << remote_id << ")"; 169 "(local ID " << local_id << ", remote ID " << remote_id << ")";
181 return true; 170 return true;
182 } 171 }
183 172
184 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already 173 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
185 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). 174 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
186 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); 175 message_pipe->Run(port, remote_id);
187 return true; 176 return true;
188 } 177 }
189 178
190 void Channel::RunRemoteMessagePipeEndpoint( 179 void Channel::RunRemoteMessagePipeEndpoint(
191 MessageInTransit::EndpointId local_id, 180 MessageInTransit::EndpointId local_id,
192 MessageInTransit::EndpointId remote_id) { 181 MessageInTransit::EndpointId remote_id) {
193 #if DCHECK_IS_ON 182 #if DCHECK_IS_ON
194 { 183 {
195 base::AutoLock locker(lock_); 184 base::AutoLock locker(lock_);
196 DCHECK(local_id_to_endpoint_info_map_.find(local_id) != 185 DCHECK(local_id_to_endpoint_map_.find(local_id) !=
197 local_id_to_endpoint_info_map_.end()); 186 local_id_to_endpoint_map_.end());
198 } 187 }
199 #endif 188 #endif
200 189
201 if (!SendControlMessage( 190 if (!SendControlMessage(
202 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, 191 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
203 local_id, 192 local_id,
204 remote_id)) { 193 remote_id)) {
205 HandleLocalError(base::StringPrintf( 194 HandleLocalError(base::StringPrintf(
206 "Failed to send message to run remote message pipe endpoint (local ID " 195 "Failed to send message to run remote message pipe endpoint (local ID "
207 "%u, remote ID %u)", 196 "%u, remote ID %u)",
(...skipping 26 matching lines...) Expand all
234 MessageInTransit::EndpointId local_id, 223 MessageInTransit::EndpointId local_id,
235 MessageInTransit::EndpointId remote_id) { 224 MessageInTransit::EndpointId remote_id) {
236 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 225 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
237 226
238 bool should_send_remove_message = false; 227 bool should_send_remove_message = false;
239 { 228 {
240 base::AutoLock locker_(lock_); 229 base::AutoLock locker_(lock_);
241 if (!is_running_) 230 if (!is_running_)
242 return; 231 return;
243 232
244 IdToEndpointInfoMap::iterator it = 233 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
245 local_id_to_endpoint_info_map_.find(local_id); 234 DCHECK(it != local_id_to_endpoint_map_.end());
246 DCHECK(it != local_id_to_endpoint_info_map_.end());
247 235
248 switch (it->second.state) { 236 switch (it->second->state_) {
249 case EndpointInfo::STATE_NORMAL: 237 case ChannelEndpoint::STATE_NORMAL:
250 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; 238 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
251 it->second.message_pipe = NULL; 239 it->second->message_pipe_ = NULL;
252 should_send_remove_message = 240 should_send_remove_message =
253 (remote_id != MessageInTransit::kInvalidEndpointId); 241 (remote_id != MessageInTransit::kInvalidEndpointId);
254 break; 242 break;
255 case EndpointInfo::STATE_WAIT_LOCAL_DETACH: 243 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
256 local_id_to_endpoint_info_map_.erase(it); 244 local_id_to_endpoint_map_.erase(it);
257 break; 245 break;
258 case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: 246 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
259 NOTREACHED(); 247 NOTREACHED();
260 break; 248 break;
261 } 249 }
262 } 250 }
263 if (!should_send_remove_message) 251 if (!should_send_remove_message)
264 return; 252 return;
265 253
266 if (!SendControlMessage( 254 if (!SendControlMessage(
267 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, 255 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
268 local_id, 256 local_id,
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
337 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 325 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
338 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || 326 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
339 message_view.type() == MessageInTransit::kTypeMessagePipe); 327 message_view.type() == MessageInTransit::kTypeMessagePipe);
340 328
341 MessageInTransit::EndpointId local_id = message_view.destination_id(); 329 MessageInTransit::EndpointId local_id = message_view.destination_id();
342 if (local_id == MessageInTransit::kInvalidEndpointId) { 330 if (local_id == MessageInTransit::kInvalidEndpointId) {
343 HandleRemoteError("Received message with no destination ID"); 331 HandleRemoteError("Received message with no destination ID");
344 return; 332 return;
345 } 333 }
346 334
347 EndpointInfo endpoint_info; 335 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
336 scoped_refptr<MessagePipe> message_pipe;
337 unsigned port = ~0u;
348 bool nonexistent_local_id_error = false; 338 bool nonexistent_local_id_error = false;
349 { 339 {
350 base::AutoLock locker(lock_); 340 base::AutoLock locker(lock_);
351 341
352 // Since we own |raw_channel_|, and this method and |Shutdown()| should only 342 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
353 // be called from the creation thread, |raw_channel_| should never be null 343 // be called from the creation thread, |raw_channel_| should never be null
354 // here. 344 // here.
355 DCHECK(is_running_); 345 DCHECK(is_running_);
356 346
357 IdToEndpointInfoMap::const_iterator it = 347 IdToEndpointMap::const_iterator it =
358 local_id_to_endpoint_info_map_.find(local_id); 348 local_id_to_endpoint_map_.find(local_id);
359 if (it == local_id_to_endpoint_info_map_.end()) 349 if (it == local_id_to_endpoint_map_.end()) {
360 nonexistent_local_id_error = true; 350 nonexistent_local_id_error = true;
361 else 351 } else {
362 endpoint_info = it->second; 352 state = it->second->state_;
353 message_pipe = it->second->message_pipe_;
354 port = it->second->port_;
355 }
363 } 356 }
364 if (nonexistent_local_id_error) { 357 if (nonexistent_local_id_error) {
365 HandleRemoteError(base::StringPrintf( 358 HandleRemoteError(base::StringPrintf(
366 "Received a message for nonexistent local destination ID %u", 359 "Received a message for nonexistent local destination ID %u",
367 static_cast<unsigned>(local_id))); 360 static_cast<unsigned>(local_id)));
368 // This is strongly indicative of some problem. However, it's not a fatal 361 // This is strongly indicative of some problem. However, it's not a fatal
369 // error, since it may indicate a buggy (or hostile) remote process. Don't 362 // error, since it may indicate a buggy (or hostile) remote process. Don't
370 // die even for Debug builds, since handling this properly needs to be 363 // die even for Debug builds, since handling this properly needs to be
371 // tested (TODO(vtl)). 364 // tested (TODO(vtl)).
372 DLOG(ERROR) << "This should not happen under normal operation."; 365 DLOG(ERROR) << "This should not happen under normal operation.";
373 return; 366 return;
374 } 367 }
375 368
376 // Ignore messages for zombie endpoints (not an error). 369 // Ignore messages for zombie endpoints (not an error).
377 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { 370 if (state != ChannelEndpoint::STATE_NORMAL) {
378 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " 371 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
379 << local_id << ", remote ID = " << message_view.source_id() << ")"; 372 << local_id << ", remote ID = " << message_view.source_id() << ")";
380 return; 373 return;
381 } 374 }
382 375
383 // We need to duplicate the message (data), because |EnqueueMessage()| will 376 // We need to duplicate the message (data), because |EnqueueMessage()| will
384 // take ownership of it. 377 // take ownership of it.
385 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 378 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
386 if (message_view.transport_data_buffer_size() > 0) { 379 if (message_view.transport_data_buffer_size() > 0) {
387 DCHECK(message_view.transport_data_buffer()); 380 DCHECK(message_view.transport_data_buffer());
388 message->SetDispatchers(TransportData::DeserializeDispatchers( 381 message->SetDispatchers(TransportData::DeserializeDispatchers(
389 message_view.transport_data_buffer(), 382 message_view.transport_data_buffer(),
390 message_view.transport_data_buffer_size(), 383 message_view.transport_data_buffer_size(),
391 platform_handles.Pass(), 384 platform_handles.Pass(),
392 this)); 385 this));
393 } 386 }
394 MojoResult result = endpoint_info.message_pipe->EnqueueMessage( 387 MojoResult result = message_pipe->EnqueueMessage(
395 MessagePipe::GetPeerPort(endpoint_info.port), message.Pass()); 388 MessagePipe::GetPeerPort(port), message.Pass());
396 if (result != MOJO_RESULT_OK) { 389 if (result != MOJO_RESULT_OK) {
397 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint 390 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
398 // has been closed (in an unavoidable race). This might also be a "remote" 391 // has been closed (in an unavoidable race). This might also be a "remote"
399 // error, e.g., if the remote side is sending invalid control messages (to 392 // error, e.g., if the remote side is sending invalid control messages (to
400 // the message pipe). 393 // the message pipe).
401 HandleLocalError(base::StringPrintf( 394 HandleLocalError(base::StringPrintf(
402 "Failed to enqueue message to local ID %u (result %d)", 395 "Failed to enqueue message to local ID %u (result %d)",
403 static_cast<unsigned>(local_id), 396 static_cast<unsigned>(local_id),
404 static_cast<int>(result))); 397 static_cast<int>(result)));
405 return; 398 return;
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
453 default: 446 default:
454 HandleRemoteError("Received invalid channel message"); 447 HandleRemoteError("Received invalid channel message");
455 NOTREACHED(); 448 NOTREACHED();
456 break; 449 break;
457 } 450 }
458 } 451 }
459 452
460 bool Channel::RemoveMessagePipeEndpoint( 453 bool Channel::RemoveMessagePipeEndpoint(
461 MessageInTransit::EndpointId local_id, 454 MessageInTransit::EndpointId local_id,
462 MessageInTransit::EndpointId remote_id) { 455 MessageInTransit::EndpointId remote_id) {
463 EndpointInfo endpoint_info; 456 scoped_refptr<MessagePipe> message_pipe;
457 unsigned port;
464 { 458 {
465 base::AutoLock locker(lock_); 459 base::AutoLock locker(lock_);
466 460
467 IdToEndpointInfoMap::iterator it = 461 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
468 local_id_to_endpoint_info_map_.find(local_id); 462 if (it == local_id_to_endpoint_map_.end()) {
469 if (it == local_id_to_endpoint_info_map_.end()) {
470 DVLOG(2) << "Remove message pipe error: not found"; 463 DVLOG(2) << "Remove message pipe error: not found";
471 return false; 464 return false;
472 } 465 }
473 466
474 // If it's waiting for the remove ack, just do it and return. 467 // If it's waiting for the remove ack, just do it and return.
475 if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { 468 if (it->second->state_ == ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK) {
476 local_id_to_endpoint_info_map_.erase(it); 469 local_id_to_endpoint_map_.erase(it);
477 return true; 470 return true;
478 } 471 }
479 472
480 if (it->second.state != EndpointInfo::STATE_NORMAL) { 473 if (it->second->state_ != ChannelEndpoint::STATE_NORMAL) {
481 DVLOG(2) << "Remove message pipe error: wrong state"; 474 DVLOG(2) << "Remove message pipe error: wrong state";
482 return false; 475 return false;
483 } 476 }
484 477
485 it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; 478 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
486 endpoint_info = it->second; 479 message_pipe = it->second->message_pipe_;
487 it->second.message_pipe = NULL; 480 port = it->second->port_;
481 it->second->message_pipe_ = NULL;
488 } 482 }
489 483
490 if (!SendControlMessage( 484 if (!SendControlMessage(
491 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, 485 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
492 local_id, 486 local_id,
493 remote_id)) { 487 remote_id)) {
494 HandleLocalError(base::StringPrintf( 488 HandleLocalError(base::StringPrintf(
495 "Failed to send message to remove remote message pipe endpoint ack " 489 "Failed to send message to remove remote message pipe endpoint ack "
496 "(local ID %u, remote ID %u)", 490 "(local ID %u, remote ID %u)",
497 static_cast<unsigned>(local_id), 491 static_cast<unsigned>(local_id),
498 static_cast<unsigned>(remote_id))); 492 static_cast<unsigned>(remote_id)));
499 } 493 }
500 494
501 endpoint_info.message_pipe->OnRemove(endpoint_info.port); 495 message_pipe->OnRemove(port);
502 496
503 return true; 497 return true;
504 } 498 }
505 499
506 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, 500 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
507 MessageInTransit::EndpointId local_id, 501 MessageInTransit::EndpointId local_id,
508 MessageInTransit::EndpointId remote_id) { 502 MessageInTransit::EndpointId remote_id) {
509 DVLOG(2) << "Sending channel control message: subtype " << subtype 503 DVLOG(2) << "Sending channel control message: subtype " << subtype
510 << ", local ID " << local_id << ", remote ID " << remote_id; 504 << ", local ID " << local_id << ", remote ID " << remote_id;
511 scoped_ptr<MessageInTransit> message( 505 scoped_ptr<MessageInTransit> message(
(...skipping 13 matching lines...) Expand all
525 // TODO(vtl): Is this how we really want to handle this? 519 // TODO(vtl): Is this how we really want to handle this?
526 // Sometimes we'll want to propagate the error back to the message pipe 520 // Sometimes we'll want to propagate the error back to the message pipe
527 // (endpoint), and notify it that the remote is (effectively) closed. 521 // (endpoint), and notify it that the remote is (effectively) closed.
528 // Sometimes we'll want to kill the channel (and notify all the endpoints that 522 // Sometimes we'll want to kill the channel (and notify all the endpoints that
529 // their remotes are dead. 523 // their remotes are dead.
530 LOG(WARNING) << error_message; 524 LOG(WARNING) << error_message;
531 } 525 }
532 526
533 } // namespace system 527 } // namespace system
534 } // namespace mojo 528 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/channel.h ('k') | mojo/system/channel_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698