| Index: ipc/ipc_mojo_bootstrap.cc
 | 
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
 | 
| index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..d92fd3ef10a01fb975ef35e3bc1e72a750987cfb 100644
 | 
| --- a/ipc/ipc_mojo_bootstrap.cc
 | 
| +++ b/ipc/ipc_mojo_bootstrap.cc
 | 
| @@ -8,6 +8,7 @@
 | 
|  
 | 
|  #include <map>
 | 
|  #include <memory>
 | 
| +#include <queue>
 | 
|  #include <utility>
 | 
|  #include <vector>
 | 
|  
 | 
| @@ -18,6 +19,7 @@
 | 
|  #include "base/single_thread_task_runner.h"
 | 
|  #include "base/synchronization/lock.h"
 | 
|  #include "base/threading/thread_task_runner_handle.h"
 | 
| +#include "ipc/mojo_event.h"
 | 
|  #include "mojo/public/cpp/bindings/associated_group.h"
 | 
|  #include "mojo/public/cpp/bindings/associated_group_controller.h"
 | 
|  #include "mojo/public/cpp/bindings/connector.h"
 | 
| @@ -29,6 +31,7 @@
 | 
|  #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
 | 
|  #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
 | 
|  #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
 | 
| +#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
 | 
|  
 | 
|  namespace IPC {
 | 
|  
 | 
| @@ -273,6 +276,31 @@ class ChannelAssociatedGroupController
 | 
|  
 | 
|        task_runner_ = nullptr;
 | 
|        client_ = nullptr;
 | 
| +      sync_watcher_.reset();
 | 
| +    }
 | 
| +
 | 
| +    uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
 | 
| +      controller_->lock_.AssertAcquired();
 | 
| +      uint32_t id = GenerateSyncMessageId();
 | 
| +      sync_messages_.emplace(id, std::move(message));
 | 
| +      SignalSyncMessageEvent();
 | 
| +      return id;
 | 
| +    }
 | 
| +
 | 
| +    void SignalSyncMessageEvent() {
 | 
| +      controller_->lock_.AssertAcquired();
 | 
| +      EnsureSyncMessageEventExists();
 | 
| +      sync_message_event_->Signal();
 | 
| +    }
 | 
| +
 | 
| +    std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) {
 | 
| +      controller_->lock_.AssertAcquired();
 | 
| +      if (sync_messages_.empty() || sync_messages_.front().first != id)
 | 
| +        return nullptr;
 | 
| +      std::unique_ptr<mojo::Message> message =
 | 
| +          std::move(sync_messages_.front().second);
 | 
| +      sync_messages_.pop();
 | 
| +      return message;
 | 
|      }
 | 
|  
 | 
|      // mojo::InterfaceEndpointController:
 | 
| @@ -285,8 +313,8 @@ class ChannelAssociatedGroupController
 | 
|      void AllowWokenUpBySyncWatchOnSameThread() override {
 | 
|        DCHECK(task_runner_->BelongsToCurrentThread());
 | 
|  
 | 
| -      // TODO(rockot): Implement sync waiting.
 | 
| -      NOTREACHED();
 | 
| +      EnsureSyncWatcherExists();
 | 
| +      sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
 | 
|      }
 | 
|  
 | 
|      bool SyncWatch(const bool* should_stop) override {
 | 
| @@ -297,15 +325,97 @@ class ChannelAssociatedGroupController
 | 
|        DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
 | 
|        DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
 | 
|  
 | 
| -      // TODO(rockot): Implement sync waiting.
 | 
| -      NOTREACHED();
 | 
| -      return false;
 | 
| +      EnsureSyncWatcherExists();
 | 
| +      return sync_watcher_->SyncWatch(should_stop);
 | 
|      }
 | 
|  
 | 
|     private:
 | 
|      friend class base::RefCountedThreadSafe<Endpoint>;
 | 
|  
 | 
| -    ~Endpoint() override {}
 | 
| +    ~Endpoint() override {
 | 
| +      controller_->lock_.AssertAcquired();
 | 
| +      DCHECK(!client_);
 | 
| +      DCHECK(closed_);
 | 
| +      DCHECK(peer_closed_);
 | 
| +      DCHECK(!sync_watcher_);
 | 
| +    }
 | 
| +
 | 
