| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| index 17f225839a28ecba222b1a1e121e219a37d533fa..d8e67f7c6f950415489a4777bae298794657ea9d 100644
|
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| @@ -37,6 +37,7 @@ class MultiplexRouter::InterfaceEndpoint
|
| id_(id),
|
| closed_(false),
|
| peer_closed_(false),
|
| + handle_created_(false),
|
| client_(nullptr),
|
| event_signalled_(false) {}
|
|
|
| @@ -61,6 +62,12 @@ class MultiplexRouter::InterfaceEndpoint
|
| peer_closed_ = true;
|
| }
|
|
|
| + bool handle_created() const { return handle_created_; }
|
| + void set_handle_created() {
|
| + router_->AssertLockAcquired();
|
| + handle_created_ = true;
|
| + }
|
| +
|
| const base::Optional<DisconnectReason>& disconnect_reason() const {
|
| return disconnect_reason_;
|
| }
|
| @@ -134,6 +141,7 @@ class MultiplexRouter::InterfaceEndpoint
|
|
|
| bool SendMessage(Message* message) override {
|
| DCHECK(task_runner_->BelongsToCurrentThread());
|
| + message->SerializeAssociatedEndpointHandles(router_);
|
| message->set_interface_id(id_);
|
| return router_->connector_.Accept(message);
|
| }
|
| @@ -237,6 +245,10 @@ class MultiplexRouter::InterfaceEndpoint
|
| // Whether the peer endpoint has been closed.
|
| bool peer_closed_;
|
|
|
| + // Whether there is already a ScopedInterfaceEndpointHandle created for this
|
| + // endpoint.
|
| + bool handle_created_;
|
| +
|
| base::Optional<DisconnectReason> disconnect_reason_;
|
|
|
| // The task runner on which |client_|'s methods can be called.
|
| @@ -262,12 +274,71 @@ class MultiplexRouter::InterfaceEndpoint
|
| DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
|
| };
|
|
|
| +// Message objects cannot be destroyed under the router's lock if they contain
|
| +// ScopedInterfaceEndpointHandle objects.
|
| +// IncomingMessageWrapper wraps messages whose payload interface IDs have not
|
| +// yet been deserialized into ScopedInterfaceEndpointHandles. Wrapper
|
| +// objects are always destroyed under the router's lock. When a wrapper is
|
| +// destroyed and the message hasn't been consumed, the wrapper is responsible
|
| +// for sending endpoint closed notifications.
|
| +class MultiplexRouter::IncomingMessageWrapper {
|
| + public:
|
| + IncomingMessageWrapper() = default;
|
| +
|
| + IncomingMessageWrapper(MultiplexRouter* router, Message* message)
|
| + : router_(router), value_(std::move(*message)) {
|
| + DCHECK(value_.associated_endpoint_handles()->empty());
|
| + }
|
| +
|
| + IncomingMessageWrapper(IncomingMessageWrapper&& other)
|
| + : router_(other.router_), value_(std::move(other.value_)) {}
|
| +
|
| + ~IncomingMessageWrapper() {
|
| + if (value_.IsNull())
|
| + return;
|
| +
|
| + router_->AssertLockAcquired();
|
| +
|
| + uint32_t num_ids = value_.payload_num_interface_ids();
|
| + const uint32_t* ids = value_.payload_interface_ids();
|
| + for (uint32_t i = 0; i < num_ids; ++i) {
|
| + MayAutoUnlock unlocker(router_->lock_.get());
|
| + router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i],
|
| + base::nullopt);
|
| + }
|
| + }
|
| +
|
| + IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
|
| + router_ = other.router_;
|
| + value_ = std::move(other.value_);
|
| + return *this;
|
| + }
|
| +
|
| + // Must be called outside of the router's lock.
|
| + bool TakeMessage(Message* output) {
|
| + DCHECK(!value_.IsNull());
|
| +
|
| + *output = std::move(value_);
|
| + return output->DeserializeAssociatedEndpointHandles(router_);
|
| + }
|
| +
|
| + const Message& value() const { return value_; }
|
| +
|
| + private:
|
| + MultiplexRouter* router_ = nullptr;
|
| + // |value_| must not hold any ScopedInterfaceEndpointHandle objects.
|
| + Message value_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
|
| +};
|
| +
|
| struct MultiplexRouter::Task {
|
| public:
|
| // Doesn't take ownership of |message| but takes its contents.
|
| - static std::unique_ptr<Task> CreateMessageTask(Message* message) {
|
| + static std::unique_ptr<Task> CreateMessageTask(
|
| + IncomingMessageWrapper message_wrapper) {
|
| Task* task = new Task(MESSAGE);
|
| - task->message = std::move(*message);
|
| + task->message_wrapper = std::move(message_wrapper);
|
| return base::WrapUnique(task);
|
| }
|
| static std::unique_ptr<Task> CreateNotifyErrorTask(
|
| @@ -282,7 +353,7 @@ struct MultiplexRouter::Task {
|
| bool IsMessageTask() const { return type == MESSAGE; }
|
| bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
|
|
|
| - Message message;
|
| + IncomingMessageWrapper message_wrapper;
|
| scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
|
|
|
| enum Type { MESSAGE, NOTIFY_ERROR };
|
| @@ -290,6 +361,8 @@ struct MultiplexRouter::Task {
|
|
|
| private:
|
| explicit Task(Type in_type) : type(in_type) {}
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(Task);
|
| };
|
|
|
| MultiplexRouter::MultiplexRouter(
|
| @@ -401,6 +474,8 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
|
| bool inserted = false;
|
| InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| if (inserted) {
|
| + DCHECK(!endpoint->handle_created());
|
| +
|
| if (encountered_error_)
|
| UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
|
| } else {
|
| @@ -408,7 +483,12 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
|
| // notification that the peer endpoint has closed.
|
| CHECK(!endpoint->closed());
|
| CHECK(endpoint->peer_closed());
|
| +
|
| + if (endpoint->handle_created())
|
| + return ScopedInterfaceEndpointHandle();
|
| }
|
| +
|
| + endpoint->set_handle_created();
|
| return CreateScopedInterfaceEndpointHandle(id, true);
|
| }
|
|
|
| @@ -554,23 +634,25 @@ bool MultiplexRouter::Accept(Message* message) {
|
|
|
| DCHECK(!paused_);
|
|
|
| + IncomingMessageWrapper message_wrapper(this, message);
|
| +
|
| ClientCallBehavior client_call_behavior =
|
| connector_.during_sync_handle_watcher_callback()
|
| ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
|
| : ALLOW_DIRECT_CLIENT_CALLS;
|
|
|
| - bool processed =
|
| - tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
|
| - connector_.task_runner());
|
| + bool processed = tasks_.empty() && ProcessIncomingMessage(
|
| + &message_wrapper, client_call_behavior,
|
| + connector_.task_runner());
|
|
|
| if (!processed) {
|
| // Either the task queue is not empty or we cannot process the message
|
| // directly. In both cases, there is no need to call ProcessTasks().
|
| - tasks_.push_back(Task::CreateMessageTask(message));
|
| + tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
|
| Task* task = tasks_.back().get();
|
|
|
| - if (task->message.has_flag(Message::kFlagIsSync)) {
|
| - InterfaceId id = task->message.interface_id();
|
| + if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
|
| + InterfaceId id = task->message_wrapper.value().interface_id();
|
| sync_message_tasks_[id].push_back(task);
|
| auto iter = endpoints_.find(id);
|
| if (iter != endpoints_.end())
|
| @@ -591,10 +673,9 @@ bool MultiplexRouter::Accept(Message* message) {
|
| bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
|
| InterfaceId id,
|
| const base::Optional<DisconnectReason>& reason) {
|
| - AssertLockAcquired();
|
| -
|
| DCHECK(!IsMasterInterfaceId(id) || reason);
|
|
|
| + MayAutoLock locker(lock_.get());
|
| InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
|
|
|
| if (reason)
|
| @@ -618,18 +699,18 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
|
| }
|
|
|
| bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
|
| - AssertLockAcquired();
|
| -
|
| if (IsMasterInterfaceId(id))
|
| return false;
|
|
|
| - InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
|
| - DCHECK(!endpoint->closed());
|
| - UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
|
| + {
|
| + MayAutoLock locker(lock_.get());
|
|
|
| - MayAutoUnlock unlocker(lock_.get());
|
| - control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
|
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
|
| + DCHECK(!endpoint->closed());
|
| + UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
|
| + }
|
|
|
| + control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
|
| return true;
|
| }
|
|
|
| @@ -672,10 +753,11 @@ void MultiplexRouter::ProcessTasks(
|
| tasks_.pop_front();
|
|
|
| InterfaceId id = kInvalidInterfaceId;
|
| - bool sync_message = task->IsMessageTask() && !task->message.IsNull() &&
|
| - task->message.has_flag(Message::kFlagIsSync);
|
| + bool sync_message =
|
| + task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
|
| + task->message_wrapper.value().has_flag(Message::kFlagIsSync);
|
| if (sync_message) {
|
| - id = task->message.interface_id();
|
| + id = task->message_wrapper.value().interface_id();
|
| auto& sync_message_queue = sync_message_tasks_[id];
|
| DCHECK_EQ(task.get(), sync_message_queue.front());
|
| sync_message_queue.pop_front();
|
| @@ -685,8 +767,8 @@ void MultiplexRouter::ProcessTasks(
|
| task->IsNotifyErrorTask()
|
| ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
|
| current_task_runner)
|
| - : ProcessIncomingMessage(&task->message, client_call_behavior,
|
| - current_task_runner);
|
| + : ProcessIncomingMessage(&task->message_wrapper,
|
| + client_call_behavior, current_task_runner);
|
|
|
| if (!processed) {
|
| if (sync_message) {
|
| @@ -719,11 +801,11 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
|
| iter->second.pop_front();
|
|
|
| DCHECK(task->IsMessageTask());
|
| - Message message = std::move(task->message);
|
| + IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper);
|
|
|
| // Note: after this call, |task| and |iter| may be invalidated.
|
| bool processed = ProcessIncomingMessage(
|
| - &message, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
|
| + &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
|
| DCHECK(processed);
|
|
|
| iter = sync_message_tasks_.find(id);
|
| @@ -775,27 +857,38 @@ bool MultiplexRouter::ProcessNotifyErrorTask(
|
| }
|
|
|
| bool MultiplexRouter::ProcessIncomingMessage(
|
| - Message* message,
|
| + IncomingMessageWrapper* message_wrapper,
|
| ClientCallBehavior client_call_behavior,
|
| base::SingleThreadTaskRunner* current_task_runner) {
|
| DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
|
| DCHECK(!paused_);
|
| - DCHECK(message);
|
| + DCHECK(message_wrapper);
|
| AssertLockAcquired();
|
|
|
| - if (message->IsNull()) {
|
| + if (message_wrapper->value().IsNull()) {
|
| // This is a sync message and has been processed during sync handle
|
| // watching.
|
| return true;
|
| }
|
|
|
| - if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| - if (!control_message_handler_.Accept(message))
|
| + if (PipeControlMessageHandler::IsPipeControlMessage(
|
| + &message_wrapper->value())) {
|
| + bool result = false;
|
| +
|
| + {
|
| + MayAutoUnlock unlocker(lock_.get());
|
| + Message message;
|
| + result = message_wrapper->TakeMessage(&message) &&
|
| + control_message_handler_.Accept(&message);
|
| + }
|
| +
|
| + if (!result)
|
| RaiseErrorInNonTestingMode();
|
| +
|
| return true;
|
| }
|
|
|
| - InterfaceId id = message->interface_id();
|
| + InterfaceId id = message_wrapper->value().interface_id();
|
| DCHECK(IsValidInterfaceId(id));
|
|
|
| bool inserted = false;
|
| @@ -832,7 +925,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
|
| }
|
|
|
| bool can_direct_call;
|
| - if (message->has_flag(Message::kFlagIsSync)) {
|
| + if (message_wrapper->value().has_flag(Message::kFlagIsSync)) {
|
| can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
|
| endpoint->task_runner()->BelongsToCurrentThread();
|
| } else {
|
| @@ -857,7 +950,9 @@ bool MultiplexRouter::ProcessIncomingMessage(
|
| // It is safe to call into |client| without the lock. Because |client| is
|
| // always accessed on the same thread, including DetachEndpointClient().
|
| MayAutoUnlock unlocker(lock_.get());
|
| - result = client->HandleIncomingMessage(message);
|
| + Message message;
|
| + result = message_wrapper->TakeMessage(&message) &&
|
| + client->HandleIncomingMessage(&message);
|
| }
|
| if (!result)
|
| RaiseErrorInNonTestingMode();
|
|
|