Index: ipc/ipc_mojo_bootstrap.cc
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..d92fd3ef10a01fb975ef35e3bc1e72a750987cfb 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -8,6 +8,7 @@
 #include <map>
 #include <memory>
+#include <queue>
 #include <utility>
 #include <vector>
@@ -18,6 +19,7 @@
 #include "base/single_thread_task_runner.h"
 #include "base/synchronization/lock.h"
 #include "base/threading/thread_task_runner_handle.h"
+#include "ipc/mojo_event.h"
 #include "mojo/public/cpp/bindings/associated_group.h"
 #include "mojo/public/cpp/bindings/associated_group_controller.h"
 #include "mojo/public/cpp/bindings/connector.h"
@@ -29,6 +31,7 @@
 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
+#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
 
 namespace IPC {
@@ -273,6 +276,31 @@ class ChannelAssociatedGroupController
       task_runner_ = nullptr;
       client_ = nullptr;
+      sync_watcher_.reset();
+    }
+
+    uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
+      controller_->lock_.AssertAcquired();
+      uint32_t id = GenerateSyncMessageId();
+      sync_messages_.emplace(id, std::move(message));
+      SignalSyncMessageEvent();
+      return id;
+    }
+
+    void SignalSyncMessageEvent() {
+      controller_->lock_.AssertAcquired();
+      EnsureSyncMessageEventExists();
+      sync_message_event_->Signal();
+    }
+
+    std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) {
+      controller_->lock_.AssertAcquired();
+      if (sync_messages_.empty() || sync_messages_.front().first != id)
+        return nullptr;
+      std::unique_ptr<mojo::Message> message =
+          std::move(sync_messages_.front().second);
+      sync_messages_.pop();
+      return message;
     }
 
     // mojo::InterfaceEndpointController:
@@ -285,8 +313,8 @@ class ChannelAssociatedGroupController
     void AllowWokenUpBySyncWatchOnSameThread() override {
       DCHECK(task_runner_->BelongsToCurrentThread());
 
-      // TODO(rockot): Implement sync waiting.
-      NOTREACHED();
+      EnsureSyncWatcherExists();
+      sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
     }
 
     bool SyncWatch(const bool* should_stop) override {
@@ -297,15 +325,97 @@ class ChannelAssociatedGroupController
       DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
 
-      // TODO(rockot): Implement sync waiting.
-      NOTREACHED();
-      return false;
+      EnsureSyncWatcherExists();
+      return sync_watcher_->SyncWatch(should_stop);
     }
 
    private:
     friend class base::RefCountedThreadSafe<Endpoint>;
 
-    ~Endpoint() override {}
+    ~Endpoint() override {
+      controller_->lock_.AssertAcquired();
+      DCHECK(!client_);
+      DCHECK(closed_);
+      DCHECK(peer_closed_);
+      DCHECK(!sync_watcher_);
+    }
+
+    void OnSyncMessageEventHandleReady(MojoResult result) {
+      DCHECK(task_runner_->BelongsToCurrentThread());
+
+      scoped_refptr<Endpoint> keepalive(this);
+      scoped_refptr<AssociatedGroupController> controller_keepalive(
+          controller_);
+
+      bool reset_sync_watcher = false;
+      {
+        base::AutoLock locker(controller_->lock_);
+        bool more_to_process = false;
+        if (!sync_messages_.empty()) {
+          std::unique_ptr<mojo::Message> message(
+              std::move(sync_messages_.front().second));
+          sync_messages_.pop();
+
+          bool dispatch_succeeded;
+          mojo::InterfaceEndpointClient* client = client_;
+          {
+            base::AutoUnlock unlocker(controller_->lock_);
+            dispatch_succeeded = client->HandleIncomingMessage(message.get());
+          }
+
+          if (!sync_messages_.empty())
+            more_to_process = true;
+
+          if (!dispatch_succeeded)
+            controller_->RaiseError();
+        }
+
+        if (!more_to_process)
+          sync_message_event_->Reset();
+
+        // If there are no queued sync messages and the peer has closed,
+        // there won't be any incoming sync messages in the future.
+        reset_sync_watcher = !more_to_process && peer_closed_;
+      }
+
+      if (reset_sync_watcher) {
+        // If one or more SyncWatch() calls for this interface endpoint are
+        // on the call stack, resetting the sync watcher allows them to exit
+        // as the call stack unwinds to their frames.
+        sync_watcher_.reset();
+      }
+    }
+
+    void EnsureSyncWatcherExists() {
+      DCHECK(task_runner_->BelongsToCurrentThread());
+      if (sync_watcher_)
+        return;
+
+      {
+        base::AutoLock locker(controller_->lock_);
+        EnsureSyncMessageEventExists();
+        if (!sync_messages_.empty())
+          SignalSyncMessageEvent();
+      }
+
+      sync_watcher_.reset(new mojo::SyncHandleWatcher(
+          sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
+          base::Bind(&Endpoint::OnSyncMessageEventHandleReady,
+                     base::Unretained(this))));
+    }
+
+    void EnsureSyncMessageEventExists() {
+      controller_->lock_.AssertAcquired();
+      if (!sync_message_event_)
+        sync_message_event_.reset(new MojoEvent);
+    }
+
+    uint32_t GenerateSyncMessageId() {
+      // Overflow is fine.
+      uint32_t id = next_sync_message_id_++;
+      DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
+      return id;
+    }
 
     ChannelAssociatedGroupController* const controller_;
     const mojo::InterfaceId id_;
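
A minimal stand-alone model of the queue discipline above, for readers of this CL. The names Message and SyncMessageQueue are illustrative stand-ins, not types from this file; the point is that the sync-waiting path pops whatever is at the head of the queue, while the posted task pops only if its id is still at the head, so each message is dispatched exactly once even though two consumers race for it:

// Illustrative sketch only; not code from this CL.
#include <cassert>
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <utility>

using Message = std::string;  // Stand-in for mojo::Message.

class SyncMessageQueue {
 public:
  // Models Endpoint::EnqueueSyncMessage(): tag the message with an id.
  // Overflow of |next_id_| is fine, as in GenerateSyncMessageId().
  uint32_t Enqueue(std::unique_ptr<Message> message) {
    uint32_t id = next_id_++;
    messages_.emplace(id, std::move(message));
    return id;
  }

  // Models the dequeue in OnSyncMessageEventHandleReady(): a woken sync
  // waiter consumes whatever is at the head of the queue.
  std::unique_ptr<Message> PopFront() {
    if (messages_.empty())
      return nullptr;
    std::unique_ptr<Message> message = std::move(messages_.front().second);
    messages_.pop();
    return message;
  }

  // Models Endpoint::PopSyncMessage(): the posted AcceptSyncMessage() task
  // gets the message only if its id is still at the head; otherwise the
  // sync waiter already dispatched it and the task becomes a no-op.
  std::unique_ptr<Message> Pop(uint32_t id) {
    if (messages_.empty() || messages_.front().first != id)
      return nullptr;
    return PopFront();
  }

 private:
  std::queue<std::pair<uint32_t, std::unique_ptr<Message>>> messages_;
  uint32_t next_id_ = 0;
};

int main() {
  SyncMessageQueue queue;
  uint32_t id = queue.Enqueue(std::make_unique<Message>("sync request"));

  // A sync waiter wakes first and drains the head...
  std::unique_ptr<Message> taken = queue.PopFront();
  assert(taken);

  // ...so the posted task finds nothing under |id| and does not dispatch
  // the same message a second time.
  assert(queue.Pop(id) == nullptr);
}
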
@@ -314,6 +424,11 @@ class ChannelAssociatedGroupController
     bool peer_closed_ = false;
     mojo::InterfaceEndpointClient* client_ = nullptr;
     scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+    std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
+    std::unique_ptr<MojoEvent> sync_message_event_;
+    std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>>
+        sync_messages_;
+    uint32_t next_sync_message_id_ = 0;
 
     DISALLOW_COPY_AND_ASSIGN(Endpoint);
   };
@@ -423,17 +538,22 @@ class ChannelAssociatedGroupController
       endpoint->task_runner()->PostTask(
           FROM_HERE,
           base::Bind(&ChannelAssociatedGroupController
-                         ::NotifyEndpointOfErrorOnEndpointThread, this,
-                     make_scoped_refptr(endpoint)));
+                         ::NotifyEndpointOfErrorOnEndpointThread, this,
+                     endpoint->id(), endpoint));
     }
   }
 
-  void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) {
+  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
+                                             Endpoint* endpoint) {
     base::AutoLock locker(lock_);
+    auto iter = endpoints_.find(id);
+    if (iter == endpoints_.end() || iter->second.get() != endpoint)
+      return;
     if (!endpoint->client())
       return;
+
     DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
-    NotifyEndpointOfError(endpoint.get(), false /* force_async */);
+    NotifyEndpointOfError(endpoint, false /* force_async */);
   }
 
   void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
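
The revalidation added above follows a pattern worth spelling out: the posted task carries the endpoint's id alongside the pointer, then re-checks under the lock that the id still maps to that same endpoint before notifying. A hedged sketch of the pattern with simplified stand-in types (Endpoint here is a plain struct, not the class in this file):

// Illustrative sketch only; not code from this CL.
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

struct Endpoint {
  bool has_client = true;
};

std::mutex lock;
std::map<uint32_t, std::shared_ptr<Endpoint>> endpoints;

// Models NotifyEndpointOfErrorOnEndpointThread(): a task posted with
// (id, pointer) may run after the endpoint was removed or its id reused,
// so membership is revalidated under the lock before doing anything.
void NotifyEndpointOfErrorOnEndpointThread(uint32_t id, Endpoint* endpoint) {
  std::lock_guard<std::mutex> hold(lock);
  auto iter = endpoints.find(id);
  if (iter == endpoints.end() || iter->second.get() != endpoint)
    return;  // Stale task; the endpoint is gone. No-op.
  if (!endpoint->has_client)
    return;
  // ... notify the client of the error ...
}

int main() {
  auto endpoint = std::make_shared<Endpoint>();
  endpoints[7] = endpoint;
  NotifyEndpointOfErrorOnEndpointThread(7, endpoint.get());  // Notifies.
  endpoints.erase(7);
  NotifyEndpointOfErrorOnEndpointThread(7, endpoint.get());  // Stale: no-op.
}
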
@@ -446,6 +566,7 @@ class ChannelAssociatedGroupController
   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
     lock_.AssertAcquired();
     endpoint->set_peer_closed();
+    endpoint->SignalSyncMessageEvent();
     if (endpoint->closed() && endpoint->peer_closed())
       endpoints_.erase(endpoint->id());
   }
@@ -489,8 +610,25 @@ class ChannelAssociatedGroupController
     // If the client is not yet bound, it must be bound by the time this task
     // runs or else it's programmer error.
     DCHECK(proxy_task_runner_);
+
     std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
     message->MoveTo(passed_message.get());
+
+    if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
+      // Sync messages may need to be handled by the endpoint if it's blocking
+      // on a sync reply. We pass ownership of the message to the endpoint's
+      // sync message queue. If the endpoint is blocking, it will dequeue the
+      // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
+      // task will dequeue the message and dispatch it.
+      uint32_t message_id =
+          endpoint->EnqueueSyncMessage(std::move(passed_message));
+      proxy_task_runner_->PostTask(
+          FROM_HERE,
+          base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
+                     this, id, message_id));
+      return true;
+    }
+
     proxy_task_runner_->PostTask(
         FROM_HERE,
         base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
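
To make the comment in the hunk above concrete: ownership of a sync message moves into the endpoint's queue, and the task posted to the proxy thread carries only plain ids, so it is harmless no matter which side dispatches first. A condensed single-threaded sketch with stand-in names (AcceptSyncRequest and the globals are illustrative, not from this file):

// Illustrative sketch only; not code from this CL.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using Message = std::string;                     // Stand-in for mojo::Message.
std::queue<std::pair<uint32_t, std::unique_ptr<Message>>> sync_messages;
uint32_t next_sync_message_id = 0;
std::vector<std::function<void()>> proxy_tasks;  // Stand-in for PostTask().

// Models the posted half of the hand-off: re-check that |message_id| is
// still queued; if a sync waiter already took it, do nothing.
void AcceptSyncMessage(uint32_t message_id) {
  if (sync_messages.empty() || sync_messages.front().first != message_id)
    return;
  std::unique_ptr<Message> message = std::move(sync_messages.front().second);
  sync_messages.pop();
  std::cout << "dispatched: " << *message << "\n";
}

// Models the sync branch of AcceptMessage(): the message is parked in the
// queue and only a copyable id crosses to the proxy thread.
bool AcceptSyncRequest(std::unique_ptr<Message> message) {
  uint32_t message_id = next_sync_message_id++;
  sync_messages.emplace(message_id, std::move(message));
  proxy_tasks.push_back([message_id] { AcceptSyncMessage(message_id); });
  return true;
}

int main() {
  AcceptSyncRequest(std::make_unique<Message>("sync request"));
  for (auto& task : proxy_tasks)
    task();  // Runs AcceptSyncMessage(), which dispatches the message.
}
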
@@ -500,7 +638,8 @@ class ChannelAssociatedGroupController
 
     // We do not expect to receive sync responses on the master endpoint thread.
     // If it happens, it's a bug.
-    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
+    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
+           !message->has_flag(mojo::Message::kFlagIsResponse));
 
     base::AutoUnlock unlocker(lock_);
     return client->HandleIncomingMessage(message);
@@ -523,8 +662,7 @@ class ChannelAssociatedGroupController |
DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
- // TODO(rockot): Implement sync dispatch. For now, sync messages are |
- // unsupported here. |
+ // Sync messages should never make their way to this method. |
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
bool result = false; |
@@ -537,6 +675,37 @@ class ChannelAssociatedGroupController
       RaiseError();
   }
 
+  void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
+    DCHECK(proxy_task_runner_->BelongsToCurrentThread());
+
+    base::AutoLock locker(lock_);
+    Endpoint* endpoint = GetEndpointForDispatch(interface_id);
+    if (!endpoint)
+      return;
+
+    DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
+    std::unique_ptr<mojo::Message> message =
+        endpoint->PopSyncMessage(message_id);
+
+    // If the message is no longer in the queue, the endpoint must have
+    // already dequeued it while waking up from a sync wait. Nothing to do.
+    if (!message)
+      return;
+
+    mojo::InterfaceEndpointClient* client = endpoint->client();
+    if (!client)
+      return;
+
+    bool result = false;
+    {
+      base::AutoUnlock unlocker(lock_);
+      result = client->HandleIncomingMessage(message.get());
+    }
+
+    if (!result)
+      RaiseError();
+  }
+
   Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
     lock_.AssertAcquired();
     bool inserted = false;
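
Finally, the runtime interplay between MojoEvent, mojo::SyncHandleWatcher, and the sync message queue can be modeled with a condition variable. This is an editorial analogy only (std::condition_variable stands in for the event/watcher pair; the CL itself uses Mojo handle signaling), meant to show the control flow of SyncWatch(): block, wake when the event is signaled, dispatch queued sync messages, and stop once the peer has closed with nothing left queued:

// Illustrative model only; std::condition_variable stands in for the
// MojoEvent + mojo::SyncHandleWatcher pair used by the CL.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

std::mutex lock;                // Plays the role of controller_->lock_.
std::condition_variable event;  // Signaled like sync_message_event_.
std::queue<std::string> sync_messages;
bool peer_closed = false;

// Models Endpoint::SyncWatch(): block until woken, dispatch queued sync
// messages, and return once the peer is closed with nothing queued (the
// point at which the CL resets sync_watcher_).
void SyncWatch() {
  std::unique_lock<std::mutex> hold(lock);
  for (;;) {
    event.wait(hold, [] { return !sync_messages.empty() || peer_closed; });
    while (!sync_messages.empty()) {
      std::string message = std::move(sync_messages.front());
      sync_messages.pop();
      // The CL unlocks around dispatch; kept locked here for brevity.
      std::cout << "dispatched: " << message << "\n";
    }
    if (peer_closed)
      return;
  }
}

int main() {
  std::thread waiter(SyncWatch);
  {
    std::lock_guard<std::mutex> hold(lock);
    sync_messages.push("sync request");  // EnqueueSyncMessage()
    event.notify_one();                  // SignalSyncMessageEvent()
  }
  {
    std::lock_guard<std::mutex> hold(lock);
    peer_closed = true;                  // MarkPeerClosedAndMaybeRemove()
    event.notify_one();
  }
  waiter.join();
}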