Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
index 17f225839a28ecba222b1a1e121e219a37d533fa..d66c65f97aa56d4fea624bee6b380f95ed36ce93 100644 |
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc |
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
@@ -12,8 +12,9 @@ |
#include "base/location.h" |
#include "base/macros.h" |
#include "base/memory/ptr_util.h" |
-#include "base/single_thread_task_runner.h" |
+#include "base/sequenced_task_runner.h" |
#include "base/stl_util.h" |
+#include "base/threading/sequenced_task_runner_handle.h" |
#include "base/threading/thread_task_runner_handle.h" |
#include "mojo/public/cpp/bindings/associated_group.h" |
#include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
@@ -70,18 +71,16 @@ class MultiplexRouter::InterfaceEndpoint |
disconnect_reason_ = disconnect_reason; |
} |
- base::SingleThreadTaskRunner* task_runner() const { |
- return task_runner_.get(); |
- } |
+ base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); } |
InterfaceEndpointClient* client() const { return client_; } |
void AttachClient(InterfaceEndpointClient* client, |
- scoped_refptr<base::SingleThreadTaskRunner> runner) { |
+ scoped_refptr<base::SequencedTaskRunner> runner) { |
router_->AssertLockAcquired(); |
DCHECK(!client_); |
DCHECK(!closed_); |
- DCHECK(runner->BelongsToCurrentThread()); |
+ DCHECK(runner->RunsTasksOnCurrentThread()); |
task_runner_ = std::move(runner); |
client_ = client; |
@@ -92,7 +91,7 @@ class MultiplexRouter::InterfaceEndpoint |
void DetachClient() { |
router_->AssertLockAcquired(); |
DCHECK(client_); |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
DCHECK(!closed_); |
task_runner_ = nullptr; |
@@ -133,20 +132,20 @@ class MultiplexRouter::InterfaceEndpoint |
// AttachClient() call. They are called outside of the router's lock. |
bool SendMessage(Message* message) override { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
message->set_interface_id(id_); |
return router_->connector_.Accept(message); |
} |
void AllowWokenUpBySyncWatchOnSameThread() override { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
EnsureSyncWatcherExists(); |
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
bool SyncWatch(const bool* should_stop) override { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
EnsureSyncWatcherExists(); |
return sync_watcher_->SyncWatch(should_stop); |
@@ -165,7 +164,7 @@ class MultiplexRouter::InterfaceEndpoint |
} |
void OnHandleReady(MojoResult result) { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
scoped_refptr<InterfaceEndpoint> self_protector(this); |
scoped_refptr<MultiplexRouter> router_protector(router_); |
@@ -194,7 +193,7 @@ class MultiplexRouter::InterfaceEndpoint |
} |
void EnsureSyncWatcherExists() { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
if (sync_watcher_) |
return; |
@@ -240,7 +239,7 @@ class MultiplexRouter::InterfaceEndpoint |
base::Optional<DisconnectReason> disconnect_reason_; |
// The task runner on which |client_|'s methods can be called. |
- scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
+ scoped_refptr<base::SequencedTaskRunner> task_runner_; |
// Not owned. It is null if no client is attached to this endpoint. |
InterfaceEndpointClient* client_; |
@@ -296,7 +295,7 @@ MultiplexRouter::MultiplexRouter( |
ScopedMessagePipeHandle message_pipe, |
Config config, |
bool set_interface_id_namesapce_bit, |
- scoped_refptr<base::SingleThreadTaskRunner> runner) |
+ scoped_refptr<base::SequencedTaskRunner> runner) |
: set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
task_runner_(runner), |
header_validator_(nullptr), |
@@ -313,7 +312,8 @@ MultiplexRouter::MultiplexRouter( |
encountered_error_(false), |
paused_(false), |
testing_mode_(false) { |
- DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
+ DCHECK(config == SINGLE_INTERFACE || base::ThreadTaskRunnerHandle::IsSet()); |
if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || |
config == MULTI_INTERFACE) { |
@@ -362,7 +362,7 @@ MultiplexRouter::~MultiplexRouter() { |
} |
void MultiplexRouter::SetMasterInterfaceName(const char* name) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
header_validator_->SetDescription( |
std::string(name) + " [master] MessageHeaderValidator"); |
control_message_handler_.SetDescription( |
@@ -449,7 +449,7 @@ void MultiplexRouter::CloseEndpointHandle( |
InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
const ScopedInterfaceEndpointHandle& handle, |
InterfaceEndpointClient* client, |
- scoped_refptr<base::SingleThreadTaskRunner> runner) { |
+ scoped_refptr<base::SequencedTaskRunner> runner) { |
const InterfaceId id = handle.id(); |
DCHECK(IsValidInterfaceId(id)); |
@@ -482,7 +482,7 @@ void MultiplexRouter::DetachEndpointClient( |
} |
void MultiplexRouter::RaiseError() { |
- if (task_runner_->BelongsToCurrentThread()) { |
+ if (task_runner_->RunsTasksOnCurrentThread()) { |
connector_.RaiseError(); |
} else { |
task_runner_->PostTask(FROM_HERE, |
@@ -491,7 +491,7 @@ void MultiplexRouter::RaiseError() { |
} |
void MultiplexRouter::CloseMessagePipe() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
connector_.CloseMessagePipe(); |
// CloseMessagePipe() above won't trigger connection error handler. |
// Explicitly call OnPipeConnectionError() so that associated endpoints will |
@@ -500,7 +500,7 @@ void MultiplexRouter::CloseMessagePipe() { |
} |
void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
connector_.PauseIncomingMethodCallProcessing(); |
MayAutoLock locker(lock_.get()); |
@@ -511,7 +511,7 @@ void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
} |
void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
connector_.ResumeIncomingMethodCallProcessing(); |
MayAutoLock locker(lock_.get()); |
@@ -527,7 +527,7 @@ void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
} |
bool MultiplexRouter::HasAssociatedEndpoints() const { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
MayAutoLock locker(lock_.get()); |
if (endpoints_.size() > 1) |
@@ -539,7 +539,7 @@ bool MultiplexRouter::HasAssociatedEndpoints() const { |
} |
void MultiplexRouter::EnableTestingMode() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
MayAutoLock locker(lock_.get()); |
testing_mode_ = true; |
@@ -547,7 +547,7 @@ void MultiplexRouter::EnableTestingMode() { |
} |
bool MultiplexRouter::Accept(Message* message) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
scoped_refptr<MultiplexRouter> protector(this); |
MayAutoLock locker(lock_.get()); |
@@ -634,7 +634,7 @@ bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
} |
void MultiplexRouter::OnPipeConnectionError() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(sequence_checker_.CalledOnValidSequence()); |
scoped_refptr<MultiplexRouter> protector(this); |
MayAutoLock locker(lock_.get()); |
@@ -661,7 +661,7 @@ void MultiplexRouter::OnPipeConnectionError() { |
void MultiplexRouter::ProcessTasks( |
ClientCallBehavior client_call_behavior, |
- base::SingleThreadTaskRunner* current_task_runner) { |
+ base::SequencedTaskRunner* current_task_runner) { |
AssertLockAcquired(); |
if (posted_to_process_tasks_) |
@@ -741,8 +741,9 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
bool MultiplexRouter::ProcessNotifyErrorTask( |
Task* task, |
ClientCallBehavior client_call_behavior, |
- base::SingleThreadTaskRunner* current_task_runner) { |
- DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
+ base::SequencedTaskRunner* current_task_runner) { |
+ DCHECK(!current_task_runner || |
+ current_task_runner->RunsTasksOnCurrentThread()); |
DCHECK(!paused_); |
AssertLockAcquired(); |
@@ -756,7 +757,7 @@ bool MultiplexRouter::ProcessNotifyErrorTask( |
return false; |
} |
- DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
+ DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
InterfaceEndpointClient* client = endpoint->client(); |
base::Optional<DisconnectReason> disconnect_reason( |
@@ -777,8 +778,9 @@ bool MultiplexRouter::ProcessNotifyErrorTask( |
bool MultiplexRouter::ProcessIncomingMessage( |
Message* message, |
ClientCallBehavior client_call_behavior, |
- base::SingleThreadTaskRunner* current_task_runner) { |
- DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
+ base::SequencedTaskRunner* current_task_runner) { |
+ DCHECK(!current_task_runner || |
+ current_task_runner->RunsTasksOnCurrentThread()); |
DCHECK(!paused_); |
DCHECK(message); |
AssertLockAcquired(); |
@@ -834,7 +836,7 @@ bool MultiplexRouter::ProcessIncomingMessage( |
bool can_direct_call; |
if (message->has_flag(Message::kFlagIsSync)) { |
can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && |
- endpoint->task_runner()->BelongsToCurrentThread(); |
+ endpoint->task_runner()->RunsTasksOnCurrentThread(); |
} else { |
can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && |
endpoint->task_runner() == current_task_runner; |
@@ -845,7 +847,7 @@ bool MultiplexRouter::ProcessIncomingMessage( |
return false; |
} |
- DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
+ DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
InterfaceEndpointClient* client = endpoint->client(); |
bool result = false; |
@@ -866,7 +868,7 @@ bool MultiplexRouter::ProcessIncomingMessage( |
} |
void MultiplexRouter::MaybePostToProcessTasks( |
- base::SingleThreadTaskRunner* task_runner) { |
+ base::SequencedTaskRunner* task_runner) { |
AssertLockAcquired(); |
if (posted_to_process_tasks_) |
return; |
@@ -882,7 +884,7 @@ void MultiplexRouter::LockAndCallProcessTasks() { |
// always called using base::Bind(), which holds a ref. |
MayAutoLock locker(lock_.get()); |
posted_to_process_tasks_ = false; |
- scoped_refptr<base::SingleThreadTaskRunner> runner( |
+ scoped_refptr<base::SequencedTaskRunner> runner( |
std::move(posted_to_task_runner_)); |
ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
} |