OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/channel.h" | 5 #include "mojo/system/channel.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/compiler_specific.h" | 10 #include "base/compiler_specific.h" |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/macros.h" | 12 #include "base/macros.h" |
13 #include "base/strings/stringprintf.h" | 13 #include "base/strings/stringprintf.h" |
14 #include "mojo/embedder/platform_handle_vector.h" | 14 #include "mojo/embedder/platform_handle_vector.h" |
15 #include "mojo/system/message_pipe_endpoint.h" | 15 #include "mojo/system/message_pipe_endpoint.h" |
16 #include "mojo/system/transport_data.h" | 16 #include "mojo/system/transport_data.h" |
17 | 17 |
18 namespace mojo { | 18 namespace mojo { |
19 namespace system { | 19 namespace system { |
20 | 20 |
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId != | 21 COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
22 MessageInTransit::kInvalidEndpointId, | 22 MessageInTransit::kInvalidEndpointId, |
23 kBootstrapEndpointId_is_invalid); | 23 kBootstrapEndpointId_is_invalid); |
24 | 24 |
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId | 25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
26 Channel::kBootstrapEndpointId; | 26 Channel::kBootstrapEndpointId; |
27 | 27 |
28 Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() { | |
29 } | |
30 | |
31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, | |
32 unsigned port) | |
33 : state(STATE_NORMAL), message_pipe(message_pipe), port(port) { | |
34 } | |
35 | |
36 Channel::EndpointInfo::~EndpointInfo() { | |
37 } | |
38 | |
39 Channel::Channel(embedder::PlatformSupport* platform_support) | 28 Channel::Channel(embedder::PlatformSupport* platform_support) |
40 : platform_support_(platform_support), | 29 : platform_support_(platform_support), |
41 is_running_(false), | 30 is_running_(false), |
42 is_shutting_down_(false), | 31 is_shutting_down_(false), |
43 next_local_id_(kBootstrapEndpointId) { | 32 next_local_id_(kBootstrapEndpointId) { |
44 } | 33 } |
45 | 34 |
46 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { | 35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { |
47 DCHECK(creation_thread_checker_.CalledOnValidThread()); | 36 DCHECK(creation_thread_checker_.CalledOnValidThread()); |
48 DCHECK(raw_channel); | 37 DCHECK(raw_channel); |
49 | 38 |
50 // No need to take |lock_|, since this must be called before this object | 39 // No need to take |lock_|, since this must be called before this object |
51 // becomes thread-safe. | 40 // becomes thread-safe. |
52 DCHECK(!is_running_); | 41 DCHECK(!is_running_); |
53 raw_channel_ = raw_channel.Pass(); | 42 raw_channel_ = raw_channel.Pass(); |
54 | 43 |
55 if (!raw_channel_->Init(this)) { | 44 if (!raw_channel_->Init(this)) { |
56 raw_channel_.reset(); | 45 raw_channel_.reset(); |
57 return false; | 46 return false; |
58 } | 47 } |
59 | 48 |
60 is_running_ = true; | 49 is_running_ = true; |
61 return true; | 50 return true; |
62 } | 51 } |
63 | 52 |
64 void Channel::Shutdown() { | 53 void Channel::Shutdown() { |
65 DCHECK(creation_thread_checker_.CalledOnValidThread()); | 54 DCHECK(creation_thread_checker_.CalledOnValidThread()); |
66 | 55 |
67 IdToEndpointInfoMap to_destroy; | 56 IdToEndpointMap to_destroy; |
68 { | 57 { |
69 base::AutoLock locker(lock_); | 58 base::AutoLock locker(lock_); |
70 if (!is_running_) | 59 if (!is_running_) |
71 return; | 60 return; |
72 | 61 |
73 // Note: Don't reset |raw_channel_|, in case we're being called from within | 62 // Note: Don't reset |raw_channel_|, in case we're being called from within |
74 // |OnReadMessage()| or |OnError()|. | 63 // |OnReadMessage()| or |OnError()|. |
75 raw_channel_->Shutdown(); | 64 raw_channel_->Shutdown(); |
76 is_running_ = false; | 65 is_running_ = false; |
77 | 66 |
78 // We need to deal with it outside the lock. | 67 // We need to deal with it outside the lock. |
79 std::swap(to_destroy, local_id_to_endpoint_info_map_); | 68 std::swap(to_destroy, local_id_to_endpoint_map_); |
80 } | 69 } |
81 | 70 |
82 size_t num_live = 0; | 71 size_t num_live = 0; |
83 size_t num_zombies = 0; | 72 size_t num_zombies = 0; |
84 for (IdToEndpointInfoMap::iterator it = to_destroy.begin(); | 73 for (IdToEndpointMap::iterator it = to_destroy.begin(); |
85 it != to_destroy.end(); | 74 it != to_destroy.end(); |
86 ++it) { | 75 ++it) { |
87 if (it->second.state == EndpointInfo::STATE_NORMAL) { | 76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) { |
88 it->second.message_pipe->OnRemove(it->second.port); | 77 it->second->message_pipe_->OnRemove(it->second->port_); |
89 num_live++; | 78 num_live++; |
90 } else { | 79 } else { |
91 DCHECK(!it->second.message_pipe.get()); | 80 DCHECK(!it->second->message_pipe_.get()); |
92 num_zombies++; | 81 num_zombies++; |
93 } | 82 } |
94 } | 83 } |
95 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live | 84 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live |
96 << " live endpoints and " << num_zombies | 85 << " live endpoints and " << num_zombies |
97 << " zombies"; | 86 << " zombies"; |
98 } | 87 } |
99 | 88 |
100 void Channel::WillShutdownSoon() { | 89 void Channel::WillShutdownSoon() { |
101 base::AutoLock locker(lock_); | 90 base::AutoLock locker(lock_); |
102 is_shutting_down_ = true; | 91 is_shutting_down_ = true; |
103 } | 92 } |
104 | 93 |
105 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( | 94 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
106 scoped_refptr<MessagePipe> message_pipe, | 95 scoped_refptr<MessagePipe> message_pipe, |
107 unsigned port) { | 96 unsigned port) { |
108 DCHECK(message_pipe.get()); | 97 DCHECK(message_pipe.get()); |
109 DCHECK(port == 0 || port == 1); | 98 DCHECK(port == 0 || port == 1); |
110 | 99 |
111 MessageInTransit::EndpointId local_id; | 100 MessageInTransit::EndpointId local_id; |
112 { | 101 { |
113 base::AutoLock locker(lock_); | 102 base::AutoLock locker(lock_); |
114 | 103 |
115 DLOG_IF(WARNING, is_shutting_down_) | 104 DLOG_IF(WARNING, is_shutting_down_) |
116 << "AttachMessagePipeEndpoint() while shutting down"; | 105 << "AttachMessagePipeEndpoint() while shutting down"; |
117 | 106 |
118 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || | 107 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || |
119 local_id_to_endpoint_info_map_.find(next_local_id_) != | 108 local_id_to_endpoint_map_.find(next_local_id_) != |
120 local_id_to_endpoint_info_map_.end()) | 109 local_id_to_endpoint_map_.end()) |
121 next_local_id_++; | 110 next_local_id_++; |
122 | 111 |
123 local_id = next_local_id_; | 112 local_id = next_local_id_; |
124 next_local_id_++; | 113 next_local_id_++; |
125 | 114 local_id_to_endpoint_map_[local_id] = |
126 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid | 115 new ChannelEndpoint(message_pipe, port); |
127 // some expensive reference count increment/decrements.) Once this is done, | |
128 // we should be able to delete |EndpointInfo|'s default constructor. | |
129 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); | |
130 } | 116 } |
131 | 117 |
132 // This might fail if that port got an |OnPeerClose()| before attaching. | 118 // This might fail if that port got an |OnPeerClose()| before attaching. |
133 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) | 119 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) |
134 return local_id; | 120 return local_id; |
135 | 121 |
136 // Note: If it failed, quite possibly the endpoint info was removed from that | 122 // Note: If it failed, quite possibly the endpoint info was removed from that |
137 // map (there's a race between us adding it to the map above and calling | 123 // map (there's a race between us adding it to the map above and calling |
138 // |Attach()|). And even if an entry exists for |local_id|, we need to check | 124 // |Attach()|). And even if an entry exists for |local_id|, we need to check |
139 // that it's the one we added (and not some other one that was added since). | 125 // that it's the one we added (and not some other one that was added since). |
140 { | 126 { |
141 base::AutoLock locker(lock_); | 127 base::AutoLock locker(lock_); |
142 IdToEndpointInfoMap::iterator it = | 128 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); |
143 local_id_to_endpoint_info_map_.find(local_id); | 129 if (it != local_id_to_endpoint_map_.end() && |
144 if (it != local_id_to_endpoint_info_map_.end() && | 130 it->second->message_pipe_.get() == message_pipe.get() && |
145 it->second.message_pipe.get() == message_pipe.get() && | 131 it->second->port_ == port) { |
146 it->second.port == port) { | 132 DCHECK_EQ(it->second->state_, ChannelEndpoint::STATE_NORMAL); |
147 DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL); | |
148 // TODO(vtl): FIXME -- This is wrong. We need to specify (to | 133 // TODO(vtl): FIXME -- This is wrong. We need to specify (to |
149 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling | 134 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling |
150 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a | 135 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a |
151 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to | 136 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to |
152 // run, then we'll get messages to an "invalid" local ID (for running, for | 137 // run, then we'll get messages to an "invalid" local ID (for running, for |
153 // removal). | 138 // removal). |
154 local_id_to_endpoint_info_map_.erase(it); | 139 local_id_to_endpoint_map_.erase(it); |
155 } | 140 } |
156 } | 141 } |
157 return MessageInTransit::kInvalidEndpointId; | 142 return MessageInTransit::kInvalidEndpointId; |
158 } | 143 } |
159 | 144 |
160 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, | 145 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
161 MessageInTransit::EndpointId remote_id) { | 146 MessageInTransit::EndpointId remote_id) { |
162 EndpointInfo endpoint_info; | 147 ChannelEndpoint::State state; |
| 148 scoped_refptr<MessagePipe> message_pipe; |
| 149 unsigned port; |
163 { | 150 { |
164 base::AutoLock locker(lock_); | 151 base::AutoLock locker(lock_); |
165 | 152 |
166 DLOG_IF(WARNING, is_shutting_down_) | 153 DLOG_IF(WARNING, is_shutting_down_) |
167 << "RunMessagePipeEndpoint() while shutting down"; | 154 << "RunMessagePipeEndpoint() while shutting down"; |
168 | 155 |
169 IdToEndpointInfoMap::const_iterator it = | 156 IdToEndpointMap::const_iterator it = |
170 local_id_to_endpoint_info_map_.find(local_id); | 157 local_id_to_endpoint_map_.find(local_id); |
171 if (it == local_id_to_endpoint_info_map_.end()) | 158 if (it == local_id_to_endpoint_map_.end()) |
172 return false; | 159 return false; |
173 endpoint_info = it->second; | 160 state = it->second->state_; |
| 161 message_pipe = it->second->message_pipe_; |
| 162 port = it->second->port_; |
174 } | 163 } |
175 | 164 |
176 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| | 165 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| |
177 // and ignore it. | 166 // and ignore it. |
178 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { | 167 if (state != ChannelEndpoint::STATE_NORMAL) { |
179 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " | 168 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " |
180 "(local ID " << local_id << ", remote ID " << remote_id << ")"; | 169 "(local ID " << local_id << ", remote ID " << remote_id << ")"; |
181 return true; | 170 return true; |
182 } | 171 } |
183 | 172 |
184 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already | 173 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already |
185 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). | 174 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). |
186 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); | 175 message_pipe->Run(port, remote_id); |
187 return true; | 176 return true; |
188 } | 177 } |
189 | 178 |
190 void Channel::RunRemoteMessagePipeEndpoint( | 179 void Channel::RunRemoteMessagePipeEndpoint( |
191 MessageInTransit::EndpointId local_id, | 180 MessageInTransit::EndpointId local_id, |
192 MessageInTransit::EndpointId remote_id) { | 181 MessageInTransit::EndpointId remote_id) { |
193 #if DCHECK_IS_ON | 182 #if DCHECK_IS_ON |
194 { | 183 { |
195 base::AutoLock locker(lock_); | 184 base::AutoLock locker(lock_); |
196 DCHECK(local_id_to_endpoint_info_map_.find(local_id) != | 185 DCHECK(local_id_to_endpoint_map_.find(local_id) != |
197 local_id_to_endpoint_info_map_.end()); | 186 local_id_to_endpoint_map_.end()); |
198 } | 187 } |
199 #endif | 188 #endif |
200 | 189 |
201 if (!SendControlMessage( | 190 if (!SendControlMessage( |
202 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, | 191 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, |
203 local_id, | 192 local_id, |
204 remote_id)) { | 193 remote_id)) { |
205 HandleLocalError(base::StringPrintf( | 194 HandleLocalError(base::StringPrintf( |
206 "Failed to send message to run remote message pipe endpoint (local ID " | 195 "Failed to send message to run remote message pipe endpoint (local ID " |
207 "%u, remote ID %u)", | 196 "%u, remote ID %u)", |
(...skipping 26 matching lines...) Expand all Loading... |
234 MessageInTransit::EndpointId local_id, | 223 MessageInTransit::EndpointId local_id, |
235 MessageInTransit::EndpointId remote_id) { | 224 MessageInTransit::EndpointId remote_id) { |
236 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | 225 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
237 | 226 |
238 bool should_send_remove_message = false; | 227 bool should_send_remove_message = false; |
239 { | 228 { |
240 base::AutoLock locker_(lock_); | 229 base::AutoLock locker_(lock_); |
241 if (!is_running_) | 230 if (!is_running_) |
242 return; | 231 return; |
243 | 232 |
244 IdToEndpointInfoMap::iterator it = | 233 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); |
245 local_id_to_endpoint_info_map_.find(local_id); | 234 DCHECK(it != local_id_to_endpoint_map_.end()); |
246 DCHECK(it != local_id_to_endpoint_info_map_.end()); | |
247 | 235 |
248 switch (it->second.state) { | 236 switch (it->second->state_) { |
249 case EndpointInfo::STATE_NORMAL: | 237 case ChannelEndpoint::STATE_NORMAL: |
250 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; | 238 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK; |
251 it->second.message_pipe = NULL; | 239 it->second->message_pipe_ = NULL; |
252 should_send_remove_message = | 240 should_send_remove_message = |
253 (remote_id != MessageInTransit::kInvalidEndpointId); | 241 (remote_id != MessageInTransit::kInvalidEndpointId); |
254 break; | 242 break; |
255 case EndpointInfo::STATE_WAIT_LOCAL_DETACH: | 243 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH: |
256 local_id_to_endpoint_info_map_.erase(it); | 244 local_id_to_endpoint_map_.erase(it); |
257 break; | 245 break; |
258 case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: | 246 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK: |
259 NOTREACHED(); | 247 NOTREACHED(); |
260 break; | 248 break; |
261 } | 249 } |
262 } | 250 } |
263 if (!should_send_remove_message) | 251 if (!should_send_remove_message) |
264 return; | 252 return; |
265 | 253 |
266 if (!SendControlMessage( | 254 if (!SendControlMessage( |
267 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, | 255 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, |
268 local_id, | 256 local_id, |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
337 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | 325 embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
338 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || | 326 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || |
339 message_view.type() == MessageInTransit::kTypeMessagePipe); | 327 message_view.type() == MessageInTransit::kTypeMessagePipe); |
340 | 328 |
341 MessageInTransit::EndpointId local_id = message_view.destination_id(); | 329 MessageInTransit::EndpointId local_id = message_view.destination_id(); |
342 if (local_id == MessageInTransit::kInvalidEndpointId) { | 330 if (local_id == MessageInTransit::kInvalidEndpointId) { |
343 HandleRemoteError("Received message with no destination ID"); | 331 HandleRemoteError("Received message with no destination ID"); |
344 return; | 332 return; |
345 } | 333 } |
346 | 334 |
347 EndpointInfo endpoint_info; | 335 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL; |
| 336 scoped_refptr<MessagePipe> message_pipe; |
| 337 unsigned port = ~0u; |
348 bool nonexistent_local_id_error = false; | 338 bool nonexistent_local_id_error = false; |
349 { | 339 { |
350 base::AutoLock locker(lock_); | 340 base::AutoLock locker(lock_); |
351 | 341 |
352 // Since we own |raw_channel_|, and this method and |Shutdown()| should only | 342 // Since we own |raw_channel_|, and this method and |Shutdown()| should only |
353 // be called from the creation thread, |raw_channel_| should never be null | 343 // be called from the creation thread, |raw_channel_| should never be null |
354 // here. | 344 // here. |
355 DCHECK(is_running_); | 345 DCHECK(is_running_); |
356 | 346 |
357 IdToEndpointInfoMap::const_iterator it = | 347 IdToEndpointMap::const_iterator it = |
358 local_id_to_endpoint_info_map_.find(local_id); | 348 local_id_to_endpoint_map_.find(local_id); |
359 if (it == local_id_to_endpoint_info_map_.end()) | 349 if (it == local_id_to_endpoint_map_.end()) { |
360 nonexistent_local_id_error = true; | 350 nonexistent_local_id_error = true; |
361 else | 351 } else { |
362 endpoint_info = it->second; | 352 state = it->second->state_; |
| 353 message_pipe = it->second->message_pipe_; |
| 354 port = it->second->port_; |
| 355 } |
363 } | 356 } |
364 if (nonexistent_local_id_error) { | 357 if (nonexistent_local_id_error) { |
365 HandleRemoteError(base::StringPrintf( | 358 HandleRemoteError(base::StringPrintf( |
366 "Received a message for nonexistent local destination ID %u", | 359 "Received a message for nonexistent local destination ID %u", |
367 static_cast<unsigned>(local_id))); | 360 static_cast<unsigned>(local_id))); |
368 // This is strongly indicative of some problem. However, it's not a fatal | 361 // This is strongly indicative of some problem. However, it's not a fatal |
369 // error, since it may indicate a buggy (or hostile) remote process. Don't | 362 // error, since it may indicate a buggy (or hostile) remote process. Don't |
370 // die even for Debug builds, since handling this properly needs to be | 363 // die even for Debug builds, since handling this properly needs to be |
371 // tested (TODO(vtl)). | 364 // tested (TODO(vtl)). |
372 DLOG(ERROR) << "This should not happen under normal operation."; | 365 DLOG(ERROR) << "This should not happen under normal operation."; |
373 return; | 366 return; |
374 } | 367 } |
375 | 368 |
376 // Ignore messages for zombie endpoints (not an error). | 369 // Ignore messages for zombie endpoints (not an error). |
377 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { | 370 if (state != ChannelEndpoint::STATE_NORMAL) { |
378 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " | 371 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " |
379 << local_id << ", remote ID = " << message_view.source_id() << ")"; | 372 << local_id << ", remote ID = " << message_view.source_id() << ")"; |
380 return; | 373 return; |
381 } | 374 } |
382 | 375 |
383 // We need to duplicate the message (data), because |EnqueueMessage()| will | 376 // We need to duplicate the message (data), because |EnqueueMessage()| will |
384 // take ownership of it. | 377 // take ownership of it. |
385 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 378 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
386 if (message_view.transport_data_buffer_size() > 0) { | 379 if (message_view.transport_data_buffer_size() > 0) { |
387 DCHECK(message_view.transport_data_buffer()); | 380 DCHECK(message_view.transport_data_buffer()); |
388 message->SetDispatchers(TransportData::DeserializeDispatchers( | 381 message->SetDispatchers(TransportData::DeserializeDispatchers( |
389 message_view.transport_data_buffer(), | 382 message_view.transport_data_buffer(), |
390 message_view.transport_data_buffer_size(), | 383 message_view.transport_data_buffer_size(), |
391 platform_handles.Pass(), | 384 platform_handles.Pass(), |
392 this)); | 385 this)); |
393 } | 386 } |
394 MojoResult result = endpoint_info.message_pipe->EnqueueMessage( | 387 MojoResult result = message_pipe->EnqueueMessage( |
395 MessagePipe::GetPeerPort(endpoint_info.port), message.Pass()); | 388 MessagePipe::GetPeerPort(port), message.Pass()); |
396 if (result != MOJO_RESULT_OK) { | 389 if (result != MOJO_RESULT_OK) { |
397 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint | 390 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint |
398 // has been closed (in an unavoidable race). This might also be a "remote" | 391 // has been closed (in an unavoidable race). This might also be a "remote" |
399 // error, e.g., if the remote side is sending invalid control messages (to | 392 // error, e.g., if the remote side is sending invalid control messages (to |
400 // the message pipe). | 393 // the message pipe). |
401 HandleLocalError(base::StringPrintf( | 394 HandleLocalError(base::StringPrintf( |
402 "Failed to enqueue message to local ID %u (result %d)", | 395 "Failed to enqueue message to local ID %u (result %d)", |
403 static_cast<unsigned>(local_id), | 396 static_cast<unsigned>(local_id), |
404 static_cast<int>(result))); | 397 static_cast<int>(result))); |
405 return; | 398 return; |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
453 default: | 446 default: |
454 HandleRemoteError("Received invalid channel message"); | 447 HandleRemoteError("Received invalid channel message"); |
455 NOTREACHED(); | 448 NOTREACHED(); |
456 break; | 449 break; |
457 } | 450 } |
458 } | 451 } |
459 | 452 |
460 bool Channel::RemoveMessagePipeEndpoint( | 453 bool Channel::RemoveMessagePipeEndpoint( |
461 MessageInTransit::EndpointId local_id, | 454 MessageInTransit::EndpointId local_id, |
462 MessageInTransit::EndpointId remote_id) { | 455 MessageInTransit::EndpointId remote_id) { |
463 EndpointInfo endpoint_info; | 456 scoped_refptr<MessagePipe> message_pipe; |
| 457 unsigned port; |
464 { | 458 { |
465 base::AutoLock locker(lock_); | 459 base::AutoLock locker(lock_); |
466 | 460 |
467 IdToEndpointInfoMap::iterator it = | 461 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); |
468 local_id_to_endpoint_info_map_.find(local_id); | 462 if (it == local_id_to_endpoint_map_.end()) { |
469 if (it == local_id_to_endpoint_info_map_.end()) { | |
470 DVLOG(2) << "Remove message pipe error: not found"; | 463 DVLOG(2) << "Remove message pipe error: not found"; |
471 return false; | 464 return false; |
472 } | 465 } |
473 | 466 |
474 // If it's waiting for the remove ack, just do it and return. | 467 // If it's waiting for the remove ack, just do it and return. |
475 if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { | 468 if (it->second->state_ == ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK) { |
476 local_id_to_endpoint_info_map_.erase(it); | 469 local_id_to_endpoint_map_.erase(it); |
477 return true; | 470 return true; |
478 } | 471 } |
479 | 472 |
480 if (it->second.state != EndpointInfo::STATE_NORMAL) { | 473 if (it->second->state_ != ChannelEndpoint::STATE_NORMAL) { |
481 DVLOG(2) << "Remove message pipe error: wrong state"; | 474 DVLOG(2) << "Remove message pipe error: wrong state"; |
482 return false; | 475 return false; |
483 } | 476 } |
484 | 477 |
485 it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; | 478 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH; |
486 endpoint_info = it->second; | 479 message_pipe = it->second->message_pipe_; |
487 it->second.message_pipe = NULL; | 480 port = it->second->port_; |
| 481 it->second->message_pipe_ = NULL; |
488 } | 482 } |
489 | 483 |
490 if (!SendControlMessage( | 484 if (!SendControlMessage( |
491 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, | 485 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, |
492 local_id, | 486 local_id, |
493 remote_id)) { | 487 remote_id)) { |
494 HandleLocalError(base::StringPrintf( | 488 HandleLocalError(base::StringPrintf( |
495 "Failed to send message to remove remote message pipe endpoint ack " | 489 "Failed to send message to remove remote message pipe endpoint ack " |
496 "(local ID %u, remote ID %u)", | 490 "(local ID %u, remote ID %u)", |
497 static_cast<unsigned>(local_id), | 491 static_cast<unsigned>(local_id), |
498 static_cast<unsigned>(remote_id))); | 492 static_cast<unsigned>(remote_id))); |
499 } | 493 } |
500 | 494 |
501 endpoint_info.message_pipe->OnRemove(endpoint_info.port); | 495 message_pipe->OnRemove(port); |
502 | 496 |
503 return true; | 497 return true; |
504 } | 498 } |
505 | 499 |
506 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, | 500 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, |
507 MessageInTransit::EndpointId local_id, | 501 MessageInTransit::EndpointId local_id, |
508 MessageInTransit::EndpointId remote_id) { | 502 MessageInTransit::EndpointId remote_id) { |
509 DVLOG(2) << "Sending channel control message: subtype " << subtype | 503 DVLOG(2) << "Sending channel control message: subtype " << subtype |
510 << ", local ID " << local_id << ", remote ID " << remote_id; | 504 << ", local ID " << local_id << ", remote ID " << remote_id; |
511 scoped_ptr<MessageInTransit> message( | 505 scoped_ptr<MessageInTransit> message( |
(...skipping 13 matching lines...) Expand all Loading... |
525 // TODO(vtl): Is this how we really want to handle this? | 519 // TODO(vtl): Is this how we really want to handle this? |
526 // Sometimes we'll want to propagate the error back to the message pipe | 520 // Sometimes we'll want to propagate the error back to the message pipe |
527 // (endpoint), and notify it that the remote is (effectively) closed. | 521 // (endpoint), and notify it that the remote is (effectively) closed. |
528 // Sometimes we'll want to kill the channel (and notify all the endpoints that | 522 // Sometimes we'll want to kill the channel (and notify all the endpoints that |
529 // their remotes are dead. | 523 // their remotes are dead. |
530 LOG(WARNING) << error_message; | 524 LOG(WARNING) << error_message; |
531 } | 525 } |
532 | 526 |
533 } // namespace system | 527 } // namespace system |
534 } // namespace mojo | 528 } // namespace mojo |
OLD | NEW |