OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/location.h" | 12 #include "base/location.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
15 #include "base/single_thread_task_runner.h" | 15 #include "base/single_thread_task_runner.h" |
16 #include "base/stl_util.h" | 16 #include "base/stl_util.h" |
17 #include "base/threading/thread_task_runner_handle.h" | 17 #include "base/threading/thread_task_runner_handle.h" |
18 #include "mojo/public/cpp/bindings/associated_group.h" | 18 #include "mojo/public/cpp/bindings/associated_group.h" |
19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
| 21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" |
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | 22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
22 | 23 |
23 namespace mojo { | 24 namespace mojo { |
24 namespace internal { | 25 namespace internal { |
25 | 26 |
26 // InterfaceEndpoint stores the information of an interface endpoint registered | 27 // InterfaceEndpoint stores the information of an interface endpoint registered |
27 // with the router. | 28 // with the router. |
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
29 // this object. | 30 // this object. |
30 class MultiplexRouter::InterfaceEndpoint | 31 class MultiplexRouter::InterfaceEndpoint |
(...skipping 12 matching lines...) Expand all Loading... |
43 // The following public methods are safe to call from any threads without | 44 // The following public methods are safe to call from any threads without |
44 // locking. | 45 // locking. |
45 | 46 |
46 InterfaceId id() const { return id_; } | 47 InterfaceId id() const { return id_; } |
47 | 48 |
48 // --------------------------------------------------------------------------- | 49 // --------------------------------------------------------------------------- |
49 // The following public methods are called under the router's lock. | 50 // The following public methods are called under the router's lock. |
50 | 51 |
51 bool closed() const { return closed_; } | 52 bool closed() const { return closed_; } |
52 void set_closed() { | 53 void set_closed() { |
53 router_->lock_.AssertAcquired(); | 54 router_->AssertLockAcquired(); |
54 closed_ = true; | 55 closed_ = true; |
55 } | 56 } |
56 | 57 |
57 bool peer_closed() const { return peer_closed_; } | 58 bool peer_closed() const { return peer_closed_; } |
58 void set_peer_closed() { | 59 void set_peer_closed() { |
59 router_->lock_.AssertAcquired(); | 60 router_->AssertLockAcquired(); |
60 peer_closed_ = true; | 61 peer_closed_ = true; |
61 } | 62 } |
62 | 63 |
63 base::SingleThreadTaskRunner* task_runner() const { | 64 base::SingleThreadTaskRunner* task_runner() const { |
64 return task_runner_.get(); | 65 return task_runner_.get(); |
65 } | 66 } |
66 | 67 |
67 InterfaceEndpointClient* client() const { return client_; } | 68 InterfaceEndpointClient* client() const { return client_; } |
68 | 69 |
69 void AttachClient(InterfaceEndpointClient* client, | 70 void AttachClient(InterfaceEndpointClient* client, |
70 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 71 scoped_refptr<base::SingleThreadTaskRunner> runner) { |
71 router_->lock_.AssertAcquired(); | 72 router_->AssertLockAcquired(); |
72 DCHECK(!client_); | 73 DCHECK(!client_); |
73 DCHECK(!closed_); | 74 DCHECK(!closed_); |
74 DCHECK(runner->BelongsToCurrentThread()); | 75 DCHECK(runner->BelongsToCurrentThread()); |
75 | 76 |
76 task_runner_ = std::move(runner); | 77 task_runner_ = std::move(runner); |
77 client_ = client; | 78 client_ = client; |
78 } | 79 } |
79 | 80 |
80 // This method must be called on the same thread as the corresponding | 81 // This method must be called on the same thread as the corresponding |
81 // AttachClient() call. | 82 // AttachClient() call. |
82 void DetachClient() { | 83 void DetachClient() { |
83 router_->lock_.AssertAcquired(); | 84 router_->AssertLockAcquired(); |
84 DCHECK(client_); | 85 DCHECK(client_); |
85 DCHECK(task_runner_->BelongsToCurrentThread()); | 86 DCHECK(task_runner_->BelongsToCurrentThread()); |
86 DCHECK(!closed_); | 87 DCHECK(!closed_); |
87 | 88 |
88 task_runner_ = nullptr; | 89 task_runner_ = nullptr; |
89 client_ = nullptr; | 90 client_ = nullptr; |
90 sync_watcher_.reset(); | 91 sync_watcher_.reset(); |
91 } | 92 } |
92 | 93 |
93 void SignalSyncMessageEvent() { | 94 void SignalSyncMessageEvent() { |
94 router_->lock_.AssertAcquired(); | 95 router_->AssertLockAcquired(); |
95 if (event_signalled_) | 96 if (event_signalled_) |
96 return; | 97 return; |
97 | 98 |
98 EnsureEventMessagePipeExists(); | 99 EnsureEventMessagePipeExists(); |
99 event_signalled_ = true; | 100 event_signalled_ = true; |
100 MojoResult result = | 101 MojoResult result = |
101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, | 102 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, |
102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | 103 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
103 DCHECK_EQ(MOJO_RESULT_OK, result); | 104 DCHECK_EQ(MOJO_RESULT_OK, result); |
104 } | 105 } |
105 | 106 |
106 void ResetSyncMessageSignal() { | 107 void ResetSyncMessageSignal() { |
107 router_->lock_.AssertAcquired(); | 108 router_->AssertLockAcquired(); |
108 | 109 |
109 if (!event_signalled_) | 110 if (!event_signalled_) |
110 return; | 111 return; |
111 | 112 |
112 DCHECK(sync_message_event_receiver_.is_valid()); | 113 DCHECK(sync_message_event_receiver_.is_valid()); |
113 MojoResult result = | 114 MojoResult result = |
114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, | 115 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, |
115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | 116 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
116 DCHECK_EQ(MOJO_RESULT_OK, result); | 117 DCHECK_EQ(MOJO_RESULT_OK, result); |
117 event_signalled_ = false; | 118 event_signalled_ = false; |
(...skipping 21 matching lines...) Expand all Loading... |
139 DCHECK(task_runner_->BelongsToCurrentThread()); | 140 DCHECK(task_runner_->BelongsToCurrentThread()); |
140 | 141 |
141 EnsureSyncWatcherExists(); | 142 EnsureSyncWatcherExists(); |
142 return sync_watcher_->SyncWatch(should_stop); | 143 return sync_watcher_->SyncWatch(should_stop); |
143 } | 144 } |
144 | 145 |
145 private: | 146 private: |
146 friend class base::RefCounted<InterfaceEndpoint>; | 147 friend class base::RefCounted<InterfaceEndpoint>; |
147 | 148 |
148 ~InterfaceEndpoint() override { | 149 ~InterfaceEndpoint() override { |
149 router_->lock_.AssertAcquired(); | 150 router_->AssertLockAcquired(); |
150 | 151 |
151 DCHECK(!client_); | 152 DCHECK(!client_); |
152 DCHECK(closed_); | 153 DCHECK(closed_); |
153 DCHECK(peer_closed_); | 154 DCHECK(peer_closed_); |
154 DCHECK(!sync_watcher_); | 155 DCHECK(!sync_watcher_); |
155 } | 156 } |
156 | 157 |
157 void OnHandleReady(MojoResult result) { | 158 void OnHandleReady(MojoResult result) { |
158 DCHECK(task_runner_->BelongsToCurrentThread()); | 159 DCHECK(task_runner_->BelongsToCurrentThread()); |
159 scoped_refptr<InterfaceEndpoint> self_protector(this); | 160 scoped_refptr<InterfaceEndpoint> self_protector(this); |
160 scoped_refptr<MultiplexRouter> router_protector(router_); | 161 scoped_refptr<MultiplexRouter> router_protector(router_); |
161 | 162 |
162 // Because we never close |sync_message_event_{sender,receiver}_| before | 163 // Because we never close |sync_message_event_{sender,receiver}_| before |
163 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | 164 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
164 DCHECK_EQ(MOJO_RESULT_OK, result); | 165 DCHECK_EQ(MOJO_RESULT_OK, result); |
165 bool reset_sync_watcher = false; | 166 bool reset_sync_watcher = false; |
166 { | 167 { |
167 base::AutoLock locker(router_->lock_); | 168 MayAutoLock locker(router_->lock_.get()); |
168 | 169 |
169 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | 170 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
170 | 171 |
171 if (!more_to_process) | 172 if (!more_to_process) |
172 ResetSyncMessageSignal(); | 173 ResetSyncMessageSignal(); |
173 | 174 |
174 // Currently there are no queued sync messages and the peer has closed so | 175 // Currently there are no queued sync messages and the peer has closed so |
175 // there won't be incoming sync messages in the future. | 176 // there won't be incoming sync messages in the future. |
176 reset_sync_watcher = !more_to_process && peer_closed_; | 177 reset_sync_watcher = !more_to_process && peer_closed_; |
177 } | 178 } |
178 if (reset_sync_watcher) { | 179 if (reset_sync_watcher) { |
179 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | 180 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
180 // on the call stack, resetting the sync watcher will allow it to exit | 181 // on the call stack, resetting the sync watcher will allow it to exit |
181 // when the call stack unwinds to that frame. | 182 // when the call stack unwinds to that frame. |
182 sync_watcher_.reset(); | 183 sync_watcher_.reset(); |
183 } | 184 } |
184 } | 185 } |
185 | 186 |
186 void EnsureSyncWatcherExists() { | 187 void EnsureSyncWatcherExists() { |
187 DCHECK(task_runner_->BelongsToCurrentThread()); | 188 DCHECK(task_runner_->BelongsToCurrentThread()); |
188 if (sync_watcher_) | 189 if (sync_watcher_) |
189 return; | 190 return; |
190 | 191 |
191 { | 192 { |
192 base::AutoLock locker(router_->lock_); | 193 MayAutoLock locker(router_->lock_.get()); |
193 EnsureEventMessagePipeExists(); | 194 EnsureEventMessagePipeExists(); |
194 | 195 |
195 auto iter = router_->sync_message_tasks_.find(id_); | 196 auto iter = router_->sync_message_tasks_.find(id_); |
196 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | 197 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
197 SignalSyncMessageEvent(); | 198 SignalSyncMessageEvent(); |
198 } | 199 } |
199 | 200 |
200 sync_watcher_.reset(new SyncHandleWatcher( | 201 sync_watcher_.reset(new SyncHandleWatcher( |
201 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 202 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
202 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); | 203 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
203 } | 204 } |
204 | 205 |
205 void EnsureEventMessagePipeExists() { | 206 void EnsureEventMessagePipeExists() { |
206 router_->lock_.AssertAcquired(); | 207 router_->AssertLockAcquired(); |
207 | 208 |
208 if (sync_message_event_receiver_.is_valid()) | 209 if (sync_message_event_receiver_.is_valid()) |
209 return; | 210 return; |
210 | 211 |
211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | 212 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
212 &sync_message_event_receiver_); | 213 &sync_message_event_receiver_); |
213 DCHECK_EQ(MOJO_RESULT_OK, result); | 214 DCHECK_EQ(MOJO_RESULT_OK, result); |
214 } | 215 } |
215 | 216 |
216 // --------------------------------------------------------------------------- | 217 // --------------------------------------------------------------------------- |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
274 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 275 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
275 | 276 |
276 enum Type { MESSAGE, NOTIFY_ERROR }; | 277 enum Type { MESSAGE, NOTIFY_ERROR }; |
277 Type type; | 278 Type type; |
278 | 279 |
279 private: | 280 private: |
280 explicit Task(Type in_type) : type(in_type) {} | 281 explicit Task(Type in_type) : type(in_type) {} |
281 }; | 282 }; |
282 | 283 |
283 MultiplexRouter::MultiplexRouter( | 284 MultiplexRouter::MultiplexRouter( |
| 285 ScopedMessagePipeHandle message_pipe, |
| 286 Config config, |
284 bool set_interface_id_namesapce_bit, | 287 bool set_interface_id_namesapce_bit, |
285 ScopedMessagePipeHandle message_pipe, | |
286 scoped_refptr<base::SingleThreadTaskRunner> runner) | 288 scoped_refptr<base::SingleThreadTaskRunner> runner) |
287 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 289 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
288 task_runner_(runner), | 290 task_runner_(runner), |
289 header_validator_(nullptr), | 291 header_validator_(nullptr), |
290 filters_(this), | 292 filters_(this), |
291 connector_(std::move(message_pipe), | 293 connector_(std::move(message_pipe), |
292 Connector::MULTI_THREADED_SEND, | 294 config == SINGLE_INTERFACE ? Connector::SINGLE_THREADED_SEND |
| 295 : Connector::MULTI_THREADED_SEND, |
293 std::move(runner)), | 296 std::move(runner)), |
| 297 lock_(config == SINGLE_INTERFACE ? nullptr : new base::Lock), |
294 control_message_handler_(this), | 298 control_message_handler_(this), |
295 control_message_proxy_(&connector_), | 299 control_message_proxy_(&connector_), |
296 next_interface_id_value_(1), | 300 next_interface_id_value_(1), |
297 posted_to_process_tasks_(false), | 301 posted_to_process_tasks_(false), |
298 encountered_error_(false), | 302 encountered_error_(false), |
299 paused_(false), | 303 paused_(false), |
300 testing_mode_(false) { | 304 testing_mode_(false) { |
301 DCHECK(task_runner_->BelongsToCurrentThread()); | 305 DCHECK(task_runner_->BelongsToCurrentThread()); |
302 // Always participate in sync handle watching, because even if it doesn't | 306 // Always participate in sync handle watching, because even if it doesn't |
303 // expect sync requests during sync handle watching, it may still need to | 307 // expect sync requests during sync handle watching, it may still need to |
304 // dispatch messages to associated endpoints on a different thread. | 308 // dispatch messages to associated endpoints on a different thread. |
305 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 309 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
306 connector_.set_incoming_receiver(&filters_); | 310 connector_.set_incoming_receiver(&filters_); |
307 connector_.set_connection_error_handler( | 311 connector_.set_connection_error_handler( |
308 base::Bind(&MultiplexRouter::OnPipeConnectionError, | 312 base::Bind(&MultiplexRouter::OnPipeConnectionError, |
309 base::Unretained(this))); | 313 base::Unretained(this))); |
310 | 314 |
311 std::unique_ptr<MessageHeaderValidator> header_validator = | 315 std::unique_ptr<MessageHeaderValidator> header_validator = |
312 base::MakeUnique<MessageHeaderValidator>(); | 316 base::MakeUnique<MessageHeaderValidator>(); |
313 header_validator_ = header_validator.get(); | 317 header_validator_ = header_validator.get(); |
314 filters_.Append(std::move(header_validator)); | 318 filters_.Append(std::move(header_validator)); |
315 } | 319 } |
316 | 320 |
317 MultiplexRouter::~MultiplexRouter() { | 321 MultiplexRouter::~MultiplexRouter() { |
318 base::AutoLock locker(lock_); | 322 MayAutoLock locker(lock_.get()); |
319 | 323 |
320 sync_message_tasks_.clear(); | 324 sync_message_tasks_.clear(); |
321 tasks_.clear(); | 325 tasks_.clear(); |
322 | 326 |
323 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 327 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
324 InterfaceEndpoint* endpoint = iter->second.get(); | 328 InterfaceEndpoint* endpoint = iter->second.get(); |
325 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 329 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
326 // because it may remove the corresponding value from the map. | 330 // because it may remove the corresponding value from the map. |
327 ++iter; | 331 ++iter; |
328 | 332 |
329 DCHECK(endpoint->closed()); | 333 DCHECK(endpoint->closed()); |
330 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 334 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
331 } | 335 } |
332 | 336 |
333 DCHECK(endpoints_.empty()); | 337 DCHECK(endpoints_.empty()); |
334 } | 338 } |
335 | 339 |
336 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { | 340 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { |
337 DCHECK(thread_checker_.CalledOnValidThread()); | 341 DCHECK(thread_checker_.CalledOnValidThread()); |
338 header_validator_->SetDescription(name + " [master] MessageHeaderValidator"); | 342 header_validator_->SetDescription(name + " [master] MessageHeaderValidator"); |
339 control_message_handler_.SetDescription( | 343 control_message_handler_.SetDescription( |
340 name + " [master] PipeControlMessageHandler"); | 344 name + " [master] PipeControlMessageHandler"); |
341 } | 345 } |
342 | 346 |
343 void MultiplexRouter::CreateEndpointHandlePair( | 347 void MultiplexRouter::CreateEndpointHandlePair( |
344 ScopedInterfaceEndpointHandle* local_endpoint, | 348 ScopedInterfaceEndpointHandle* local_endpoint, |
345 ScopedInterfaceEndpointHandle* remote_endpoint) { | 349 ScopedInterfaceEndpointHandle* remote_endpoint) { |
346 base::AutoLock locker(lock_); | 350 MayAutoLock locker(lock_.get()); |
347 uint32_t id = 0; | 351 uint32_t id = 0; |
348 do { | 352 do { |
349 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) | 353 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
350 next_interface_id_value_ = 1; | 354 next_interface_id_value_ = 1; |
351 id = next_interface_id_value_++; | 355 id = next_interface_id_value_++; |
352 if (set_interface_id_namespace_bit_) | 356 if (set_interface_id_namespace_bit_) |
353 id |= kInterfaceIdNamespaceMask; | 357 id |= kInterfaceIdNamespaceMask; |
354 } while (base::ContainsKey(endpoints_, id)); | 358 } while (base::ContainsKey(endpoints_, id)); |
355 | 359 |
356 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | 360 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
357 endpoints_[id] = endpoint; | 361 endpoints_[id] = endpoint; |
358 if (encountered_error_) | 362 if (encountered_error_) |
359 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 363 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
360 | 364 |
361 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); | 365 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); |
362 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); | 366 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); |
363 } | 367 } |
364 | 368 |
365 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 369 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
366 InterfaceId id) { | 370 InterfaceId id) { |
367 if (!IsValidInterfaceId(id)) | 371 if (!IsValidInterfaceId(id)) |
368 return ScopedInterfaceEndpointHandle(); | 372 return ScopedInterfaceEndpointHandle(); |
369 | 373 |
370 base::AutoLock locker(lock_); | 374 MayAutoLock locker(lock_.get()); |
371 bool inserted = false; | 375 bool inserted = false; |
372 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 376 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
373 if (inserted) { | 377 if (inserted) { |
374 if (encountered_error_) | 378 if (encountered_error_) |
375 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 379 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
376 } else { | 380 } else { |
377 // If the endpoint already exist, it is because we have received a | 381 // If the endpoint already exist, it is because we have received a |
378 // notification that the peer endpoint has closed. | 382 // notification that the peer endpoint has closed. |
379 CHECK(!endpoint->closed()); | 383 CHECK(!endpoint->closed()); |
380 CHECK(endpoint->peer_closed()); | 384 CHECK(endpoint->peer_closed()); |
381 } | 385 } |
382 return CreateScopedInterfaceEndpointHandle(id, true); | 386 return CreateScopedInterfaceEndpointHandle(id, true); |
383 } | 387 } |
384 | 388 |
385 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { | 389 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
386 if (!IsValidInterfaceId(id)) | 390 if (!IsValidInterfaceId(id)) |
387 return; | 391 return; |
388 | 392 |
389 base::AutoLock locker(lock_); | 393 MayAutoLock locker(lock_.get()); |
390 | 394 |
391 if (!is_local) { | 395 if (!is_local) { |
392 DCHECK(base::ContainsKey(endpoints_, id)); | 396 DCHECK(base::ContainsKey(endpoints_, id)); |
393 DCHECK(!IsMasterInterfaceId(id)); | 397 DCHECK(!IsMasterInterfaceId(id)); |
394 | 398 |
395 // We will receive a NotifyPeerEndpointClosed message from the other side. | 399 // We will receive a NotifyPeerEndpointClosed message from the other side. |
396 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); | 400 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
397 | 401 |
398 return; | 402 return; |
399 } | 403 } |
(...skipping 12 matching lines...) Expand all Loading... |
412 | 416 |
413 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( | 417 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
414 const ScopedInterfaceEndpointHandle& handle, | 418 const ScopedInterfaceEndpointHandle& handle, |
415 InterfaceEndpointClient* client, | 419 InterfaceEndpointClient* client, |
416 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 420 scoped_refptr<base::SingleThreadTaskRunner> runner) { |
417 const InterfaceId id = handle.id(); | 421 const InterfaceId id = handle.id(); |
418 | 422 |
419 DCHECK(IsValidInterfaceId(id)); | 423 DCHECK(IsValidInterfaceId(id)); |
420 DCHECK(client); | 424 DCHECK(client); |
421 | 425 |
422 base::AutoLock locker(lock_); | 426 MayAutoLock locker(lock_.get()); |
423 DCHECK(base::ContainsKey(endpoints_, id)); | 427 DCHECK(base::ContainsKey(endpoints_, id)); |
424 | 428 |
425 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 429 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
426 endpoint->AttachClient(client, std::move(runner)); | 430 endpoint->AttachClient(client, std::move(runner)); |
427 | 431 |
428 if (endpoint->peer_closed()) | 432 if (endpoint->peer_closed()) |
429 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 433 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
430 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 434 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
431 | 435 |
432 return endpoint; | 436 return endpoint; |
433 } | 437 } |
434 | 438 |
435 void MultiplexRouter::DetachEndpointClient( | 439 void MultiplexRouter::DetachEndpointClient( |
436 const ScopedInterfaceEndpointHandle& handle) { | 440 const ScopedInterfaceEndpointHandle& handle) { |
437 const InterfaceId id = handle.id(); | 441 const InterfaceId id = handle.id(); |
438 | 442 |
439 DCHECK(IsValidInterfaceId(id)); | 443 DCHECK(IsValidInterfaceId(id)); |
440 | 444 |
441 base::AutoLock locker(lock_); | 445 MayAutoLock locker(lock_.get()); |
442 DCHECK(base::ContainsKey(endpoints_, id)); | 446 DCHECK(base::ContainsKey(endpoints_, id)); |
443 | 447 |
444 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 448 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
445 endpoint->DetachClient(); | 449 endpoint->DetachClient(); |
446 } | 450 } |
447 | 451 |
448 void MultiplexRouter::RaiseError() { | 452 void MultiplexRouter::RaiseError() { |
449 if (task_runner_->BelongsToCurrentThread()) { | 453 if (task_runner_->BelongsToCurrentThread()) { |
450 connector_.RaiseError(); | 454 connector_.RaiseError(); |
451 } else { | 455 } else { |
452 task_runner_->PostTask(FROM_HERE, | 456 task_runner_->PostTask(FROM_HERE, |
453 base::Bind(&MultiplexRouter::RaiseError, this)); | 457 base::Bind(&MultiplexRouter::RaiseError, this)); |
454 } | 458 } |
455 } | 459 } |
456 | 460 |
457 void MultiplexRouter::CloseMessagePipe() { | 461 void MultiplexRouter::CloseMessagePipe() { |
458 DCHECK(thread_checker_.CalledOnValidThread()); | 462 DCHECK(thread_checker_.CalledOnValidThread()); |
459 connector_.CloseMessagePipe(); | 463 connector_.CloseMessagePipe(); |
460 // CloseMessagePipe() above won't trigger connection error handler. | 464 // CloseMessagePipe() above won't trigger connection error handler. |
461 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 465 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
462 // get notified. | 466 // get notified. |
463 OnPipeConnectionError(); | 467 OnPipeConnectionError(); |
464 } | 468 } |
465 | 469 |
466 void MultiplexRouter::PauseIncomingMethodCallProcessing() { | 470 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
467 DCHECK(thread_checker_.CalledOnValidThread()); | 471 DCHECK(thread_checker_.CalledOnValidThread()); |
468 connector_.PauseIncomingMethodCallProcessing(); | 472 connector_.PauseIncomingMethodCallProcessing(); |
469 | 473 |
470 base::AutoLock locker(lock_); | 474 MayAutoLock locker(lock_.get()); |
471 paused_ = true; | 475 paused_ = true; |
472 | 476 |
473 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) | 477 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
474 iter->second->ResetSyncMessageSignal(); | 478 iter->second->ResetSyncMessageSignal(); |
475 } | 479 } |
476 | 480 |
477 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { | 481 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
478 DCHECK(thread_checker_.CalledOnValidThread()); | 482 DCHECK(thread_checker_.CalledOnValidThread()); |
479 connector_.ResumeIncomingMethodCallProcessing(); | 483 connector_.ResumeIncomingMethodCallProcessing(); |
480 | 484 |
481 base::AutoLock locker(lock_); | 485 MayAutoLock locker(lock_.get()); |
482 paused_ = false; | 486 paused_ = false; |
483 | 487 |
484 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { | 488 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
485 auto sync_iter = sync_message_tasks_.find(iter->first); | 489 auto sync_iter = sync_message_tasks_.find(iter->first); |
486 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) | 490 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
487 iter->second->SignalSyncMessageEvent(); | 491 iter->second->SignalSyncMessageEvent(); |
488 } | 492 } |
489 | 493 |
490 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 494 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
491 } | 495 } |
492 | 496 |
493 bool MultiplexRouter::HasAssociatedEndpoints() const { | 497 bool MultiplexRouter::HasAssociatedEndpoints() const { |
494 DCHECK(thread_checker_.CalledOnValidThread()); | 498 DCHECK(thread_checker_.CalledOnValidThread()); |
495 base::AutoLock locker(lock_); | 499 MayAutoLock locker(lock_.get()); |
496 | 500 |
497 if (endpoints_.size() > 1) | 501 if (endpoints_.size() > 1) |
498 return true; | 502 return true; |
499 if (endpoints_.size() == 0) | 503 if (endpoints_.size() == 0) |
500 return false; | 504 return false; |
501 | 505 |
502 return !base::ContainsKey(endpoints_, kMasterInterfaceId); | 506 return !base::ContainsKey(endpoints_, kMasterInterfaceId); |
503 } | 507 } |
504 | 508 |
505 void MultiplexRouter::EnableTestingMode() { | 509 void MultiplexRouter::EnableTestingMode() { |
506 DCHECK(thread_checker_.CalledOnValidThread()); | 510 DCHECK(thread_checker_.CalledOnValidThread()); |
507 base::AutoLock locker(lock_); | 511 MayAutoLock locker(lock_.get()); |
508 | 512 |
509 testing_mode_ = true; | 513 testing_mode_ = true; |
510 connector_.set_enforce_errors_from_incoming_receiver(false); | 514 connector_.set_enforce_errors_from_incoming_receiver(false); |
511 } | 515 } |
512 | 516 |
513 bool MultiplexRouter::Accept(Message* message) { | 517 bool MultiplexRouter::Accept(Message* message) { |
514 DCHECK(thread_checker_.CalledOnValidThread()); | 518 DCHECK(thread_checker_.CalledOnValidThread()); |
515 | 519 |
516 scoped_refptr<MultiplexRouter> protector(this); | 520 scoped_refptr<MultiplexRouter> protector(this); |
517 base::AutoLock locker(lock_); | 521 MayAutoLock locker(lock_.get()); |
518 | 522 |
519 DCHECK(!paused_); | 523 DCHECK(!paused_); |
520 | 524 |
521 ClientCallBehavior client_call_behavior = | 525 ClientCallBehavior client_call_behavior = |
522 connector_.during_sync_handle_watcher_callback() | 526 connector_.during_sync_handle_watcher_callback() |
523 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 527 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
524 : ALLOW_DIRECT_CLIENT_CALLS; | 528 : ALLOW_DIRECT_CLIENT_CALLS; |
525 | 529 |
526 bool processed = | 530 bool processed = |
527 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 531 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
(...skipping 18 matching lines...) Expand all Loading... |
546 // tasks. | 550 // tasks. |
547 ProcessTasks(client_call_behavior, connector_.task_runner()); | 551 ProcessTasks(client_call_behavior, connector_.task_runner()); |
548 } | 552 } |
549 | 553 |
550 // Always return true. If we see errors during message processing, we will | 554 // Always return true. If we see errors during message processing, we will |
551 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 555 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
552 return true; | 556 return true; |
553 } | 557 } |
554 | 558 |
555 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 559 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
556 lock_.AssertAcquired(); | 560 AssertLockAcquired(); |
557 | 561 |
558 if (IsMasterInterfaceId(id)) | 562 if (IsMasterInterfaceId(id)) |
559 return false; | 563 return false; |
560 | 564 |
561 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 565 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
562 | 566 |
563 // It is possible that this endpoint has been set as peer closed. That is | 567 // It is possible that this endpoint has been set as peer closed. That is |
564 // because when the message pipe is closed, all the endpoints are updated with | 568 // because when the message pipe is closed, all the endpoints are updated with |
565 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, | 569 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, |
566 // as long as there are refs keeping the router alive. If there is a | 570 // as long as there are refs keeping the router alive. If there is a |
567 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get | 571 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get |
568 // here and see that the endpoint has been marked as peer closed. | 572 // here and see that the endpoint has been marked as peer closed. |
569 if (!endpoint->peer_closed()) { | 573 if (!endpoint->peer_closed()) { |
570 if (endpoint->client()) | 574 if (endpoint->client()) |
571 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 575 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
572 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 576 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
573 } | 577 } |
574 | 578 |
575 // No need to trigger a ProcessTasks() because it is already on the stack. | 579 // No need to trigger a ProcessTasks() because it is already on the stack. |
576 | 580 |
577 return true; | 581 return true; |
578 } | 582 } |
579 | 583 |
580 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | 584 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
581 lock_.AssertAcquired(); | 585 AssertLockAcquired(); |
582 | 586 |
583 if (IsMasterInterfaceId(id)) | 587 if (IsMasterInterfaceId(id)) |
584 return false; | 588 return false; |
585 | 589 |
586 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 590 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
587 DCHECK(!endpoint->closed()); | 591 DCHECK(!endpoint->closed()); |
588 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 592 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
589 | 593 |
590 control_message_proxy_.NotifyPeerEndpointClosed(id); | 594 control_message_proxy_.NotifyPeerEndpointClosed(id); |
591 | 595 |
592 return true; | 596 return true; |
593 } | 597 } |
594 | 598 |
595 void MultiplexRouter::OnPipeConnectionError() { | 599 void MultiplexRouter::OnPipeConnectionError() { |
596 DCHECK(thread_checker_.CalledOnValidThread()); | 600 DCHECK(thread_checker_.CalledOnValidThread()); |
597 | 601 |
598 scoped_refptr<MultiplexRouter> protector(this); | 602 scoped_refptr<MultiplexRouter> protector(this); |
599 base::AutoLock locker(lock_); | 603 MayAutoLock locker(lock_.get()); |
600 | 604 |
601 encountered_error_ = true; | 605 encountered_error_ = true; |
602 | 606 |
603 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 607 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
604 InterfaceEndpoint* endpoint = iter->second.get(); | 608 InterfaceEndpoint* endpoint = iter->second.get(); |
605 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 609 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
606 // because it may remove the corresponding value from the map. | 610 // because it may remove the corresponding value from the map. |
607 ++iter; | 611 ++iter; |
608 | 612 |
609 if (endpoint->client()) | 613 if (endpoint->client()) |
610 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 614 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
611 | 615 |
612 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 616 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
613 } | 617 } |
614 | 618 |
615 ProcessTasks(connector_.during_sync_handle_watcher_callback() | 619 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
616 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 620 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
617 : ALLOW_DIRECT_CLIENT_CALLS, | 621 : ALLOW_DIRECT_CLIENT_CALLS, |
618 connector_.task_runner()); | 622 connector_.task_runner()); |
619 } | 623 } |
620 | 624 |
621 void MultiplexRouter::ProcessTasks( | 625 void MultiplexRouter::ProcessTasks( |
622 ClientCallBehavior client_call_behavior, | 626 ClientCallBehavior client_call_behavior, |
623 base::SingleThreadTaskRunner* current_task_runner) { | 627 base::SingleThreadTaskRunner* current_task_runner) { |
624 lock_.AssertAcquired(); | 628 AssertLockAcquired(); |
625 | 629 |
626 if (posted_to_process_tasks_) | 630 if (posted_to_process_tasks_) |
627 return; | 631 return; |
628 | 632 |
629 while (!tasks_.empty() && !paused_) { | 633 while (!tasks_.empty() && !paused_) { |
630 std::unique_ptr<Task> task(std::move(tasks_.front())); | 634 std::unique_ptr<Task> task(std::move(tasks_.front())); |
631 tasks_.pop_front(); | 635 tasks_.pop_front(); |
632 | 636 |
633 InterfaceId id = kInvalidInterfaceId; | 637 InterfaceId id = kInvalidInterfaceId; |
634 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && | 638 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && |
(...skipping 23 matching lines...) Expand all Loading... |
658 if (sync_message) { | 662 if (sync_message) { |
659 auto iter = sync_message_tasks_.find(id); | 663 auto iter = sync_message_tasks_.find(id); |
660 if (iter != sync_message_tasks_.end() && iter->second.empty()) | 664 if (iter != sync_message_tasks_.end() && iter->second.empty()) |
661 sync_message_tasks_.erase(iter); | 665 sync_message_tasks_.erase(iter); |
662 } | 666 } |
663 } | 667 } |
664 } | 668 } |
665 } | 669 } |
666 | 670 |
667 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { | 671 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
668 lock_.AssertAcquired(); | 672 AssertLockAcquired(); |
669 | 673 |
670 auto iter = sync_message_tasks_.find(id); | 674 auto iter = sync_message_tasks_.find(id); |
671 if (iter == sync_message_tasks_.end()) | 675 if (iter == sync_message_tasks_.end()) |
672 return false; | 676 return false; |
673 | 677 |
674 if (paused_) | 678 if (paused_) |
675 return true; | 679 return true; |
676 | 680 |
677 MultiplexRouter::Task* task = iter->second.front(); | 681 MultiplexRouter::Task* task = iter->second.front(); |
678 iter->second.pop_front(); | 682 iter->second.pop_front(); |
(...skipping 18 matching lines...) Expand all Loading... |
697 return true; | 701 return true; |
698 } | 702 } |
699 | 703 |
700 bool MultiplexRouter::ProcessNotifyErrorTask( | 704 bool MultiplexRouter::ProcessNotifyErrorTask( |
701 Task* task, | 705 Task* task, |
702 ClientCallBehavior client_call_behavior, | 706 ClientCallBehavior client_call_behavior, |
703 base::SingleThreadTaskRunner* current_task_runner) { | 707 base::SingleThreadTaskRunner* current_task_runner) { |
704 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 708 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
705 DCHECK(!paused_); | 709 DCHECK(!paused_); |
706 | 710 |
707 lock_.AssertAcquired(); | 711 AssertLockAcquired(); |
708 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 712 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
709 if (!endpoint->client()) | 713 if (!endpoint->client()) |
710 return true; | 714 return true; |
711 | 715 |
712 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 716 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
713 endpoint->task_runner() != current_task_runner) { | 717 endpoint->task_runner() != current_task_runner) { |
714 MaybePostToProcessTasks(endpoint->task_runner()); | 718 MaybePostToProcessTasks(endpoint->task_runner()); |
715 return false; | 719 return false; |
716 } | 720 } |
717 | 721 |
718 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 722 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
719 | 723 |
720 InterfaceEndpointClient* client = endpoint->client(); | 724 InterfaceEndpointClient* client = endpoint->client(); |
721 { | 725 { |
722 // We must unlock before calling into |client| because it may call this | 726 // We must unlock before calling into |client| because it may call this |
723 // object within NotifyError(). Holding the lock will lead to deadlock. | 727 // object within NotifyError(). Holding the lock will lead to deadlock. |
724 // | 728 // |
725 // It is safe to call into |client| without the lock. Because |client| is | 729 // It is safe to call into |client| without the lock. Because |client| is |
726 // always accessed on the same thread, including DetachEndpointClient(). | 730 // always accessed on the same thread, including DetachEndpointClient(). |
727 base::AutoUnlock unlocker(lock_); | 731 MayAutoUnlock unlocker(lock_.get()); |
728 client->NotifyError(); | 732 client->NotifyError(); |
729 } | 733 } |
730 return true; | 734 return true; |
731 } | 735 } |
732 | 736 |
733 bool MultiplexRouter::ProcessIncomingMessage( | 737 bool MultiplexRouter::ProcessIncomingMessage( |
734 Message* message, | 738 Message* message, |
735 ClientCallBehavior client_call_behavior, | 739 ClientCallBehavior client_call_behavior, |
736 base::SingleThreadTaskRunner* current_task_runner) { | 740 base::SingleThreadTaskRunner* current_task_runner) { |
737 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 741 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
738 DCHECK(!paused_); | 742 DCHECK(!paused_); |
739 DCHECK(message); | 743 DCHECK(message); |
740 lock_.AssertAcquired(); | 744 AssertLockAcquired(); |
741 | 745 |
742 if (message->IsNull()) { | 746 if (message->IsNull()) { |
743 // This is a sync message and has been processed during sync handle | 747 // This is a sync message and has been processed during sync handle |
744 // watching. | 748 // watching. |
745 return true; | 749 return true; |
746 } | 750 } |
747 | 751 |
748 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 752 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
749 if (!control_message_handler_.Accept(message)) | 753 if (!control_message_handler_.Accept(message)) |
750 RaiseErrorInNonTestingMode(); | 754 RaiseErrorInNonTestingMode(); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
804 | 808 |
805 InterfaceEndpointClient* client = endpoint->client(); | 809 InterfaceEndpointClient* client = endpoint->client(); |
806 bool result = false; | 810 bool result = false; |
807 { | 811 { |
808 // We must unlock before calling into |client| because it may call this | 812 // We must unlock before calling into |client| because it may call this |
809 // object within HandleIncomingMessage(). Holding the lock will lead to | 813 // object within HandleIncomingMessage(). Holding the lock will lead to |
810 // deadlock. | 814 // deadlock. |
811 // | 815 // |
812 // It is safe to call into |client| without the lock. Because |client| is | 816 // It is safe to call into |client| without the lock. Because |client| is |
813 // always accessed on the same thread, including DetachEndpointClient(). | 817 // always accessed on the same thread, including DetachEndpointClient(). |
814 base::AutoUnlock unlocker(lock_); | 818 MayAutoUnlock unlocker(lock_.get()); |
815 result = client->HandleIncomingMessage(message); | 819 result = client->HandleIncomingMessage(message); |
816 } | 820 } |
817 if (!result) | 821 if (!result) |
818 RaiseErrorInNonTestingMode(); | 822 RaiseErrorInNonTestingMode(); |
819 | 823 |
820 return true; | 824 return true; |
821 } | 825 } |
822 | 826 |
823 void MultiplexRouter::MaybePostToProcessTasks( | 827 void MultiplexRouter::MaybePostToProcessTasks( |
824 base::SingleThreadTaskRunner* task_runner) { | 828 base::SingleThreadTaskRunner* task_runner) { |
825 lock_.AssertAcquired(); | 829 AssertLockAcquired(); |
826 if (posted_to_process_tasks_) | 830 if (posted_to_process_tasks_) |
827 return; | 831 return; |
828 | 832 |
829 posted_to_process_tasks_ = true; | 833 posted_to_process_tasks_ = true; |
830 posted_to_task_runner_ = task_runner; | 834 posted_to_task_runner_ = task_runner; |
831 task_runner->PostTask( | 835 task_runner->PostTask( |
832 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 836 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
833 } | 837 } |
834 | 838 |
835 void MultiplexRouter::LockAndCallProcessTasks() { | 839 void MultiplexRouter::LockAndCallProcessTasks() { |
836 // There is no need to hold a ref to this class in this case because this is | 840 // There is no need to hold a ref to this class in this case because this is |
837 // always called using base::Bind(), which holds a ref. | 841 // always called using base::Bind(), which holds a ref. |
838 base::AutoLock locker(lock_); | 842 MayAutoLock locker(lock_.get()); |
839 posted_to_process_tasks_ = false; | 843 posted_to_process_tasks_ = false; |
840 scoped_refptr<base::SingleThreadTaskRunner> runner( | 844 scoped_refptr<base::SingleThreadTaskRunner> runner( |
841 std::move(posted_to_task_runner_)); | 845 std::move(posted_to_task_runner_)); |
842 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); | 846 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
843 } | 847 } |
844 | 848 |
845 void MultiplexRouter::UpdateEndpointStateMayRemove( | 849 void MultiplexRouter::UpdateEndpointStateMayRemove( |
846 InterfaceEndpoint* endpoint, | 850 InterfaceEndpoint* endpoint, |
847 EndpointStateUpdateType type) { | 851 EndpointStateUpdateType type) { |
848 switch (type) { | 852 switch (type) { |
849 case ENDPOINT_CLOSED: | 853 case ENDPOINT_CLOSED: |
850 endpoint->set_closed(); | 854 endpoint->set_closed(); |
851 break; | 855 break; |
852 case PEER_ENDPOINT_CLOSED: | 856 case PEER_ENDPOINT_CLOSED: |
853 endpoint->set_peer_closed(); | 857 endpoint->set_peer_closed(); |
854 // If the interface endpoint is performing a sync watch, this makes sure | 858 // If the interface endpoint is performing a sync watch, this makes sure |
855 // it is notified and eventually exits the sync watch. | 859 // it is notified and eventually exits the sync watch. |
856 endpoint->SignalSyncMessageEvent(); | 860 endpoint->SignalSyncMessageEvent(); |
857 break; | 861 break; |
858 } | 862 } |
859 if (endpoint->closed() && endpoint->peer_closed()) | 863 if (endpoint->closed() && endpoint->peer_closed()) |
860 endpoints_.erase(endpoint->id()); | 864 endpoints_.erase(endpoint->id()); |
861 } | 865 } |
862 | 866 |
863 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 867 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
864 lock_.AssertAcquired(); | 868 AssertLockAcquired(); |
865 if (!testing_mode_) | 869 if (!testing_mode_) |
866 RaiseError(); | 870 RaiseError(); |
867 } | 871 } |
868 | 872 |
869 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( | 873 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
870 InterfaceId id, | 874 InterfaceId id, |
871 bool* inserted) { | 875 bool* inserted) { |
872 lock_.AssertAcquired(); | 876 AssertLockAcquired(); |
873 // Either |inserted| is nullptr or it points to a boolean initialized as | 877 // Either |inserted| is nullptr or it points to a boolean initialized as |
874 // false. | 878 // false. |
875 DCHECK(!inserted || !*inserted); | 879 DCHECK(!inserted || !*inserted); |
876 | 880 |
877 auto iter = endpoints_.find(id); | 881 auto iter = endpoints_.find(id); |
878 InterfaceEndpoint* endpoint; | 882 InterfaceEndpoint* endpoint; |
879 if (iter == endpoints_.end()) { | 883 if (iter == endpoints_.end()) { |
880 endpoint = new InterfaceEndpoint(this, id); | 884 endpoint = new InterfaceEndpoint(this, id); |
881 endpoints_[id] = endpoint; | 885 endpoints_[id] = endpoint; |
882 if (inserted) | 886 if (inserted) |
883 *inserted = true; | 887 *inserted = true; |
884 } else { | 888 } else { |
885 endpoint = iter->second.get(); | 889 endpoint = iter->second.get(); |
886 } | 890 } |
887 | 891 |
888 return endpoint; | 892 return endpoint; |
889 } | 893 } |
890 | 894 |
| 895 void MultiplexRouter::AssertLockAcquired() { |
| 896 #if DCHECK_IS_ON() |
| 897 if (lock_) |
| 898 lock_->AssertAcquired(); |
| 899 #endif |
| 900 } |
| 901 |
891 } // namespace internal | 902 } // namespace internal |
892 } // namespace mojo | 903 } // namespace mojo |
OLD | NEW |