| +    void OnSyncMessageEventHandleReady(MojoResult result) {
 | 
| +      DCHECK(task_runner_->BelongsToCurrentThread());
 | 
| +
 | 
| +      scoped_refptr<Endpoint> keepalive(this);
 | 
| +      scoped_refptr<AssociatedGroupController> controller_keepalive(
 | 
| +          controller_);
 | 
| +
 | 
| +      bool reset_sync_watcher = false;
 | 
| +      {
 | 
| +        base::AutoLock locker(controller_->lock_);
 | 
| +        bool more_to_process = false;
 | 
| +        if (!sync_messages_.empty()) {
 | 
| +          std::unique_ptr<mojo::Message> message(
 | 
| +              std::move(sync_messages_.front().second));
 | 
| +          sync_messages_.pop();
 | 
| +
 | 
| +          bool dispatch_succeeded;
 | 
| +          mojo::InterfaceEndpointClient* client = client_;
 | 
| +          {
 | 
| +            base::AutoUnlock unlocker(controller_->lock_);
 | 
| +            dispatch_succeeded = client->HandleIncomingMessage(message.get());
 | 
| +          }
 | 
| +
 | 
| +          if (!sync_messages_.empty())
 | 
| +            more_to_process = true;
 | 
| +
 | 
| +          if (!dispatch_succeeded)
 | 
| +            controller_->RaiseError();
 | 
| +        }
 | 
| +
 | 
| +        if (!more_to_process)
 | 
| +          sync_message_event_->Reset();
 | 
| +
 | 
| +        // If there are no queued sync messages and the peer has closed, there
 | 
| +        // there won't be incoming sync messages in the future.
 | 
| +        reset_sync_watcher = !more_to_process && peer_closed_;
 | 
| +      }
 | 
| +
 | 
| +      if (reset_sync_watcher) {
 | 
| +        // If a SyncWatch() call (or multiple ones) of this interface endpoint
 | 
| +        // is on the call stack, resetting the sync watcher will allow it to
 | 
| +        // exit when the call stack unwinds to that frame.
 | 
| +        sync_watcher_.reset();
 | 
| +      }
 | 
| +    }
 | 
| +
 | 
| +    void EnsureSyncWatcherExists() {
 | 
| +      DCHECK(task_runner_->BelongsToCurrentThread());
 | 
| +      if (sync_watcher_)
 | 
| +        return;
 | 
| +
 | 
| +      {
 | 
| +        base::AutoLock locker(controller_->lock_);
 | 
| +        EnsureSyncMessageEventExists();
 | 
| +        if (!sync_messages_.empty())
 | 
| +          SignalSyncMessageEvent();
 | 
| +      }
 | 
| +
 | 
| +      sync_watcher_.reset(new mojo::SyncHandleWatcher(
 | 
| +          sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
 | 
| +          base::Bind(&Endpoint::OnSyncMessageEventHandleReady,
 | 
| +                     base::Unretained(this))));
 | 
| +    }
 | 
| +
 | 
| +    void EnsureSyncMessageEventExists() {
 | 
| +      controller_->lock_.AssertAcquired();
 | 
| +      if (!sync_message_event_)
 | 
| +        sync_message_event_.reset(new MojoEvent);
 | 
| +    }
 | 
| +
 | 
| +    uint32_t GenerateSyncMessageId() {
 | 
| +      // Overflow is fine.
 | 
| +      uint32_t id = next_sync_message_id_++;
 | 
| +      DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
 | 
| +      return id;
 | 
| +    }
 | 
|  
 | 
|      ChannelAssociatedGroupController* const controller_;
 | 
|      const mojo::InterfaceId id_;
 | 
| @@ -314,6 +424,11 @@ class ChannelAssociatedGroupController
 | 
|      bool peer_closed_ = false;
 | 
|      mojo::InterfaceEndpointClient* client_ = nullptr;
 | 
|      scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
 | 
| +    std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
 | 
| +    std::unique_ptr<MojoEvent> sync_message_event_;
 | 
| +    std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>>
 | 
| +        sync_messages_;
 | 
| +    uint32_t next_sync_message_id_ = 0;
 | 
|  
 | 
|      DISALLOW_COPY_AND_ASSIGN(Endpoint);
 | 
|    };
 | 
| @@ -423,17 +538,22 @@ class ChannelAssociatedGroupController
 | 
|        endpoint->task_runner()->PostTask(
 | 
|            FROM_HERE,
 | 
|            base::Bind(&ChannelAssociatedGroupController
 | 
| -                ::NotifyEndpointOfErrorOnEndpointThread, this,
 | 
| -                make_scoped_refptr(endpoint)));
 | 
| +                ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
 | 
| +                endpoint));
 | 
|      }
 | 
|    }
 | 
|  
 | 
