Index: ipc/ipc_mojo_bootstrap.cc |
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc |
index 425f794858f02732f6ccf5736f51c0fcfadfb84f..fc39d0d919e8b0a7dae947585d8abd6c93c4d87a 100644 |
--- a/ipc/ipc_mojo_bootstrap.cc |
+++ b/ipc/ipc_mojo_bootstrap.cc |
@@ -121,7 +121,7 @@ class ChannelAssociatedGroupController |
if (!is_local) { |
DCHECK(ContainsKey(endpoints_, id)); |
DCHECK(!mojo::IsMasterInterfaceId(id)); |
- control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
+ NotifyEndpointClosedBeforeSent(id); |
return; |
} |
@@ -132,7 +132,7 @@ class ChannelAssociatedGroupController |
MarkClosedAndMaybeRemove(endpoint); |
if (!mojo::IsMasterInterfaceId(id)) |
- control_message_proxy_.NotifyPeerEndpointClosed(id); |
+ NotifyPeerEndpointClosed(id); |
} |
mojo::InterfaceEndpointController* AttachEndpointClient( |
@@ -392,6 +392,30 @@ class ChannelAssociatedGroupController |
endpoints_.erase(endpoint->id()); |
} |
+ void NotifyPeerEndpointClosed(mojo::InterfaceId id) { |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ if (connector_.is_valid()) |
+ control_message_proxy_.NotifyPeerEndpointClosed(id); |
+ } else { |
+ task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&ChannelAssociatedGroupController |
+ ::NotifyPeerEndpointClosed, this, id)); |
+ } |
+ } |
+ |
+ void NotifyEndpointClosedBeforeSent(mojo::InterfaceId id) { |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ if (connector_.is_valid()) |
+ control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
+ } else { |
+ task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&ChannelAssociatedGroupController |
+ ::NotifyEndpointClosedBeforeSent, this, id)); |
+ } |
+ } |
+ |
Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { |
lock_.AssertAcquired(); |
DCHECK(!inserted || !*inserted); |
@@ -411,26 +435,15 @@ class ChannelAssociatedGroupController |
bool Accept(mojo::Message* message) override { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { |
- if (!control_message_handler_.Accept(message)) |
- RaiseError(); |
- return true; |
- } |
+ if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
+ return control_message_handler_.Accept(message); |
mojo::InterfaceId id = message->interface_id(); |
DCHECK(mojo::IsValidInterfaceId(id)); |
base::AutoLock locker(lock_); |
- bool inserted = false; |
- Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
- if (inserted) { |
- MarkClosedAndMaybeRemove(endpoint); |
- if (!mojo::IsMasterInterfaceId(id)) |
- control_message_proxy_.NotifyPeerEndpointClosed(id); |
- return true; |
- } |
- |
- if (endpoint->closed()) |
+ Endpoint* endpoint = GetEndpointForDispatch(id); |
+ if (!endpoint) |
return true; |
mojo::InterfaceEndpointClient* client = endpoint->client(); |
@@ -442,7 +455,6 @@ class ChannelAssociatedGroupController |
// If the client is not yet bound, it must be bound by the time this task |
// runs or else it's programmer error. |
DCHECK(proxy_task_runner_); |
- CHECK(false); |
std::unique_ptr<mojo::Message> passed_message(new mojo::Message); |
message->MoveTo(passed_message.get()); |
proxy_task_runner_->PostTask( |
@@ -456,23 +468,56 @@ class ChannelAssociatedGroupController |
// If it's happening, it's a bug. |
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
+ base::AutoUnlock unlocker(lock_); |
+ return client->HandleIncomingMessage(message); |
+ } |
+ |
+ void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { |
+ DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
+ |
+ mojo::InterfaceId id = message->interface_id(); |
+ DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
+ |
+ base::AutoLock locker(lock_); |
+ Endpoint* endpoint = GetEndpointForDispatch(id); |
+ if (!endpoint) |
+ return; |
+ |
+ mojo::InterfaceEndpointClient* client = endpoint->client(); |
+ if (!client) |
+ return; |
+ |
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
+ |
+ // TODO(rockot): Implement sync dispatch. For now, sync messages are |
+ // unsupported here. |
+ DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
+ |
bool result = false; |
{ |
base::AutoUnlock unlocker(lock_); |
- result = client->HandleIncomingMessage(message); |
+ result = client->HandleIncomingMessage(message.get()); |
} |
if (!result) |
RaiseError(); |
- |
- return true; |
} |
- void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { |
- DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
+ Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { |
+ lock_.AssertAcquired(); |
+ bool inserted = false; |
+ Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
+ if (inserted) { |
+ MarkClosedAndMaybeRemove(endpoint); |
+ if (!mojo::IsMasterInterfaceId(id)) |
+ NotifyPeerEndpointClosed(id); |
+ return nullptr; |
+ } |
+ |
+ if (endpoint->closed()) |
+ return nullptr; |
- // TODO(rockot): Implement this. |
- NOTREACHED(); |
+ return endpoint; |
} |
// mojo::PipeControlMessageHandlerDelegate: |
@@ -561,6 +606,11 @@ class BootstrapMasterProxy { |
return controller_->associated_group(); |
} |
+ ChannelAssociatedGroupController* controller() { |
+ DCHECK(controller_); |
+ return controller_.get(); |
+ } |
+ |
mojom::Bootstrap* operator->() { |
DCHECK(proxy_); |
return proxy_.get(); |
@@ -596,6 +646,11 @@ class BootstrapMasterBinding { |
return controller_->associated_group(); |
} |
+ ChannelAssociatedGroupController* controller() { |
+ DCHECK(controller_); |
+ return controller_.get(); |
+ } |
+ |
void Bind(mojo::ScopedMessagePipeHandle handle) { |
DCHECK(!controller_); |
controller_ = |
@@ -626,10 +681,16 @@ class MojoServerBootstrap : public MojoBootstrap { |
private: |
// MojoBootstrap implementation. |
void Connect() override; |
+ |
mojo::AssociatedGroup* GetAssociatedGroup() override { |
return bootstrap_.associated_group(); |
} |
+ void SetProxyTaskRunner( |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) override { |
+ bootstrap_.controller()->SetProxyTaskRunner(task_runner); |
+ } |
+ |
void OnInitDone(int32_t peer_pid); |
BootstrapMasterProxy bootstrap_; |
@@ -688,10 +749,16 @@ class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap { |
private: |
// MojoBootstrap implementation. |
void Connect() override; |
+ |
mojo::AssociatedGroup* GetAssociatedGroup() override { |
return binding_.associated_group(); |
} |
+ void SetProxyTaskRunner( |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) override { |
+ binding_.controller()->SetProxyTaskRunner(task_runner); |
+ } |
+ |
// mojom::Bootstrap implementation. |
void Init(mojom::ChannelAssociatedRequest receive_channel, |
mojom::ChannelAssociatedPtrInfo send_channel, |