Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(244)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2345013002: Mojo C++ bindings: remove the lock in MultiplexRouter if it only serves a single interface. (Closed)
Patch Set: . Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h" 15 #include "base/single_thread_task_runner.h"
16 #include "base/stl_util.h" 16 #include "base/stl_util.h"
17 #include "base/threading/thread_task_runner_handle.h" 17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/associated_group.h" 18 #include "mojo/public/cpp/bindings/associated_group.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
22 23
23 namespace mojo { 24 namespace mojo {
24 namespace internal { 25 namespace internal {
25 26
26 // InterfaceEndpoint stores the information of an interface endpoint registered 27 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router. 28 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object. 30 // this object.
30 class MultiplexRouter::InterfaceEndpoint 31 class MultiplexRouter::InterfaceEndpoint
(...skipping 12 matching lines...) Expand all
43 // The following public methods are safe to call from any threads without 44 // The following public methods are safe to call from any threads without
44 // locking. 45 // locking.
45 46
46 InterfaceId id() const { return id_; } 47 InterfaceId id() const { return id_; }
47 48
48 // --------------------------------------------------------------------------- 49 // ---------------------------------------------------------------------------
49 // The following public methods are called under the router's lock. 50 // The following public methods are called under the router's lock.
50 51
51 bool closed() const { return closed_; } 52 bool closed() const { return closed_; }
52 void set_closed() { 53 void set_closed() {
53 router_->lock_.AssertAcquired(); 54 router_->AssertLockAcquired();
54 closed_ = true; 55 closed_ = true;
55 } 56 }
56 57
57 bool peer_closed() const { return peer_closed_; } 58 bool peer_closed() const { return peer_closed_; }
58 void set_peer_closed() { 59 void set_peer_closed() {
59 router_->lock_.AssertAcquired(); 60 router_->AssertLockAcquired();
60 peer_closed_ = true; 61 peer_closed_ = true;
61 } 62 }
62 63
63 base::SingleThreadTaskRunner* task_runner() const { 64 base::SingleThreadTaskRunner* task_runner() const {
64 return task_runner_.get(); 65 return task_runner_.get();
65 } 66 }
66 67
67 InterfaceEndpointClient* client() const { return client_; } 68 InterfaceEndpointClient* client() const { return client_; }
68 69
69 void AttachClient(InterfaceEndpointClient* client, 70 void AttachClient(InterfaceEndpointClient* client,
70 scoped_refptr<base::SingleThreadTaskRunner> runner) { 71 scoped_refptr<base::SingleThreadTaskRunner> runner) {
71 router_->lock_.AssertAcquired(); 72 router_->AssertLockAcquired();
72 DCHECK(!client_); 73 DCHECK(!client_);
73 DCHECK(!closed_); 74 DCHECK(!closed_);
74 DCHECK(runner->BelongsToCurrentThread()); 75 DCHECK(runner->BelongsToCurrentThread());
75 76
76 task_runner_ = std::move(runner); 77 task_runner_ = std::move(runner);
77 client_ = client; 78 client_ = client;
78 } 79 }
79 80
80 // This method must be called on the same thread as the corresponding 81 // This method must be called on the same thread as the corresponding
81 // AttachClient() call. 82 // AttachClient() call.
82 void DetachClient() { 83 void DetachClient() {
83 router_->lock_.AssertAcquired(); 84 router_->AssertLockAcquired();
84 DCHECK(client_); 85 DCHECK(client_);
85 DCHECK(task_runner_->BelongsToCurrentThread()); 86 DCHECK(task_runner_->BelongsToCurrentThread());
86 DCHECK(!closed_); 87 DCHECK(!closed_);
87 88
88 task_runner_ = nullptr; 89 task_runner_ = nullptr;
89 client_ = nullptr; 90 client_ = nullptr;
90 sync_watcher_.reset(); 91 sync_watcher_.reset();
91 } 92 }
92 93
93 void SignalSyncMessageEvent() { 94 void SignalSyncMessageEvent() {
94 router_->lock_.AssertAcquired(); 95 router_->AssertLockAcquired();
95 if (event_signalled_) 96 if (event_signalled_)
96 return; 97 return;
97 98
98 EnsureEventMessagePipeExists(); 99 EnsureEventMessagePipeExists();
99 event_signalled_ = true; 100 event_signalled_ = true;
100 MojoResult result = 101 MojoResult result =
101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, 102 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); 103 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
103 DCHECK_EQ(MOJO_RESULT_OK, result); 104 DCHECK_EQ(MOJO_RESULT_OK, result);
104 } 105 }
105 106
106 void ResetSyncMessageSignal() { 107 void ResetSyncMessageSignal() {
107 router_->lock_.AssertAcquired(); 108 router_->AssertLockAcquired();
108 109
109 if (!event_signalled_) 110 if (!event_signalled_)
110 return; 111 return;
111 112
112 DCHECK(sync_message_event_receiver_.is_valid()); 113 DCHECK(sync_message_event_receiver_.is_valid());
113 MojoResult result = 114 MojoResult result =
114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, 115 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 116 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
116 DCHECK_EQ(MOJO_RESULT_OK, result); 117 DCHECK_EQ(MOJO_RESULT_OK, result);
117 event_signalled_ = false; 118 event_signalled_ = false;
(...skipping 21 matching lines...) Expand all
139 DCHECK(task_runner_->BelongsToCurrentThread()); 140 DCHECK(task_runner_->BelongsToCurrentThread());
140 141
141 EnsureSyncWatcherExists(); 142 EnsureSyncWatcherExists();
142 return sync_watcher_->SyncWatch(should_stop); 143 return sync_watcher_->SyncWatch(should_stop);
143 } 144 }
144 145
145 private: 146 private:
146 friend class base::RefCounted<InterfaceEndpoint>; 147 friend class base::RefCounted<InterfaceEndpoint>;
147 148
148 ~InterfaceEndpoint() override { 149 ~InterfaceEndpoint() override {
149 router_->lock_.AssertAcquired(); 150 router_->AssertLockAcquired();
150 151
151 DCHECK(!client_); 152 DCHECK(!client_);
152 DCHECK(closed_); 153 DCHECK(closed_);
153 DCHECK(peer_closed_); 154 DCHECK(peer_closed_);
154 DCHECK(!sync_watcher_); 155 DCHECK(!sync_watcher_);
155 } 156 }
156 157
157 void OnHandleReady(MojoResult result) { 158 void OnHandleReady(MojoResult result) {
158 DCHECK(task_runner_->BelongsToCurrentThread()); 159 DCHECK(task_runner_->BelongsToCurrentThread());
159 scoped_refptr<InterfaceEndpoint> self_protector(this); 160 scoped_refptr<InterfaceEndpoint> self_protector(this);
160 scoped_refptr<MultiplexRouter> router_protector(router_); 161 scoped_refptr<MultiplexRouter> router_protector(router_);
161 162
162 // Because we never close |sync_message_event_{sender,receiver}_| before 163 // Because we never close |sync_message_event_{sender,receiver}_| before
163 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. 164 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
164 DCHECK_EQ(MOJO_RESULT_OK, result); 165 DCHECK_EQ(MOJO_RESULT_OK, result);
165 bool reset_sync_watcher = false; 166 bool reset_sync_watcher = false;
166 { 167 {
167 base::AutoLock locker(router_->lock_); 168 MayAutoLock locker(router_->lock_.get());
168 169
169 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 170 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
170 171
171 if (!more_to_process) 172 if (!more_to_process)
172 ResetSyncMessageSignal(); 173 ResetSyncMessageSignal();
173 174
174 // Currently there are no queued sync messages and the peer has closed so 175 // Currently there are no queued sync messages and the peer has closed so
175 // there won't be incoming sync messages in the future. 176 // there won't be incoming sync messages in the future.
176 reset_sync_watcher = !more_to_process && peer_closed_; 177 reset_sync_watcher = !more_to_process && peer_closed_;
177 } 178 }
178 if (reset_sync_watcher) { 179 if (reset_sync_watcher) {
179 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 180 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
180 // on the call stack, resetting the sync watcher will allow it to exit 181 // on the call stack, resetting the sync watcher will allow it to exit
181 // when the call stack unwinds to that frame. 182 // when the call stack unwinds to that frame.
182 sync_watcher_.reset(); 183 sync_watcher_.reset();
183 } 184 }
184 } 185 }
185 186
186 void EnsureSyncWatcherExists() { 187 void EnsureSyncWatcherExists() {
187 DCHECK(task_runner_->BelongsToCurrentThread()); 188 DCHECK(task_runner_->BelongsToCurrentThread());
188 if (sync_watcher_) 189 if (sync_watcher_)
189 return; 190 return;
190 191
191 { 192 {
192 base::AutoLock locker(router_->lock_); 193 MayAutoLock locker(router_->lock_.get());
193 EnsureEventMessagePipeExists(); 194 EnsureEventMessagePipeExists();
194 195
195 auto iter = router_->sync_message_tasks_.find(id_); 196 auto iter = router_->sync_message_tasks_.find(id_);
196 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) 197 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
197 SignalSyncMessageEvent(); 198 SignalSyncMessageEvent();
198 } 199 }
199 200
200 sync_watcher_.reset(new SyncHandleWatcher( 201 sync_watcher_.reset(new SyncHandleWatcher(
201 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, 202 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
202 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); 203 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
203 } 204 }
204 205
205 void EnsureEventMessagePipeExists() { 206 void EnsureEventMessagePipeExists() {
206 router_->lock_.AssertAcquired(); 207 router_->AssertLockAcquired();
207 208
208 if (sync_message_event_receiver_.is_valid()) 209 if (sync_message_event_receiver_.is_valid())
209 return; 210 return;
210 211
211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, 212 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
212 &sync_message_event_receiver_); 213 &sync_message_event_receiver_);
213 DCHECK_EQ(MOJO_RESULT_OK, result); 214 DCHECK_EQ(MOJO_RESULT_OK, result);
214 } 215 }
215 216
216 // --------------------------------------------------------------------------- 217 // ---------------------------------------------------------------------------
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 275 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
275 276
276 enum Type { MESSAGE, NOTIFY_ERROR }; 277 enum Type { MESSAGE, NOTIFY_ERROR };
277 Type type; 278 Type type;
278 279
279 private: 280 private:
280 explicit Task(Type in_type) : type(in_type) {} 281 explicit Task(Type in_type) : type(in_type) {}
281 }; 282 };
282 283
283 MultiplexRouter::MultiplexRouter( 284 MultiplexRouter::MultiplexRouter(
285 ScopedMessagePipeHandle message_pipe,
286 Config config,
284 bool set_interface_id_namesapce_bit, 287 bool set_interface_id_namesapce_bit,
285 ScopedMessagePipeHandle message_pipe,
286 scoped_refptr<base::SingleThreadTaskRunner> runner) 288 scoped_refptr<base::SingleThreadTaskRunner> runner)
287 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 289 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
288 task_runner_(runner), 290 task_runner_(runner),
289 header_validator_(nullptr), 291 header_validator_(nullptr),
290 filters_(this), 292 filters_(this),
291 connector_(std::move(message_pipe), 293 connector_(std::move(message_pipe),
292 Connector::MULTI_THREADED_SEND, 294 config == SINGLE_INTERFACE ? Connector::SINGLE_THREADED_SEND
295 : Connector::MULTI_THREADED_SEND,
293 std::move(runner)), 296 std::move(runner)),
297 lock_(config == SINGLE_INTERFACE ? nullptr : new base::Lock),
294 control_message_handler_(this), 298 control_message_handler_(this),
295 control_message_proxy_(&connector_), 299 control_message_proxy_(&connector_),
296 next_interface_id_value_(1), 300 next_interface_id_value_(1),
297 posted_to_process_tasks_(false), 301 posted_to_process_tasks_(false),
298 encountered_error_(false), 302 encountered_error_(false),
299 paused_(false), 303 paused_(false),
300 testing_mode_(false) { 304 testing_mode_(false) {
301 DCHECK(task_runner_->BelongsToCurrentThread()); 305 DCHECK(task_runner_->BelongsToCurrentThread());
302 // Always participate in sync handle watching, because even if it doesn't 306 // Always participate in sync handle watching, because even if it doesn't
303 // expect sync requests during sync handle watching, it may still need to 307 // expect sync requests during sync handle watching, it may still need to
304 // dispatch messages to associated endpoints on a different thread. 308 // dispatch messages to associated endpoints on a different thread.
305 connector_.AllowWokenUpBySyncWatchOnSameThread(); 309 connector_.AllowWokenUpBySyncWatchOnSameThread();
306 connector_.set_incoming_receiver(&filters_); 310 connector_.set_incoming_receiver(&filters_);
307 connector_.set_connection_error_handler( 311 connector_.set_connection_error_handler(
308 base::Bind(&MultiplexRouter::OnPipeConnectionError, 312 base::Bind(&MultiplexRouter::OnPipeConnectionError,
309 base::Unretained(this))); 313 base::Unretained(this)));
310 314
311 std::unique_ptr<MessageHeaderValidator> header_validator = 315 std::unique_ptr<MessageHeaderValidator> header_validator =
312 base::MakeUnique<MessageHeaderValidator>(); 316 base::MakeUnique<MessageHeaderValidator>();
313 header_validator_ = header_validator.get(); 317 header_validator_ = header_validator.get();
314 filters_.Append(std::move(header_validator)); 318 filters_.Append(std::move(header_validator));
315 } 319 }
316 320
317 MultiplexRouter::~MultiplexRouter() { 321 MultiplexRouter::~MultiplexRouter() {
318 base::AutoLock locker(lock_); 322 MayAutoLock locker(lock_.get());
319 323
320 sync_message_tasks_.clear(); 324 sync_message_tasks_.clear();
321 tasks_.clear(); 325 tasks_.clear();
322 326
323 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 327 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
324 InterfaceEndpoint* endpoint = iter->second.get(); 328 InterfaceEndpoint* endpoint = iter->second.get();
325 // Increment the iterator before calling UpdateEndpointStateMayRemove() 329 // Increment the iterator before calling UpdateEndpointStateMayRemove()
326 // because it may remove the corresponding value from the map. 330 // because it may remove the corresponding value from the map.
327 ++iter; 331 ++iter;
328 332
329 DCHECK(endpoint->closed()); 333 DCHECK(endpoint->closed());
330 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 334 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
331 } 335 }
332 336
333 DCHECK(endpoints_.empty()); 337 DCHECK(endpoints_.empty());
334 } 338 }
335 339
336 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { 340 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) {
337 DCHECK(thread_checker_.CalledOnValidThread()); 341 DCHECK(thread_checker_.CalledOnValidThread());
338 header_validator_->SetDescription(name + " [master] MessageHeaderValidator"); 342 header_validator_->SetDescription(name + " [master] MessageHeaderValidator");
339 control_message_handler_.SetDescription( 343 control_message_handler_.SetDescription(
340 name + " [master] PipeControlMessageHandler"); 344 name + " [master] PipeControlMessageHandler");
341 } 345 }
342 346
343 void MultiplexRouter::CreateEndpointHandlePair( 347 void MultiplexRouter::CreateEndpointHandlePair(
344 ScopedInterfaceEndpointHandle* local_endpoint, 348 ScopedInterfaceEndpointHandle* local_endpoint,
345 ScopedInterfaceEndpointHandle* remote_endpoint) { 349 ScopedInterfaceEndpointHandle* remote_endpoint) {
346 base::AutoLock locker(lock_); 350 MayAutoLock locker(lock_.get());
347 uint32_t id = 0; 351 uint32_t id = 0;
348 do { 352 do {
349 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) 353 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
350 next_interface_id_value_ = 1; 354 next_interface_id_value_ = 1;
351 id = next_interface_id_value_++; 355 id = next_interface_id_value_++;
352 if (set_interface_id_namespace_bit_) 356 if (set_interface_id_namespace_bit_)
353 id |= kInterfaceIdNamespaceMask; 357 id |= kInterfaceIdNamespaceMask;
354 } while (base::ContainsKey(endpoints_, id)); 358 } while (base::ContainsKey(endpoints_, id));
355 359
356 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); 360 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
357 endpoints_[id] = endpoint; 361 endpoints_[id] = endpoint;
358 if (encountered_error_) 362 if (encountered_error_)
359 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 363 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
360 364
361 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); 365 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
362 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); 366 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
363 } 367 }
364 368
365 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 369 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
366 InterfaceId id) { 370 InterfaceId id) {
367 if (!IsValidInterfaceId(id)) 371 if (!IsValidInterfaceId(id))
368 return ScopedInterfaceEndpointHandle(); 372 return ScopedInterfaceEndpointHandle();
369 373
370 base::AutoLock locker(lock_); 374 MayAutoLock locker(lock_.get());
371 bool inserted = false; 375 bool inserted = false;
372 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 376 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
373 if (inserted) { 377 if (inserted) {
374 if (encountered_error_) 378 if (encountered_error_)
375 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 379 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
376 } else { 380 } else {
377 // If the endpoint already exist, it is because we have received a 381 // If the endpoint already exist, it is because we have received a
378 // notification that the peer endpoint has closed. 382 // notification that the peer endpoint has closed.
379 CHECK(!endpoint->closed()); 383 CHECK(!endpoint->closed());
380 CHECK(endpoint->peer_closed()); 384 CHECK(endpoint->peer_closed());
381 } 385 }
382 return CreateScopedInterfaceEndpointHandle(id, true); 386 return CreateScopedInterfaceEndpointHandle(id, true);
383 } 387 }
384 388
385 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { 389 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
386 if (!IsValidInterfaceId(id)) 390 if (!IsValidInterfaceId(id))
387 return; 391 return;
388 392
389 base::AutoLock locker(lock_); 393 MayAutoLock locker(lock_.get());
390 394
391 if (!is_local) { 395 if (!is_local) {
392 DCHECK(base::ContainsKey(endpoints_, id)); 396 DCHECK(base::ContainsKey(endpoints_, id));
393 DCHECK(!IsMasterInterfaceId(id)); 397 DCHECK(!IsMasterInterfaceId(id));
394 398
395 // We will receive a NotifyPeerEndpointClosed message from the other side. 399 // We will receive a NotifyPeerEndpointClosed message from the other side.
396 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); 400 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
397 401
398 return; 402 return;
399 } 403 }
(...skipping 12 matching lines...) Expand all
412 416
413 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 417 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
414 const ScopedInterfaceEndpointHandle& handle, 418 const ScopedInterfaceEndpointHandle& handle,
415 InterfaceEndpointClient* client, 419 InterfaceEndpointClient* client,
416 scoped_refptr<base::SingleThreadTaskRunner> runner) { 420 scoped_refptr<base::SingleThreadTaskRunner> runner) {
417 const InterfaceId id = handle.id(); 421 const InterfaceId id = handle.id();
418 422
419 DCHECK(IsValidInterfaceId(id)); 423 DCHECK(IsValidInterfaceId(id));
420 DCHECK(client); 424 DCHECK(client);
421 425
422 base::AutoLock locker(lock_); 426 MayAutoLock locker(lock_.get());
423 DCHECK(base::ContainsKey(endpoints_, id)); 427 DCHECK(base::ContainsKey(endpoints_, id));
424 428
425 InterfaceEndpoint* endpoint = endpoints_[id].get(); 429 InterfaceEndpoint* endpoint = endpoints_[id].get();
426 endpoint->AttachClient(client, std::move(runner)); 430 endpoint->AttachClient(client, std::move(runner));
427 431
428 if (endpoint->peer_closed()) 432 if (endpoint->peer_closed())
429 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 433 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
430 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 434 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
431 435
432 return endpoint; 436 return endpoint;
433 } 437 }
434 438
435 void MultiplexRouter::DetachEndpointClient( 439 void MultiplexRouter::DetachEndpointClient(
436 const ScopedInterfaceEndpointHandle& handle) { 440 const ScopedInterfaceEndpointHandle& handle) {
437 const InterfaceId id = handle.id(); 441 const InterfaceId id = handle.id();
438 442
439 DCHECK(IsValidInterfaceId(id)); 443 DCHECK(IsValidInterfaceId(id));
440 444
441 base::AutoLock locker(lock_); 445 MayAutoLock locker(lock_.get());
442 DCHECK(base::ContainsKey(endpoints_, id)); 446 DCHECK(base::ContainsKey(endpoints_, id));
443 447
444 InterfaceEndpoint* endpoint = endpoints_[id].get(); 448 InterfaceEndpoint* endpoint = endpoints_[id].get();
445 endpoint->DetachClient(); 449 endpoint->DetachClient();
446 } 450 }
447 451
448 void MultiplexRouter::RaiseError() { 452 void MultiplexRouter::RaiseError() {
449 if (task_runner_->BelongsToCurrentThread()) { 453 if (task_runner_->BelongsToCurrentThread()) {
450 connector_.RaiseError(); 454 connector_.RaiseError();
451 } else { 455 } else {
452 task_runner_->PostTask(FROM_HERE, 456 task_runner_->PostTask(FROM_HERE,
453 base::Bind(&MultiplexRouter::RaiseError, this)); 457 base::Bind(&MultiplexRouter::RaiseError, this));
454 } 458 }
455 } 459 }
456 460
457 void MultiplexRouter::CloseMessagePipe() { 461 void MultiplexRouter::CloseMessagePipe() {
458 DCHECK(thread_checker_.CalledOnValidThread()); 462 DCHECK(thread_checker_.CalledOnValidThread());
459 connector_.CloseMessagePipe(); 463 connector_.CloseMessagePipe();
460 // CloseMessagePipe() above won't trigger connection error handler. 464 // CloseMessagePipe() above won't trigger connection error handler.
461 // Explicitly call OnPipeConnectionError() so that associated endpoints will 465 // Explicitly call OnPipeConnectionError() so that associated endpoints will
462 // get notified. 466 // get notified.
463 OnPipeConnectionError(); 467 OnPipeConnectionError();
464 } 468 }
465 469
466 void MultiplexRouter::PauseIncomingMethodCallProcessing() { 470 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
467 DCHECK(thread_checker_.CalledOnValidThread()); 471 DCHECK(thread_checker_.CalledOnValidThread());
468 connector_.PauseIncomingMethodCallProcessing(); 472 connector_.PauseIncomingMethodCallProcessing();
469 473
470 base::AutoLock locker(lock_); 474 MayAutoLock locker(lock_.get());
471 paused_ = true; 475 paused_ = true;
472 476
473 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) 477 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
474 iter->second->ResetSyncMessageSignal(); 478 iter->second->ResetSyncMessageSignal();
475 } 479 }
476 480
477 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { 481 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
478 DCHECK(thread_checker_.CalledOnValidThread()); 482 DCHECK(thread_checker_.CalledOnValidThread());
479 connector_.ResumeIncomingMethodCallProcessing(); 483 connector_.ResumeIncomingMethodCallProcessing();
480 484
481 base::AutoLock locker(lock_); 485 MayAutoLock locker(lock_.get());
482 paused_ = false; 486 paused_ = false;
483 487
484 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { 488 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
485 auto sync_iter = sync_message_tasks_.find(iter->first); 489 auto sync_iter = sync_message_tasks_.find(iter->first);
486 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) 490 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())
487 iter->second->SignalSyncMessageEvent(); 491 iter->second->SignalSyncMessageEvent();
488 } 492 }
489 493
490 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 494 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
491 } 495 }
492 496
493 bool MultiplexRouter::HasAssociatedEndpoints() const { 497 bool MultiplexRouter::HasAssociatedEndpoints() const {
494 DCHECK(thread_checker_.CalledOnValidThread()); 498 DCHECK(thread_checker_.CalledOnValidThread());
495 base::AutoLock locker(lock_); 499 MayAutoLock locker(lock_.get());
496 500
497 if (endpoints_.size() > 1) 501 if (endpoints_.size() > 1)
498 return true; 502 return true;
499 if (endpoints_.size() == 0) 503 if (endpoints_.size() == 0)
500 return false; 504 return false;
501 505
502 return !base::ContainsKey(endpoints_, kMasterInterfaceId); 506 return !base::ContainsKey(endpoints_, kMasterInterfaceId);
503 } 507 }
504 508
505 void MultiplexRouter::EnableTestingMode() { 509 void MultiplexRouter::EnableTestingMode() {
506 DCHECK(thread_checker_.CalledOnValidThread()); 510 DCHECK(thread_checker_.CalledOnValidThread());
507 base::AutoLock locker(lock_); 511 MayAutoLock locker(lock_.get());
508 512
509 testing_mode_ = true; 513 testing_mode_ = true;
510 connector_.set_enforce_errors_from_incoming_receiver(false); 514 connector_.set_enforce_errors_from_incoming_receiver(false);
511 } 515 }
512 516
513 bool MultiplexRouter::Accept(Message* message) { 517 bool MultiplexRouter::Accept(Message* message) {
514 DCHECK(thread_checker_.CalledOnValidThread()); 518 DCHECK(thread_checker_.CalledOnValidThread());
515 519
516 scoped_refptr<MultiplexRouter> protector(this); 520 scoped_refptr<MultiplexRouter> protector(this);
517 base::AutoLock locker(lock_); 521 MayAutoLock locker(lock_.get());
518 522
519 DCHECK(!paused_); 523 DCHECK(!paused_);
520 524
521 ClientCallBehavior client_call_behavior = 525 ClientCallBehavior client_call_behavior =
522 connector_.during_sync_handle_watcher_callback() 526 connector_.during_sync_handle_watcher_callback()
523 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 527 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
524 : ALLOW_DIRECT_CLIENT_CALLS; 528 : ALLOW_DIRECT_CLIENT_CALLS;
525 529
526 bool processed = 530 bool processed =
527 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 531 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
(...skipping 18 matching lines...) Expand all
546 // tasks. 550 // tasks.
547 ProcessTasks(client_call_behavior, connector_.task_runner()); 551 ProcessTasks(client_call_behavior, connector_.task_runner());
548 } 552 }
549 553
550 // Always return true. If we see errors during message processing, we will 554 // Always return true. If we see errors during message processing, we will
551 // explicitly call Connector::RaiseError() to disconnect the message pipe. 555 // explicitly call Connector::RaiseError() to disconnect the message pipe.
552 return true; 556 return true;
553 } 557 }
554 558
555 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 559 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
556 lock_.AssertAcquired(); 560 AssertLockAcquired();
557 561
558 if (IsMasterInterfaceId(id)) 562 if (IsMasterInterfaceId(id))
559 return false; 563 return false;
560 564
561 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 565 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
562 566
563 // It is possible that this endpoint has been set as peer closed. That is 567 // It is possible that this endpoint has been set as peer closed. That is
564 // because when the message pipe is closed, all the endpoints are updated with 568 // because when the message pipe is closed, all the endpoints are updated with
565 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 569 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
566 // as long as there are refs keeping the router alive. If there is a 570 // as long as there are refs keeping the router alive. If there is a
567 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 571 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
568 // here and see that the endpoint has been marked as peer closed. 572 // here and see that the endpoint has been marked as peer closed.
569 if (!endpoint->peer_closed()) { 573 if (!endpoint->peer_closed()) {
570 if (endpoint->client()) 574 if (endpoint->client())
571 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 575 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
572 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 576 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
573 } 577 }
574 578
575 // No need to trigger a ProcessTasks() because it is already on the stack. 579 // No need to trigger a ProcessTasks() because it is already on the stack.
576 580
577 return true; 581 return true;
578 } 582 }
579 583
580 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { 584 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
581 lock_.AssertAcquired(); 585 AssertLockAcquired();
582 586
583 if (IsMasterInterfaceId(id)) 587 if (IsMasterInterfaceId(id))
584 return false; 588 return false;
585 589
586 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 590 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
587 DCHECK(!endpoint->closed()); 591 DCHECK(!endpoint->closed());
588 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 592 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
589 593
590 control_message_proxy_.NotifyPeerEndpointClosed(id); 594 control_message_proxy_.NotifyPeerEndpointClosed(id);
591 595
592 return true; 596 return true;
593 } 597 }
594 598
595 void MultiplexRouter::OnPipeConnectionError() { 599 void MultiplexRouter::OnPipeConnectionError() {
596 DCHECK(thread_checker_.CalledOnValidThread()); 600 DCHECK(thread_checker_.CalledOnValidThread());
597 601
598 scoped_refptr<MultiplexRouter> protector(this); 602 scoped_refptr<MultiplexRouter> protector(this);
599 base::AutoLock locker(lock_); 603 MayAutoLock locker(lock_.get());
600 604
601 encountered_error_ = true; 605 encountered_error_ = true;
602 606
603 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 607 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
604 InterfaceEndpoint* endpoint = iter->second.get(); 608 InterfaceEndpoint* endpoint = iter->second.get();
605 // Increment the iterator before calling UpdateEndpointStateMayRemove() 609 // Increment the iterator before calling UpdateEndpointStateMayRemove()
606 // because it may remove the corresponding value from the map. 610 // because it may remove the corresponding value from the map.
607 ++iter; 611 ++iter;
608 612
609 if (endpoint->client()) 613 if (endpoint->client())
610 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 614 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
611 615
612 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 616 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
613 } 617 }
614 618
615 ProcessTasks(connector_.during_sync_handle_watcher_callback() 619 ProcessTasks(connector_.during_sync_handle_watcher_callback()
616 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 620 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
617 : ALLOW_DIRECT_CLIENT_CALLS, 621 : ALLOW_DIRECT_CLIENT_CALLS,
618 connector_.task_runner()); 622 connector_.task_runner());
619 } 623 }
620 624
621 void MultiplexRouter::ProcessTasks( 625 void MultiplexRouter::ProcessTasks(
622 ClientCallBehavior client_call_behavior, 626 ClientCallBehavior client_call_behavior,
623 base::SingleThreadTaskRunner* current_task_runner) { 627 base::SingleThreadTaskRunner* current_task_runner) {
624 lock_.AssertAcquired(); 628 AssertLockAcquired();
625 629
626 if (posted_to_process_tasks_) 630 if (posted_to_process_tasks_)
627 return; 631 return;
628 632
629 while (!tasks_.empty() && !paused_) { 633 while (!tasks_.empty() && !paused_) {
630 std::unique_ptr<Task> task(std::move(tasks_.front())); 634 std::unique_ptr<Task> task(std::move(tasks_.front()));
631 tasks_.pop_front(); 635 tasks_.pop_front();
632 636
633 InterfaceId id = kInvalidInterfaceId; 637 InterfaceId id = kInvalidInterfaceId;
634 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && 638 bool sync_message = task->IsMessageTask() && !task->message.IsNull() &&
(...skipping 23 matching lines...) Expand all
658 if (sync_message) { 662 if (sync_message) {
659 auto iter = sync_message_tasks_.find(id); 663 auto iter = sync_message_tasks_.find(id);
660 if (iter != sync_message_tasks_.end() && iter->second.empty()) 664 if (iter != sync_message_tasks_.end() && iter->second.empty())
661 sync_message_tasks_.erase(iter); 665 sync_message_tasks_.erase(iter);
662 } 666 }
663 } 667 }
664 } 668 }
665 } 669 }
666 670
667 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { 671 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
668 lock_.AssertAcquired(); 672 AssertLockAcquired();
669 673
670 auto iter = sync_message_tasks_.find(id); 674 auto iter = sync_message_tasks_.find(id);
671 if (iter == sync_message_tasks_.end()) 675 if (iter == sync_message_tasks_.end())
672 return false; 676 return false;
673 677
674 if (paused_) 678 if (paused_)
675 return true; 679 return true;
676 680
677 MultiplexRouter::Task* task = iter->second.front(); 681 MultiplexRouter::Task* task = iter->second.front();
678 iter->second.pop_front(); 682 iter->second.pop_front();
(...skipping 18 matching lines...) Expand all
697 return true; 701 return true;
698 } 702 }
699 703
700 bool MultiplexRouter::ProcessNotifyErrorTask( 704 bool MultiplexRouter::ProcessNotifyErrorTask(
701 Task* task, 705 Task* task,
702 ClientCallBehavior client_call_behavior, 706 ClientCallBehavior client_call_behavior,
703 base::SingleThreadTaskRunner* current_task_runner) { 707 base::SingleThreadTaskRunner* current_task_runner) {
704 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 708 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
705 DCHECK(!paused_); 709 DCHECK(!paused_);
706 710
707 lock_.AssertAcquired(); 711 AssertLockAcquired();
708 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 712 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
709 if (!endpoint->client()) 713 if (!endpoint->client())
710 return true; 714 return true;
711 715
712 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 716 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
713 endpoint->task_runner() != current_task_runner) { 717 endpoint->task_runner() != current_task_runner) {
714 MaybePostToProcessTasks(endpoint->task_runner()); 718 MaybePostToProcessTasks(endpoint->task_runner());
715 return false; 719 return false;
716 } 720 }
717 721
718 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 722 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
719 723
720 InterfaceEndpointClient* client = endpoint->client(); 724 InterfaceEndpointClient* client = endpoint->client();
721 { 725 {
722 // We must unlock before calling into |client| because it may call this 726 // We must unlock before calling into |client| because it may call this
723 // object within NotifyError(). Holding the lock will lead to deadlock. 727 // object within NotifyError(). Holding the lock will lead to deadlock.
724 // 728 //
725 // It is safe to call into |client| without the lock. Because |client| is 729 // It is safe to call into |client| without the lock. Because |client| is
726 // always accessed on the same thread, including DetachEndpointClient(). 730 // always accessed on the same thread, including DetachEndpointClient().
727 base::AutoUnlock unlocker(lock_); 731 MayAutoUnlock unlocker(lock_.get());
728 client->NotifyError(); 732 client->NotifyError();
729 } 733 }
730 return true; 734 return true;
731 } 735 }
732 736
733 bool MultiplexRouter::ProcessIncomingMessage( 737 bool MultiplexRouter::ProcessIncomingMessage(
734 Message* message, 738 Message* message,
735 ClientCallBehavior client_call_behavior, 739 ClientCallBehavior client_call_behavior,
736 base::SingleThreadTaskRunner* current_task_runner) { 740 base::SingleThreadTaskRunner* current_task_runner) {
737 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 741 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
738 DCHECK(!paused_); 742 DCHECK(!paused_);
739 DCHECK(message); 743 DCHECK(message);
740 lock_.AssertAcquired(); 744 AssertLockAcquired();
741 745
742 if (message->IsNull()) { 746 if (message->IsNull()) {
743 // This is a sync message and has been processed during sync handle 747 // This is a sync message and has been processed during sync handle
744 // watching. 748 // watching.
745 return true; 749 return true;
746 } 750 }
747 751
748 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 752 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
749 if (!control_message_handler_.Accept(message)) 753 if (!control_message_handler_.Accept(message))
750 RaiseErrorInNonTestingMode(); 754 RaiseErrorInNonTestingMode();
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
804 808
805 InterfaceEndpointClient* client = endpoint->client(); 809 InterfaceEndpointClient* client = endpoint->client();
806 bool result = false; 810 bool result = false;
807 { 811 {
808 // We must unlock before calling into |client| because it may call this 812 // We must unlock before calling into |client| because it may call this
809 // object within HandleIncomingMessage(). Holding the lock will lead to 813 // object within HandleIncomingMessage(). Holding the lock will lead to
810 // deadlock. 814 // deadlock.
811 // 815 //
812 // It is safe to call into |client| without the lock. Because |client| is 816 // It is safe to call into |client| without the lock. Because |client| is
813 // always accessed on the same thread, including DetachEndpointClient(). 817 // always accessed on the same thread, including DetachEndpointClient().
814 base::AutoUnlock unlocker(lock_); 818 MayAutoUnlock unlocker(lock_.get());
815 result = client->HandleIncomingMessage(message); 819 result = client->HandleIncomingMessage(message);
816 } 820 }
817 if (!result) 821 if (!result)
818 RaiseErrorInNonTestingMode(); 822 RaiseErrorInNonTestingMode();
819 823
820 return true; 824 return true;
821 } 825 }
822 826
823 void MultiplexRouter::MaybePostToProcessTasks( 827 void MultiplexRouter::MaybePostToProcessTasks(
824 base::SingleThreadTaskRunner* task_runner) { 828 base::SingleThreadTaskRunner* task_runner) {
825 lock_.AssertAcquired(); 829 AssertLockAcquired();
826 if (posted_to_process_tasks_) 830 if (posted_to_process_tasks_)
827 return; 831 return;
828 832
829 posted_to_process_tasks_ = true; 833 posted_to_process_tasks_ = true;
830 posted_to_task_runner_ = task_runner; 834 posted_to_task_runner_ = task_runner;
831 task_runner->PostTask( 835 task_runner->PostTask(
832 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 836 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
833 } 837 }
834 838
835 void MultiplexRouter::LockAndCallProcessTasks() { 839 void MultiplexRouter::LockAndCallProcessTasks() {
836 // There is no need to hold a ref to this class in this case because this is 840 // There is no need to hold a ref to this class in this case because this is
837 // always called using base::Bind(), which holds a ref. 841 // always called using base::Bind(), which holds a ref.
838 base::AutoLock locker(lock_); 842 MayAutoLock locker(lock_.get());
839 posted_to_process_tasks_ = false; 843 posted_to_process_tasks_ = false;
840 scoped_refptr<base::SingleThreadTaskRunner> runner( 844 scoped_refptr<base::SingleThreadTaskRunner> runner(
841 std::move(posted_to_task_runner_)); 845 std::move(posted_to_task_runner_));
842 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 846 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
843 } 847 }
844 848
845 void MultiplexRouter::UpdateEndpointStateMayRemove( 849 void MultiplexRouter::UpdateEndpointStateMayRemove(
846 InterfaceEndpoint* endpoint, 850 InterfaceEndpoint* endpoint,
847 EndpointStateUpdateType type) { 851 EndpointStateUpdateType type) {
848 switch (type) { 852 switch (type) {
849 case ENDPOINT_CLOSED: 853 case ENDPOINT_CLOSED:
850 endpoint->set_closed(); 854 endpoint->set_closed();
851 break; 855 break;
852 case PEER_ENDPOINT_CLOSED: 856 case PEER_ENDPOINT_CLOSED:
853 endpoint->set_peer_closed(); 857 endpoint->set_peer_closed();
854 // If the interface endpoint is performing a sync watch, this makes sure 858 // If the interface endpoint is performing a sync watch, this makes sure
855 // it is notified and eventually exits the sync watch. 859 // it is notified and eventually exits the sync watch.
856 endpoint->SignalSyncMessageEvent(); 860 endpoint->SignalSyncMessageEvent();
857 break; 861 break;
858 } 862 }
859 if (endpoint->closed() && endpoint->peer_closed()) 863 if (endpoint->closed() && endpoint->peer_closed())
860 endpoints_.erase(endpoint->id()); 864 endpoints_.erase(endpoint->id());
861 } 865 }
862 866
863 void MultiplexRouter::RaiseErrorInNonTestingMode() { 867 void MultiplexRouter::RaiseErrorInNonTestingMode() {
864 lock_.AssertAcquired(); 868 AssertLockAcquired();
865 if (!testing_mode_) 869 if (!testing_mode_)
866 RaiseError(); 870 RaiseError();
867 } 871 }
868 872
869 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( 873 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
870 InterfaceId id, 874 InterfaceId id,
871 bool* inserted) { 875 bool* inserted) {
872 lock_.AssertAcquired(); 876 AssertLockAcquired();
873 // Either |inserted| is nullptr or it points to a boolean initialized as 877 // Either |inserted| is nullptr or it points to a boolean initialized as
874 // false. 878 // false.
875 DCHECK(!inserted || !*inserted); 879 DCHECK(!inserted || !*inserted);
876 880
877 auto iter = endpoints_.find(id); 881 auto iter = endpoints_.find(id);
878 InterfaceEndpoint* endpoint; 882 InterfaceEndpoint* endpoint;
879 if (iter == endpoints_.end()) { 883 if (iter == endpoints_.end()) {
880 endpoint = new InterfaceEndpoint(this, id); 884 endpoint = new InterfaceEndpoint(this, id);
881 endpoints_[id] = endpoint; 885 endpoints_[id] = endpoint;
882 if (inserted) 886 if (inserted)
883 *inserted = true; 887 *inserted = true;
884 } else { 888 } else {
885 endpoint = iter->second.get(); 889 endpoint = iter->second.get();
886 } 890 }
887 891
888 return endpoint; 892 return endpoint;
889 } 893 }
890 894
895 void MultiplexRouter::AssertLockAcquired() {
896 #if DCHECK_IS_ON()
897 if (lock_)
898 lock_->AssertAcquired();
899 #endif
900 }
901
891 } // namespace internal 902 } // namespace internal
892 } // namespace mojo 903 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698