Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Unified Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2195953002: Adds sync message support to Channel-associated interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: ipc/ipc_mojo_bootstrap.cc
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..3177d4d55b37ddc88c3438460ab215ce8cb77796 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -8,6 +8,7 @@
#include <map>
#include <memory>
+#include <queue>
#include <utility>
#include <vector>
@@ -18,6 +19,7 @@
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_task_runner_handle.h"
+#include "ipc/mojo_event.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/connector.h"
@@ -29,6 +31,7 @@
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
+#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
namespace IPC {
@@ -273,6 +276,29 @@ class ChannelAssociatedGroupController
task_runner_ = nullptr;
client_ = nullptr;
+ sync_watcher_.reset();
+ }
+
+ void EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
+ controller_->lock_.AssertAcquired();
+ sync_messages_.emplace(std::move(message));
+ SignalSyncMessageEvent();
+ }
+
+ void SignalSyncMessageEvent() {
+ controller_->lock_.AssertAcquired();
+ EnsureSyncMessageEventExists();
+ sync_message_event_->Signal();
+ }
+
+ std::unique_ptr<mojo::Message> PopSyncMessage(mojo::Message* raw_message) {
+ controller_->lock_.AssertAcquired();
+ if (sync_messages_.empty() || sync_messages_.front().get() != raw_message)
+ return nullptr;
+ std::unique_ptr<mojo::Message> message =
+ std::move(sync_messages_.front());
+ sync_messages_.pop();
+ return message;
}
// mojo::InterfaceEndpointController:
@@ -285,8 +311,8 @@ class ChannelAssociatedGroupController
void AllowWokenUpBySyncWatchOnSameThread() override {
DCHECK(task_runner_->BelongsToCurrentThread());
- // TODO(rockot): Implement sync waiting.
- NOTREACHED();
+ EnsureSyncWatcherExists();
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
bool SyncWatch(const bool* should_stop) override {
@@ -297,15 +323,90 @@ class ChannelAssociatedGroupController
DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
- // TODO(rockot): Implement sync waiting.
- NOTREACHED();
- return false;
+ EnsureSyncWatcherExists();
+ return sync_watcher_->SyncWatch(should_stop);
}
private:
friend class base::RefCountedThreadSafe<Endpoint>;
- ~Endpoint() override {}
+ ~Endpoint() override {
+ controller_->lock_.AssertAcquired();
+ DCHECK(!client_);
+ DCHECK(closed_);
+ DCHECK(peer_closed_);
+ DCHECK(!sync_watcher_);
+ }
+
+ void OnSyncMessageEventHandleReady(MojoResult result) {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+
+ scoped_refptr<Endpoint> keepalive(this);
+ scoped_refptr<AssociatedGroupController> controller_keepalive(
+ controller_);
+
+ bool reset_sync_watcher = false;
+ {
+ base::AutoLock locker(controller_->lock_);
+ bool more_to_process = false;
+ if (!sync_messages_.empty()) {
+ std::unique_ptr<mojo::Message> message(
+ std::move(sync_messages_.front()));
+ sync_messages_.pop();
+
+ bool dispatch_succeeded;
+ mojo::InterfaceEndpointClient* client = client_;
+ {
+ base::AutoUnlock unlocker(controller_->lock_);
+ dispatch_succeeded = client->HandleIncomingMessage(message.get());
+ }
+
+ if (!sync_messages_.empty())
+ more_to_process = true;
+
+ if (!dispatch_succeeded)
+ controller_->RaiseError();
+ }
+
+ if (!more_to_process)
+ sync_message_event_->Reset();
+
+ // If there are no queued sync messages and the peer has closed, there
+ // there won't be incoming sync messages in the future.
+ reset_sync_watcher = !more_to_process && peer_closed_;
+ }
+
+ if (reset_sync_watcher) {
+ // If a SyncWatch() call (or multiple ones) of this interface endpoint
+ // is on the call stack, resetting the sync watcher will allow it to
+ // exit when the call stack unwinds to that frame.
+ sync_watcher_.reset();
+ }
+ }
+
+ void EnsureSyncWatcherExists() {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ if (sync_watcher_)
+ return;
+
+ {
+ base::AutoLock locker(controller_->lock_);
+ EnsureSyncMessageEventExists();
+ if (!sync_messages_.empty())
+ SignalSyncMessageEvent();
+ }
+
+ sync_watcher_.reset(new mojo::SyncHandleWatcher(
+ sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&Endpoint::OnSyncMessageEventHandleReady,
+ base::Unretained(this))));
+ }
+
+ void EnsureSyncMessageEventExists() {
+ controller_->lock_.AssertAcquired();
+ if (!sync_message_event_)
+ sync_message_event_.reset(new MojoEvent);
+ }
ChannelAssociatedGroupController* const controller_;
const mojo::InterfaceId id_;
@@ -314,6 +415,9 @@ class ChannelAssociatedGroupController
bool peer_closed_ = false;
mojo::InterfaceEndpointClient* client_ = nullptr;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
+ std::unique_ptr<MojoEvent> sync_message_event_;
+ std::queue<std::unique_ptr<mojo::Message>> sync_messages_;
DISALLOW_COPY_AND_ASSIGN(Endpoint);
};
@@ -423,17 +527,22 @@ class ChannelAssociatedGroupController
endpoint->task_runner()->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController
- ::NotifyEndpointOfErrorOnEndpointThread, this,
- make_scoped_refptr(endpoint)));
+ ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
+ endpoint));
}
}
- void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) {
+ void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
+ Endpoint* endpoint) {
base::AutoLock locker(lock_);
+ auto iter = endpoints_.find(id);
+ if (iter == endpoints_.end() || iter->second.get() != endpoint)
+ return;
if (!endpoint->client())
return;
+
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- NotifyEndpointOfError(endpoint.get(), false /* force_async */);
+ NotifyEndpointOfError(endpoint, false /* force_async */);
}
void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
@@ -446,6 +555,7 @@ class ChannelAssociatedGroupController
void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
lock_.AssertAcquired();
endpoint->set_peer_closed();
+ endpoint->SignalSyncMessageEvent();
if (endpoint->closed() && endpoint->peer_closed())
endpoints_.erase(endpoint->id());
}
@@ -489,8 +599,25 @@ class ChannelAssociatedGroupController
// If the client is not yet bound, it must be bound by the time this task
// runs or else it's programmer error.
DCHECK(proxy_task_runner_);
+
std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
message->MoveTo(passed_message.get());
+
+ if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
+ // Sync messages may need to be handled by the endpoint if it's blocking
+ // on a sync reply. We pass ownership of the message to the endpoint's
+ // sync message queue. If the endpoint was blocking, it will dequeue the
+ // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
+ // call will dequeue the message and dispatch it.
+ mojo::Message* raw_message = passed_message.get();
+ endpoint->EnqueueSyncMessage(std::move(passed_message));
+ proxy_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
+ this, id, raw_message));
yzshen1 2016/08/02 17:38:37 Raw message pointer may not be an appropriate ID:
Ken Rockot(use gerrit already) 2016/08/02 18:21:24 I've switched to using sequential integers. Overfl
yzshen1 2016/08/02 18:34:11 SGTM
+ return true;
+ }
+
proxy_task_runner_->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
@@ -500,7 +627,8 @@ class ChannelAssociatedGroupController
// We do not expect to receive sync responses on the master endpoint thread.
// If it's happening, it's a bug.
- DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
+ DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
+ !message->has_flag(mojo::Message::kFlagIsResponse));
base::AutoUnlock unlocker(lock_);
return client->HandleIncomingMessage(message);
@@ -512,6 +640,7 @@ class ChannelAssociatedGroupController
mojo::InterfaceId id = message->interface_id();
DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
+ scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
yzshen1 2016/08/02 17:38:37 I think this line is not needed because the base::
Ken Rockot(use gerrit already) 2016/08/02 18:21:24 Done
base::AutoLock locker(lock_);
Endpoint* endpoint = GetEndpointForDispatch(id);
if (!endpoint)
@@ -523,8 +652,7 @@ class ChannelAssociatedGroupController
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- // TODO(rockot): Implement sync dispatch. For now, sync messages are
- // unsupported here.
+ // Sync messages should never make their way to this method.
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
bool result = false;
@@ -537,6 +665,39 @@ class ChannelAssociatedGroupController
RaiseError();
}
+ void AcceptSyncMessage(mojo::InterfaceId interface_id,
+ mojo::Message* raw_message) {
+ DCHECK(proxy_task_runner_->BelongsToCurrentThread());
+
+ scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
+ base::AutoLock locker(lock_);
+ Endpoint* endpoint = GetEndpointForDispatch(interface_id);
+ if (!endpoint)
+ return;
+
+ mojo::InterfaceEndpointClient* client = endpoint->client();
+ if (!client)
yzshen1 2016/08/02 17:38:37 If there is not client, what should we do here? Sh
Ken Rockot(use gerrit already) 2016/08/02 18:21:24 Fixed to drop the message in this case
yzshen1 2016/08/02 18:34:11 Btw, I think we don't have code to disallow re-att
+ return;
+
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
+ std::unique_ptr<mojo::Message> message =
+ endpoint->PopSyncMessage(raw_message);
+
+ // The message must have already been dequeued by the endpoint waking up
+ // from a sync wait. Nothing to do.
+ if (!message)
+ return;
+
+ bool result = false;
+ {
+ base::AutoUnlock unlocker(lock_);
+ result = client->HandleIncomingMessage(message.get());
+ }
+
+ if (!result)
+ RaiseError();
+ }
+
Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
lock_.AssertAcquired();
bool inserted = false;

Powered by Google App Engine
This is Rietveld 408576698