Index: ipc/ipc_mojo_bootstrap.cc |
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc |
index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..3177d4d55b37ddc88c3438460ab215ce8cb77796 100644 |
--- a/ipc/ipc_mojo_bootstrap.cc |
+++ b/ipc/ipc_mojo_bootstrap.cc |
@@ -8,6 +8,7 @@ |
#include <map> |
#include <memory> |
+#include <queue> |
#include <utility> |
#include <vector> |
@@ -18,6 +19,7 @@ |
#include "base/single_thread_task_runner.h" |
#include "base/synchronization/lock.h" |
#include "base/threading/thread_task_runner_handle.h" |
+#include "ipc/mojo_event.h" |
#include "mojo/public/cpp/bindings/associated_group.h" |
#include "mojo/public/cpp/bindings/associated_group_controller.h" |
#include "mojo/public/cpp/bindings/connector.h" |
@@ -29,6 +31,7 @@ |
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h" |
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" |
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" |
+#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
namespace IPC { |
@@ -273,6 +276,29 @@ class ChannelAssociatedGroupController |
task_runner_ = nullptr; |
client_ = nullptr; |
+ sync_watcher_.reset(); |
+ } |
+ |
+ void EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) { |
+ controller_->lock_.AssertAcquired(); |
+ sync_messages_.emplace(std::move(message)); |
+ SignalSyncMessageEvent(); |
+ } |
+ |
+ void SignalSyncMessageEvent() { |
+ controller_->lock_.AssertAcquired(); |
+ EnsureSyncMessageEventExists(); |
+ sync_message_event_->Signal(); |
+ } |
+ |
+ std::unique_ptr<mojo::Message> PopSyncMessage(mojo::Message* raw_message) { |
+ controller_->lock_.AssertAcquired(); |
+ if (sync_messages_.empty() || sync_messages_.front().get() != raw_message) |
+ return nullptr; |
+ std::unique_ptr<mojo::Message> message = |
+ std::move(sync_messages_.front()); |
+ sync_messages_.pop(); |
+ return message; |
} |
// mojo::InterfaceEndpointController: |
@@ -285,8 +311,8 @@ class ChannelAssociatedGroupController |
void AllowWokenUpBySyncWatchOnSameThread() override { |
DCHECK(task_runner_->BelongsToCurrentThread()); |
- // TODO(rockot): Implement sync waiting. |
- NOTREACHED(); |
+ EnsureSyncWatcherExists(); |
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
bool SyncWatch(const bool* should_stop) override { |
@@ -297,15 +323,90 @@ class ChannelAssociatedGroupController |
DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
- // TODO(rockot): Implement sync waiting. |
- NOTREACHED(); |
- return false; |
+ EnsureSyncWatcherExists(); |
+ return sync_watcher_->SyncWatch(should_stop); |
} |
private: |
friend class base::RefCountedThreadSafe<Endpoint>; |
- ~Endpoint() override {} |
+ ~Endpoint() override { |
+ controller_->lock_.AssertAcquired(); |
+ DCHECK(!client_); |
+ DCHECK(closed_); |
+ DCHECK(peer_closed_); |
+ DCHECK(!sync_watcher_); |
+ } |
+ |
+ void OnSyncMessageEventHandleReady(MojoResult result) { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ |
+ scoped_refptr<Endpoint> keepalive(this); |
+ scoped_refptr<AssociatedGroupController> controller_keepalive( |
+ controller_); |
+ |
+ bool reset_sync_watcher = false; |
+ { |
+ base::AutoLock locker(controller_->lock_); |
+ bool more_to_process = false; |
+ if (!sync_messages_.empty()) { |
+ std::unique_ptr<mojo::Message> message( |
+ std::move(sync_messages_.front())); |
+ sync_messages_.pop(); |
+ |
+ bool dispatch_succeeded; |
+ mojo::InterfaceEndpointClient* client = client_; |
+ { |
+ base::AutoUnlock unlocker(controller_->lock_); |
+ dispatch_succeeded = client->HandleIncomingMessage(message.get()); |
+ } |
+ |
+ if (!sync_messages_.empty()) |
+ more_to_process = true; |
+ |
+ if (!dispatch_succeeded) |
+ controller_->RaiseError(); |
+ } |
+ |
+ if (!more_to_process) |
+ sync_message_event_->Reset(); |
+ |
+ // If there are no queued sync messages and the peer has closed, there |
+ // there won't be incoming sync messages in the future. |
+ reset_sync_watcher = !more_to_process && peer_closed_; |
+ } |
+ |
+ if (reset_sync_watcher) { |
+ // If a SyncWatch() call (or multiple ones) of this interface endpoint |
+ // is on the call stack, resetting the sync watcher will allow it to |
+ // exit when the call stack unwinds to that frame. |
+ sync_watcher_.reset(); |
+ } |
+ } |
+ |
+ void EnsureSyncWatcherExists() { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ if (sync_watcher_) |
+ return; |
+ |
+ { |
+ base::AutoLock locker(controller_->lock_); |
+ EnsureSyncMessageEventExists(); |
+ if (!sync_messages_.empty()) |
+ SignalSyncMessageEvent(); |
+ } |
+ |
+ sync_watcher_.reset(new mojo::SyncHandleWatcher( |
+ sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&Endpoint::OnSyncMessageEventHandleReady, |
+ base::Unretained(this)))); |
+ } |
+ |
+ void EnsureSyncMessageEventExists() { |
+ controller_->lock_.AssertAcquired(); |
+ if (!sync_message_event_) |
+ sync_message_event_.reset(new MojoEvent); |
+ } |
ChannelAssociatedGroupController* const controller_; |
const mojo::InterfaceId id_; |
@@ -314,6 +415,9 @@ class ChannelAssociatedGroupController |
bool peer_closed_ = false; |
mojo::InterfaceEndpointClient* client_ = nullptr; |
scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
+ std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
+ std::unique_ptr<MojoEvent> sync_message_event_; |
+ std::queue<std::unique_ptr<mojo::Message>> sync_messages_; |
DISALLOW_COPY_AND_ASSIGN(Endpoint); |
}; |
@@ -423,17 +527,22 @@ class ChannelAssociatedGroupController |
endpoint->task_runner()->PostTask( |
FROM_HERE, |
base::Bind(&ChannelAssociatedGroupController |
- ::NotifyEndpointOfErrorOnEndpointThread, this, |
- make_scoped_refptr(endpoint))); |
+ ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
+ endpoint)); |
} |
} |
- void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) { |
+ void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
+ Endpoint* endpoint) { |
base::AutoLock locker(lock_); |
+ auto iter = endpoints_.find(id); |
+ if (iter == endpoints_.end() || iter->second.get() != endpoint) |
+ return; |
if (!endpoint->client()) |
return; |
+ |
DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
- NotifyEndpointOfError(endpoint.get(), false /* force_async */); |
+ NotifyEndpointOfError(endpoint, false /* force_async */); |
} |
void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
@@ -446,6 +555,7 @@ class ChannelAssociatedGroupController |
void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { |
lock_.AssertAcquired(); |
endpoint->set_peer_closed(); |
+ endpoint->SignalSyncMessageEvent(); |
if (endpoint->closed() && endpoint->peer_closed()) |
endpoints_.erase(endpoint->id()); |
} |
@@ -489,8 +599,25 @@ class ChannelAssociatedGroupController |
// If the client is not yet bound, it must be bound by the time this task |
// runs or else it's programmer error. |
DCHECK(proxy_task_runner_); |
+ |
std::unique_ptr<mojo::Message> passed_message(new mojo::Message); |
message->MoveTo(passed_message.get()); |
+ |
+ if (passed_message->has_flag(mojo::Message::kFlagIsSync)) { |
+ // Sync messages may need to be handled by the endpoint if it's blocking |
+ // on a sync reply. We pass ownership of the message to the endpoint's |
+ // sync message queue. If the endpoint was blocking, it will dequeue the |
+ // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| |
+ // call will dequeue the message and dispatch it. |
+ mojo::Message* raw_message = passed_message.get(); |
+ endpoint->EnqueueSyncMessage(std::move(passed_message)); |
+ proxy_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, |
+ this, id, raw_message)); |
yzshen1
2016/08/02 17:38:37
Raw message pointer may not be an appropriate ID:
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
I've switched to using sequential integers. Overfl
yzshen1
2016/08/02 18:34:11
SGTM
|
+ return true; |
+ } |
+ |
proxy_task_runner_->PostTask( |
FROM_HERE, |
base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, |
@@ -500,7 +627,8 @@ class ChannelAssociatedGroupController |
// We do not expect to receive sync responses on the master endpoint thread. |
// If it's happening, it's a bug. |
- DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
+ DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || |
+ !message->has_flag(mojo::Message::kFlagIsResponse)); |
base::AutoUnlock unlocker(lock_); |
return client->HandleIncomingMessage(message); |
@@ -512,6 +640,7 @@ class ChannelAssociatedGroupController |
mojo::InterfaceId id = message->interface_id(); |
DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
+ scoped_refptr<ChannelAssociatedGroupController> keepalive(this); |
yzshen1
2016/08/02 17:38:37
I think this line is not needed because the base::
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Done
|
base::AutoLock locker(lock_); |
Endpoint* endpoint = GetEndpointForDispatch(id); |
if (!endpoint) |
@@ -523,8 +652,7 @@ class ChannelAssociatedGroupController |
DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
- // TODO(rockot): Implement sync dispatch. For now, sync messages are |
- // unsupported here. |
+ // Sync messages should never make their way to this method. |
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
bool result = false; |
@@ -537,6 +665,39 @@ class ChannelAssociatedGroupController |
RaiseError(); |
} |
+ void AcceptSyncMessage(mojo::InterfaceId interface_id, |
+ mojo::Message* raw_message) { |
+ DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
+ |
+ scoped_refptr<ChannelAssociatedGroupController> keepalive(this); |
+ base::AutoLock locker(lock_); |
+ Endpoint* endpoint = GetEndpointForDispatch(interface_id); |
+ if (!endpoint) |
+ return; |
+ |
+ mojo::InterfaceEndpointClient* client = endpoint->client(); |
+ if (!client) |
yzshen1
2016/08/02 17:38:37
If there is not client, what should we do here? Sh
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Fixed to drop the message in this case
yzshen1
2016/08/02 18:34:11
Btw, I think we don't have code to disallow re-att
|
+ return; |
+ |
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
+ std::unique_ptr<mojo::Message> message = |
+ endpoint->PopSyncMessage(raw_message); |
+ |
+ // The message must have already been dequeued by the endpoint waking up |
+ // from a sync wait. Nothing to do. |
+ if (!message) |
+ return; |
+ |
+ bool result = false; |
+ { |
+ base::AutoUnlock unlocker(lock_); |
+ result = client->HandleIncomingMessage(message.get()); |
+ } |
+ |
+ if (!result) |
+ RaiseError(); |
+ } |
+ |
Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { |
lock_.AssertAcquired(); |
bool inserted = false; |