| -  void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) {
 | 
| +  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
 | 
| +                                             Endpoint* endpoint) {
 | 
|      base::AutoLock locker(lock_);
 | 
| +    auto iter = endpoints_.find(id);
 | 
| +    if (iter == endpoints_.end() || iter->second.get() != endpoint)
 | 
| +      return;
 | 
|      if (!endpoint->client())
 | 
|        return;
 | 
| +
 | 
|      DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
 | 
| -    NotifyEndpointOfError(endpoint.get(), false /* force_async */);
 | 
| +    NotifyEndpointOfError(endpoint, false /* force_async */);
 | 
|    }
 | 
|  
 | 
|    void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
 | 
| @@ -446,6 +566,7 @@ class ChannelAssociatedGroupController
 | 
|    void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
 | 
|      lock_.AssertAcquired();
 | 
|      endpoint->set_peer_closed();
 | 
| +    endpoint->SignalSyncMessageEvent();
 | 
|      if (endpoint->closed() && endpoint->peer_closed())
 | 
|        endpoints_.erase(endpoint->id());
 | 
|    }
 | 
| @@ -489,8 +610,25 @@ class ChannelAssociatedGroupController
 | 
|        // If the client is not yet bound, it must be bound by the time this task
 | 
|        // runs or else it's programmer error.
 | 
|        DCHECK(proxy_task_runner_);
 | 
| +
 | 
|        std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
 | 
|        message->MoveTo(passed_message.get());
 | 
| +
 | 
| +      if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
 | 
| +        // Sync messages may need to be handled by the endpoint if it's blocking
 | 
| +        // on a sync reply. We pass ownership of the message to the endpoint's
 | 
| +        // sync message queue. If the endpoint was blocking, it will dequeue the
 | 
| +        // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
 | 
| +        // call will dequeue the message and dispatch it.
 | 
| +        uint32_t message_id =
 | 
| +            endpoint->EnqueueSyncMessage(std::move(passed_message));
 | 
| +        proxy_task_runner_->PostTask(
 | 
| +            FROM_HERE,
 | 
| +            base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
 | 
| +                       this, id, message_id));
 | 
| +        return true;
 | 
| +      }
 | 
| +
 | 
|        proxy_task_runner_->PostTask(
 | 
|            FROM_HERE,
 | 
|            base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
 | 
| @@ -500,7 +638,8 @@ class ChannelAssociatedGroupController
 | 
|  
 | 
|      // We do not expect to receive sync responses on the master endpoint thread.
 | 
|      // If it's happening, it's a bug.
 | 
| -    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
 | 
| +    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
 | 
| +           !message->has_flag(mojo::Message::kFlagIsResponse));
 | 
|  
 | 
|      base::AutoUnlock unlocker(lock_);
 | 
|      return client->HandleIncomingMessage(message);
 | 
| @@ -523,8 +662,7 @@ class ChannelAssociatedGroupController
 | 
|  
 | 
|      DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
 | 
|  
 | 
| -    // TODO(rockot): Implement sync dispatch. For now, sync messages are
 | 
| -    // unsupported here.
 | 
| +    // Sync messages should never make their way to this method.
 | 
|      DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
 | 
|  
 | 
|      bool result = false;
 | 
| @@ -537,6 +675,37 @@ class ChannelAssociatedGroupController
 | 
|        RaiseError();
 | 
|    }
 | 
|  
 | 
| +  void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
 | 
| +    DCHECK(proxy_task_runner_->BelongsToCurrentThread());
 | 
| +
 | 
| +    base::AutoLock locker(lock_);
 | 
| +    Endpoint* endpoint = GetEndpointForDispatch(interface_id);
 | 
| +    if (!endpoint)
 | 
| +      return;
 | 
| +
 | 
| +    DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
 | 
| +    std::unique_ptr<mojo::Message> message =
 | 
| +        endpoint->PopSyncMessage(message_id);
 | 
| +
 | 
| +    // The message must have already been dequeued by the endpoint waking up
 | 
| +    // from a sync wait. Nothing to do.
 | 
| +    if (!message)
 | 
| +      return;
 | 
| +
 | 
| +    mojo::InterfaceEndpointClient* client = endpoint->client();
 | 
| +    if (!client)
 | 
| +      return;
 | 
| +
 | 
| +    bool result = false;
 | 
| +    {
 | 
| +      base::AutoUnlock unlocker(lock_);
 | 
| +      result = client->HandleIncomingMessage(message.get());
 | 
| +    }
 | 
| +
 | 
| +    if (!result)
 | 
| +      RaiseError();
 | 
| +  }
 | 
| +
 | 
|    Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
 | 
|      lock_.AssertAcquired();
 | 
|      bool inserted = false;
 | 
| 
 |