| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| index d8e67f7c6f950415489a4777bae298794657ea9d..2d8c53e33994243be49eebb5b417f1b1764157ea 100644
|
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| @@ -274,69 +274,51 @@ class MultiplexRouter::InterfaceEndpoint
|
| DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
|
| };
|
|
|
| -// Message objects cannot be destroyed under the router's lock, if they contain
|
| -// ScopedInterfaceEndpointHandle objects.
|
| -// IncomingMessageWrapper is used to wrap messages which haven't got the payload
|
| -// interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper
|
| -// objects are always destroyed under the router's lock. When a wrapper is
|
| -// destroyed and the message hasn't been consumed, the wrapper is responsible
|
| -// to send endpoint closed notifications.
|
| -class MultiplexRouter::IncomingMessageWrapper {
|
| +// MessageWrapper objects are always destroyed under the router's lock. On
|
| +// destruction, if the message it wrappers contains
|
| +// ScopedInterfaceEndpointHandles (which cannot be destructed under the
|
| +// router's lock), the wrapper unlocks to clean them up.
|
| +class MultiplexRouter::MessageWrapper {
|
| public:
|
| - IncomingMessageWrapper() = default;
|
| + MessageWrapper() = default;
|
|
|
| - IncomingMessageWrapper(MultiplexRouter* router, Message* message)
|
| - : router_(router), value_(std::move(*message)) {
|
| - DCHECK(value_.associated_endpoint_handles()->empty());
|
| - }
|
| + MessageWrapper(MultiplexRouter* router, Message message)
|
| + : router_(router), value_(std::move(message)) {}
|
|
|
| - IncomingMessageWrapper(IncomingMessageWrapper&& other)
|
| + MessageWrapper(MessageWrapper&& other)
|
| : router_(other.router_), value_(std::move(other.value_)) {}
|
|
|
| - ~IncomingMessageWrapper() {
|
| - if (value_.IsNull())
|
| + ~MessageWrapper() {
|
| + if (value_.associated_endpoint_handles()->empty())
|
| return;
|
|
|
| router_->AssertLockAcquired();
|
| -
|
| - uint32_t num_ids = value_.payload_num_interface_ids();
|
| - const uint32_t* ids = value_.payload_interface_ids();
|
| - for (uint32_t i = 0; i < num_ids; ++i) {
|
| + {
|
| MayAutoUnlock unlocker(router_->lock_.get());
|
| - router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i],
|
| - base::nullopt);
|
| + value_.mutable_associated_endpoint_handles()->clear();
|
| }
|
| }
|
|
|
| - IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
|
| + MessageWrapper& operator=(MessageWrapper&& other) {
|
| router_ = other.router_;
|
| value_ = std::move(other.value_);
|
| return *this;
|
| }
|
|
|
| - // Must be called outside of the router's lock.
|
| - bool TakeMessage(Message* output) {
|
| - DCHECK(!value_.IsNull());
|
| -
|
| - *output = std::move(value_);
|
| - return output->DeserializeAssociatedEndpointHandles(router_);
|
| - }
|
| -
|
| - const Message& value() const { return value_; }
|
| + Message& value() { return value_; }
|
|
|
| private:
|
| MultiplexRouter* router_ = nullptr;
|
| - // It must not hold any ScopedInterfaceEndpointHandle objects.
|
| Message value_;
|
|
|
| - DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
|
| + DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
|
| };
|
|
|
| struct MultiplexRouter::Task {
|
| public:
|
| // Doesn't take ownership of |message| but takes its contents.
|
| static std::unique_ptr<Task> CreateMessageTask(
|
| - IncomingMessageWrapper message_wrapper) {
|
| + MessageWrapper message_wrapper) {
|
| Task* task = new Task(MESSAGE);
|
| task->message_wrapper = std::move(message_wrapper);
|
| return base::WrapUnique(task);
|
| @@ -353,7 +335,7 @@ struct MultiplexRouter::Task {
|
| bool IsMessageTask() const { return type == MESSAGE; }
|
| bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
|
|
|
| - IncomingMessageWrapper message_wrapper;
|
| + MessageWrapper message_wrapper;
|
| scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
|
|
|
| enum Type { MESSAGE, NOTIFY_ERROR };
|
| @@ -461,6 +443,7 @@ void MultiplexRouter::CreateEndpointHandlePair(
|
| if (encountered_error_)
|
| UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
|
|
|
| + endpoint->set_handle_created();
|
| *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
|
| *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
|
| }
|
| @@ -629,34 +612,36 @@ void MultiplexRouter::EnableTestingMode() {
|
| bool MultiplexRouter::Accept(Message* message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| + if (!message->DeserializeAssociatedEndpointHandles(this))
|
| + return false;
|
| +
|
| scoped_refptr<MultiplexRouter> protector(this);
|
| MayAutoLock locker(lock_.get());
|
|
|
| DCHECK(!paused_);
|
|
|
| - IncomingMessageWrapper message_wrapper(this, message);
|
| -
|
| ClientCallBehavior client_call_behavior =
|
| connector_.during_sync_handle_watcher_callback()
|
| ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
|
| : ALLOW_DIRECT_CLIENT_CALLS;
|
|
|
| - bool processed = tasks_.empty() && ProcessIncomingMessage(
|
| - &message_wrapper, client_call_behavior,
|
| - connector_.task_runner());
|
| + bool processed =
|
| + tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
|
| + connector_.task_runner());
|
|
|
| if (!processed) {
|
| // Either the task queue is not empty or we cannot process the message
|
| // directly. In both cases, there is no need to call ProcessTasks().
|
| - tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
|
| + tasks_.push_back(
|
| + Task::CreateMessageTask(MessageWrapper(this, std::move(*message))));
|
| Task* task = tasks_.back().get();
|
|
|
| if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
|
| InterfaceId id = task->message_wrapper.value().interface_id();
|
| sync_message_tasks_[id].push_back(task);
|
| - auto iter = endpoints_.find(id);
|
| - if (iter != endpoints_.end())
|
| - iter->second->SignalSyncMessageEvent();
|
| + InterfaceEndpoint* endpoint = FindEndpoint(id);
|
| + if (endpoint)
|
| + endpoint->SignalSyncMessageEvent();
|
| }
|
| } else if (!tasks_.empty()) {
|
| // Processing the message may result in new tasks (for error notification)
|
| @@ -767,7 +752,7 @@ void MultiplexRouter::ProcessTasks(
|
| task->IsNotifyErrorTask()
|
| ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
|
| current_task_runner)
|
| - : ProcessIncomingMessage(&task->message_wrapper,
|
| + : ProcessIncomingMessage(&task->message_wrapper.value(),
|
| client_call_behavior, current_task_runner);
|
|
|
| if (!processed) {
|
| @@ -801,11 +786,12 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
|
| iter->second.pop_front();
|
|
|
| DCHECK(task->IsMessageTask());
|
| - IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper);
|
| + MessageWrapper message_wrapper = std::move(task->message_wrapper);
|
|
|
| // Note: after this call, |task| and |iter| may be invalidated.
|
| bool processed = ProcessIncomingMessage(
|
| - &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
|
| + &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
|
| + nullptr);
|
| DCHECK(processed);
|
|
|
| iter = sync_message_tasks_.find(id);
|
| @@ -857,29 +843,26 @@ bool MultiplexRouter::ProcessNotifyErrorTask(
|
| }
|
|
|
| bool MultiplexRouter::ProcessIncomingMessage(
|
| - IncomingMessageWrapper* message_wrapper,
|
| + Message* message,
|
| ClientCallBehavior client_call_behavior,
|
| base::SingleThreadTaskRunner* current_task_runner) {
|
| DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
|
| DCHECK(!paused_);
|
| - DCHECK(message_wrapper);
|
| + DCHECK(message);
|
| AssertLockAcquired();
|
|
|
| - if (message_wrapper->value().IsNull()) {
|
| + if (message->IsNull()) {
|
| // This is a sync message and has been processed during sync handle
|
| // watching.
|
| return true;
|
| }
|
|
|
| - if (PipeControlMessageHandler::IsPipeControlMessage(
|
| - &message_wrapper->value())) {
|
| + if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| bool result = false;
|
|
|
| {
|
| MayAutoUnlock unlocker(lock_.get());
|
| - Message message;
|
| - result = message_wrapper->TakeMessage(&message) &&
|
| - control_message_handler_.Accept(&message);
|
| + result = control_message_handler_.Accept(message);
|
| }
|
|
|
| if (!result)
|
| @@ -888,34 +871,11 @@ bool MultiplexRouter::ProcessIncomingMessage(
|
| return true;
|
| }
|
|
|
| - InterfaceId id = message_wrapper->value().interface_id();
|
| + InterfaceId id = message->interface_id();
|
| DCHECK(IsValidInterfaceId(id));
|
|
|
| - bool inserted = false;
|
| - InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| - if (inserted) {
|
| - // Currently, it is legitimate to receive messages for an endpoint
|
| - // that is not registered. For example, the endpoint is transferred in
|
| - // a message that is discarded. Once we add support to specify all
|
| - // enclosing endpoints in message header, we should be able to remove
|
| - // this.
|
| - UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
|
| -
|
| - // It is also possible that this newly-inserted endpoint is the master
|
| - // endpoint. When the master InterfacePtr/Binding goes away, the message
|
| - // pipe is closed and we explicitly trigger a pipe connection error. The
|
| - // error updates all the endpoints, including the master endpoint, with
|
| - // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
|
| - // registration. We continue to process remaining tasks in the queue, as
|
| - // long as there are refs keeping the router alive. If there are remaining
|
| - // messages for the master endpoint, we will get here.
|
| - MayAutoUnlock unlocker(lock_.get());
|
| - if (!IsMasterInterfaceId(id))
|
| - control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
|
| - return true;
|
| - }
|
| -
|
| - if (endpoint->closed())
|
| + InterfaceEndpoint* endpoint = FindEndpoint(id);
|
| + if (!endpoint || endpoint->closed())
|
| return true;
|
|
|
| if (!endpoint->client()) {
|
| @@ -925,7 +885,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
|
| }
|
|
|
| bool can_direct_call;
|
| - if (message_wrapper->value().has_flag(Message::kFlagIsSync)) {
|
| + if (message->has_flag(Message::kFlagIsSync)) {
|
| can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
|
| endpoint->task_runner()->BelongsToCurrentThread();
|
| } else {
|
| @@ -950,9 +910,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
|
| // It is safe to call into |client| without the lock. Because |client| is
|
| // always accessed on the same thread, including DetachEndpointClient().
|
| MayAutoUnlock unlocker(lock_.get());
|
| - Message message;
|
| - result = message_wrapper->TakeMessage(&message) &&
|
| - client->HandleIncomingMessage(&message);
|
| + result = client->HandleIncomingMessage(message);
|
| }
|
| if (!result)
|
| RaiseErrorInNonTestingMode();
|
| @@ -1014,20 +972,24 @@ MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
|
| // false.
|
| DCHECK(!inserted || !*inserted);
|
|
|
| - auto iter = endpoints_.find(id);
|
| - InterfaceEndpoint* endpoint;
|
| - if (iter == endpoints_.end()) {
|
| + InterfaceEndpoint* endpoint = FindEndpoint(id);
|
| + if (!endpoint) {
|
| endpoint = new InterfaceEndpoint(this, id);
|
| endpoints_[id] = endpoint;
|
| if (inserted)
|
| *inserted = true;
|
| - } else {
|
| - endpoint = iter->second.get();
|
| }
|
|
|
| return endpoint;
|
| }
|
|
|
| +MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
|
| + InterfaceId id) {
|
| + AssertLockAcquired();
|
| + auto iter = endpoints_.find(id);
|
| + return iter != endpoints_.end() ? iter->second.get() : nullptr;
|
| +}
|
| +
|
| void MultiplexRouter::AssertLockAcquired() {
|
| #if DCHECK_IS_ON()
|
| if (lock_)
|
|
|