Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(456)

Unified Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2147493006: Adds Channel-associated interface support on ChannelProxy's thread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_mojo_bootstrap.cc
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index 425f794858f02732f6ccf5736f51c0fcfadfb84f..fc39d0d919e8b0a7dae947585d8abd6c93c4d87a 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -121,7 +121,7 @@ class ChannelAssociatedGroupController
if (!is_local) {
DCHECK(ContainsKey(endpoints_, id));
DCHECK(!mojo::IsMasterInterfaceId(id));
- control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
+ NotifyEndpointClosedBeforeSent(id);
return;
}
@@ -132,7 +132,7 @@ class ChannelAssociatedGroupController
MarkClosedAndMaybeRemove(endpoint);
if (!mojo::IsMasterInterfaceId(id))
- control_message_proxy_.NotifyPeerEndpointClosed(id);
+ NotifyPeerEndpointClosed(id);
}
mojo::InterfaceEndpointController* AttachEndpointClient(
@@ -392,6 +392,30 @@ class ChannelAssociatedGroupController
endpoints_.erase(endpoint->id());
}
+ void NotifyPeerEndpointClosed(mojo::InterfaceId id) {
+ if (task_runner_->BelongsToCurrentThread()) {
+ if (connector_.is_valid())
+ control_message_proxy_.NotifyPeerEndpointClosed(id);
+ } else {
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelAssociatedGroupController
+ ::NotifyPeerEndpointClosed, this, id));
+ }
+ }
+
+ void NotifyEndpointClosedBeforeSent(mojo::InterfaceId id) {
+ if (task_runner_->BelongsToCurrentThread()) {
+ if (connector_.is_valid())
+ control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
+ } else {
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelAssociatedGroupController
+ ::NotifyEndpointClosedBeforeSent, this, id));
+ }
+ }
+
Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
lock_.AssertAcquired();
DCHECK(!inserted || !*inserted);
@@ -411,26 +435,15 @@ class ChannelAssociatedGroupController
bool Accept(mojo::Message* message) override {
DCHECK(thread_checker_.CalledOnValidThread());
- if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) {
- if (!control_message_handler_.Accept(message))
- RaiseError();
- return true;
- }
+ if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
+ return control_message_handler_.Accept(message);
mojo::InterfaceId id = message->interface_id();
DCHECK(mojo::IsValidInterfaceId(id));
base::AutoLock locker(lock_);
- bool inserted = false;
- Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
- if (inserted) {
- MarkClosedAndMaybeRemove(endpoint);
- if (!mojo::IsMasterInterfaceId(id))
- control_message_proxy_.NotifyPeerEndpointClosed(id);
- return true;
- }
-
- if (endpoint->closed())
+ Endpoint* endpoint = GetEndpointForDispatch(id);
+ if (!endpoint)
return true;
mojo::InterfaceEndpointClient* client = endpoint->client();
@@ -442,7 +455,6 @@ class ChannelAssociatedGroupController
// If the client is not yet bound, it must be bound by the time this task
// runs or else it's programmer error.
DCHECK(proxy_task_runner_);
- CHECK(false);
std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
message->MoveTo(passed_message.get());
proxy_task_runner_->PostTask(
@@ -456,23 +468,56 @@ class ChannelAssociatedGroupController
// If it's happening, it's a bug.
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
+ base::AutoUnlock unlocker(lock_);
+ return client->HandleIncomingMessage(message);
+ }
+
+ void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) {
+ DCHECK(proxy_task_runner_->BelongsToCurrentThread());
+
+ mojo::InterfaceId id = message->interface_id();
+ DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
+
+ base::AutoLock locker(lock_);
+ Endpoint* endpoint = GetEndpointForDispatch(id);
+ if (!endpoint)
+ return;
+
+ mojo::InterfaceEndpointClient* client = endpoint->client();
+ if (!client)
+ return;
+
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
+
+ // TODO(rockot): Implement sync dispatch. For now, sync messages are
+ // unsupported here.
+ DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
+
bool result = false;
{
base::AutoUnlock unlocker(lock_);
- result = client->HandleIncomingMessage(message);
+ result = client->HandleIncomingMessage(message.get());
}
if (!result)
RaiseError();
-
- return true;
}
- void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) {
- DCHECK(proxy_task_runner_->BelongsToCurrentThread());
+ Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
+ lock_.AssertAcquired();
+ bool inserted = false;
+ Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
+ if (inserted) {
+ MarkClosedAndMaybeRemove(endpoint);
+ if (!mojo::IsMasterInterfaceId(id))
+ NotifyPeerEndpointClosed(id);
+ return nullptr;
+ }
+
+ if (endpoint->closed())
+ return nullptr;
- // TODO(rockot): Implement this.
- NOTREACHED();
+ return endpoint;
}
// mojo::PipeControlMessageHandlerDelegate:
@@ -561,6 +606,11 @@ class BootstrapMasterProxy {
return controller_->associated_group();
}
+ ChannelAssociatedGroupController* controller() {
+ DCHECK(controller_);
+ return controller_.get();
+ }
+
mojom::Bootstrap* operator->() {
DCHECK(proxy_);
return proxy_.get();
@@ -596,6 +646,11 @@ class BootstrapMasterBinding {
return controller_->associated_group();
}
+ ChannelAssociatedGroupController* controller() {
+ DCHECK(controller_);
+ return controller_.get();
+ }
+
void Bind(mojo::ScopedMessagePipeHandle handle) {
DCHECK(!controller_);
controller_ =
@@ -626,10 +681,16 @@ class MojoServerBootstrap : public MojoBootstrap {
private:
// MojoBootstrap implementation.
void Connect() override;
+
mojo::AssociatedGroup* GetAssociatedGroup() override {
return bootstrap_.associated_group();
}
+ void SetProxyTaskRunner(
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
+ bootstrap_.controller()->SetProxyTaskRunner(task_runner);
+ }
+
void OnInitDone(int32_t peer_pid);
BootstrapMasterProxy bootstrap_;
@@ -688,10 +749,16 @@ class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap {
private:
// MojoBootstrap implementation.
void Connect() override;
+
mojo::AssociatedGroup* GetAssociatedGroup() override {
return binding_.associated_group();
}
+ void SetProxyTaskRunner(
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
+ binding_.controller()->SetProxyTaskRunner(task_runner);
+ }
+
// mojom::Bootstrap implementation.
void Init(mojom::ChannelAssociatedRequest receive_channel,
mojom::ChannelAssociatedPtrInfo send_channel,
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698