Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(206)

Unified Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2195953002: Adds sync message support to Channel-associated interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: nits Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_channel_mojo_unittest.cc ('k') | ipc/ipc_sync_channel.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_mojo_bootstrap.cc
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..d92fd3ef10a01fb975ef35e3bc1e72a750987cfb 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -8,6 +8,7 @@
#include <map>
#include <memory>
+#include <queue>
#include <utility>
#include <vector>
@@ -18,6 +19,7 @@
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_task_runner_handle.h"
+#include "ipc/mojo_event.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/connector.h"
@@ -29,6 +31,7 @@
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
+#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
namespace IPC {
@@ -273,6 +276,31 @@ class ChannelAssociatedGroupController
task_runner_ = nullptr;
client_ = nullptr;
+ sync_watcher_.reset();
+ }
+
+ uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
+ controller_->lock_.AssertAcquired();
+ uint32_t id = GenerateSyncMessageId();
+ sync_messages_.emplace(id, std::move(message));
+ SignalSyncMessageEvent();
+ return id;
+ }
+
+ void SignalSyncMessageEvent() {
+ controller_->lock_.AssertAcquired();
+ EnsureSyncMessageEventExists();
+ sync_message_event_->Signal();
+ }
+
+ std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) {
+ controller_->lock_.AssertAcquired();
+ if (sync_messages_.empty() || sync_messages_.front().first != id)
+ return nullptr;
+ std::unique_ptr<mojo::Message> message =
+ std::move(sync_messages_.front().second);
+ sync_messages_.pop();
+ return message;
}
// mojo::InterfaceEndpointController:
@@ -285,8 +313,8 @@ class ChannelAssociatedGroupController
void AllowWokenUpBySyncWatchOnSameThread() override {
DCHECK(task_runner_->BelongsToCurrentThread());
- // TODO(rockot): Implement sync waiting.
- NOTREACHED();
+ EnsureSyncWatcherExists();
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
bool SyncWatch(const bool* should_stop) override {
@@ -297,15 +325,97 @@ class ChannelAssociatedGroupController
DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
- // TODO(rockot): Implement sync waiting.
- NOTREACHED();
- return false;
+ EnsureSyncWatcherExists();
+ return sync_watcher_->SyncWatch(should_stop);
}
private:
friend class base::RefCountedThreadSafe<Endpoint>;
- ~Endpoint() override {}
+ ~Endpoint() override {
+ controller_->lock_.AssertAcquired();
+ DCHECK(!client_);
+ DCHECK(closed_);
+ DCHECK(peer_closed_);
+ DCHECK(!sync_watcher_);
+ }
+
+ void OnSyncMessageEventHandleReady(MojoResult result) {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+
+ scoped_refptr<Endpoint> keepalive(this);
+ scoped_refptr<AssociatedGroupController> controller_keepalive(
+ controller_);
+
+ bool reset_sync_watcher = false;
+ {
+ base::AutoLock locker(controller_->lock_);
+ bool more_to_process = false;
+ if (!sync_messages_.empty()) {
+ std::unique_ptr<mojo::Message> message(
+ std::move(sync_messages_.front().second));
+ sync_messages_.pop();
+
+ bool dispatch_succeeded;
+ mojo::InterfaceEndpointClient* client = client_;
+ {
+ base::AutoUnlock unlocker(controller_->lock_);
+ dispatch_succeeded = client->HandleIncomingMessage(message.get());
+ }
+
+ if (!sync_messages_.empty())
+ more_to_process = true;
+
+ if (!dispatch_succeeded)
+ controller_->RaiseError();
+ }
+
+ if (!more_to_process)
+ sync_message_event_->Reset();
+
+ // If there are no queued sync messages and the peer has closed, there
+ // there won't be incoming sync messages in the future.
+ reset_sync_watcher = !more_to_process && peer_closed_;
+ }
+
+ if (reset_sync_watcher) {
+ // If a SyncWatch() call (or multiple ones) of this interface endpoint
+ // is on the call stack, resetting the sync watcher will allow it to
+ // exit when the call stack unwinds to that frame.
+ sync_watcher_.reset();
+ }
+ }
+
+ void EnsureSyncWatcherExists() {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ if (sync_watcher_)
+ return;
+
+ {
+ base::AutoLock locker(controller_->lock_);
+ EnsureSyncMessageEventExists();
+ if (!sync_messages_.empty())
+ SignalSyncMessageEvent();
+ }
+
+ sync_watcher_.reset(new mojo::SyncHandleWatcher(
+ sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&Endpoint::OnSyncMessageEventHandleReady,
+ base::Unretained(this))));
+ }
+
+ void EnsureSyncMessageEventExists() {
+ controller_->lock_.AssertAcquired();
+ if (!sync_message_event_)
+ sync_message_event_.reset(new MojoEvent);
+ }
+
+ uint32_t GenerateSyncMessageId() {
+ // Overflow is fine.
+ uint32_t id = next_sync_message_id_++;
+ DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
+ return id;
+ }
ChannelAssociatedGroupController* const controller_;
const mojo::InterfaceId id_;
@@ -314,6 +424,11 @@ class ChannelAssociatedGroupController
bool peer_closed_ = false;
mojo::InterfaceEndpointClient* client_ = nullptr;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
+ std::unique_ptr<MojoEvent> sync_message_event_;
+ std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>>
+ sync_messages_;
+ uint32_t next_sync_message_id_ = 0;
DISALLOW_COPY_AND_ASSIGN(Endpoint);
};
@@ -423,17 +538,22 @@ class ChannelAssociatedGroupController
endpoint->task_runner()->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController
- ::NotifyEndpointOfErrorOnEndpointThread, this,
- make_scoped_refptr(endpoint)));
+ ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
+ endpoint));
}
}
- void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) {
+ void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
+ Endpoint* endpoint) {
base::AutoLock locker(lock_);
+ auto iter = endpoints_.find(id);
+ if (iter == endpoints_.end() || iter->second.get() != endpoint)
+ return;
if (!endpoint->client())
return;
+
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- NotifyEndpointOfError(endpoint.get(), false /* force_async */);
+ NotifyEndpointOfError(endpoint, false /* force_async */);
}
void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
@@ -446,6 +566,7 @@ class ChannelAssociatedGroupController
void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
lock_.AssertAcquired();
endpoint->set_peer_closed();
+ endpoint->SignalSyncMessageEvent();
if (endpoint->closed() && endpoint->peer_closed())
endpoints_.erase(endpoint->id());
}
@@ -489,8 +610,25 @@ class ChannelAssociatedGroupController
// If the client is not yet bound, it must be bound by the time this task
// runs or else it's programmer error.
DCHECK(proxy_task_runner_);
+
std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
message->MoveTo(passed_message.get());
+
+ if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
+ // Sync messages may need to be handled by the endpoint if it's blocking
+ // on a sync reply. We pass ownership of the message to the endpoint's
+ // sync message queue. If the endpoint was blocking, it will dequeue the
+ // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
+ // call will dequeue the message and dispatch it.
+ uint32_t message_id =
+ endpoint->EnqueueSyncMessage(std::move(passed_message));
+ proxy_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
+ this, id, message_id));
+ return true;
+ }
+
proxy_task_runner_->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
@@ -500,7 +638,8 @@ class ChannelAssociatedGroupController
// We do not expect to receive sync responses on the master endpoint thread.
// If it's happening, it's a bug.
- DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
+ DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
+ !message->has_flag(mojo::Message::kFlagIsResponse));
base::AutoUnlock unlocker(lock_);
return client->HandleIncomingMessage(message);
@@ -523,8 +662,7 @@ class ChannelAssociatedGroupController
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- // TODO(rockot): Implement sync dispatch. For now, sync messages are
- // unsupported here.
+ // Sync messages should never make their way to this method.
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
bool result = false;
@@ -537,6 +675,37 @@ class ChannelAssociatedGroupController
RaiseError();
}
+ void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
+ DCHECK(proxy_task_runner_->BelongsToCurrentThread());
+
+ base::AutoLock locker(lock_);
+ Endpoint* endpoint = GetEndpointForDispatch(interface_id);
+ if (!endpoint)
+ return;
+
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
+ std::unique_ptr<mojo::Message> message =
+ endpoint->PopSyncMessage(message_id);
+
+ // The message must have already been dequeued by the endpoint waking up
+ // from a sync wait. Nothing to do.
+ if (!message)
+ return;
+
+ mojo::InterfaceEndpointClient* client = endpoint->client();
+ if (!client)
+ return;
+
+ bool result = false;
+ {
+ base::AutoUnlock unlocker(lock_);
+ result = client->HandleIncomingMessage(message.get());
+ }
+
+ if (!result)
+ RaiseError();
+ }
+
Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
lock_.AssertAcquired();
bool inserted = false;
« no previous file with comments | « ipc/ipc_channel_mojo_unittest.cc ('k') | ipc/ipc_sync_channel.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698