Chromium Code Reviews| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| index b9e086c53e11b4371a2858ba0a808a2613cb69ad..523029b096d30f26d4aea5033a8a710653a9e56a 100644 |
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| @@ -15,107 +15,268 @@ |
| #include "base/stl_util.h" |
| #include "mojo/public/cpp/bindings/associated_group.h" |
| #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| +#include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" |
| +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
| namespace mojo { |
| namespace internal { |
| // InterfaceEndpoint stores the information of an interface endpoint registered |
| -// with the router. Always accessed under the router's lock. |
| +// with the router. |
| // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| // this object. |
| class MultiplexRouter::InterfaceEndpoint |
| - : public base::RefCounted<InterfaceEndpoint> { |
| + : public base::RefCounted<InterfaceEndpoint>, |
| + public InterfaceEndpointController { |
| public: |
| InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| - : router_lock_(&router->lock_), |
| + : router_(router), |
| id_(id), |
| closed_(false), |
| peer_closed_(false), |
| - client_(nullptr) { |
| - router_lock_->AssertAcquired(); |
| - } |
| + client_(nullptr), |
| + event_signalled_(false) {} |
| + |
| + // --------------------------------------------------------------------------- |
| + // The following public methods are safe to call from any threads without |
| + // locking. |
| InterfaceId id() const { return id_; } |
| + // --------------------------------------------------------------------------- |
| + // The following public methods are called under the router's lock. |
| + |
| bool closed() const { return closed_; } |
| void set_closed() { |
| - router_lock_->AssertAcquired(); |
| + router_->lock_.AssertAcquired(); |
| closed_ = true; |
| } |
| bool peer_closed() const { return peer_closed_; } |
| void set_peer_closed() { |
| - router_lock_->AssertAcquired(); |
| + router_->lock_.AssertAcquired(); |
| peer_closed_ = true; |
| } |
| base::SingleThreadTaskRunner* task_runner() const { |
| return task_runner_.get(); |
| } |
| - void set_task_runner( |
| - scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| - router_lock_->AssertAcquired(); |
| - task_runner_ = std::move(task_runner); |
| - } |
| InterfaceEndpointClient* client() const { return client_; } |
| - void set_client(InterfaceEndpointClient* client) { |
| - router_lock_->AssertAcquired(); |
| + |
| + void AttachClient(InterfaceEndpointClient* client) { |
| + router_->lock_.AssertAcquired(); |
| + DCHECK(!client_); |
| + DCHECK(!closed_); |
| + |
| + task_runner_ = base::MessageLoop::current()->task_runner(); |
| client_ = client; |
| } |
| + // It should be called on the same thread as the corresponding AttachClient() |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: should = must? Also perhaps "This" or "This m
yzshen1
2016/03/29 16:19:01
Done. English is hard. :)
|
| + // call. |
| + void DetachClient() { |
| + router_->lock_.AssertAcquired(); |
| + DCHECK(client_); |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + DCHECK(!closed_); |
| + |
| + task_runner_ = nullptr; |
| + client_ = nullptr; |
| + sync_watcher_.reset(); |
| + } |
| + |
| + void SignalSyncMessageEvent() { |
| + router_->lock_.AssertAcquired(); |
| + if (event_signalled_) |
| + return; |
| + |
| + EnsureEventMessagePipeExists(); |
| + event_signalled_ = true; |
| + char dummy_message = '\0'; |
| + MojoResult result = |
| + WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1, |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It is perfectly legal to write a zero-length
yzshen1
2016/03/29 16:19:01
Done.
|
| + nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + } |
| + |
| + // --------------------------------------------------------------------------- |
| + // The following public methods (i.e., InterfaceEndpointController |
| + // implementation) are called by the client on the same thread as the |
| + // AttachClient() call. They are called outside of the router's lock. |
| + |
| + bool SendMessage(Message* message) override { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + message->set_interface_id(id_); |
| + return router_->connector_.Accept(message); |
| + } |
| + |
| + void AllowWokenUpBySyncWatchOnSameThread() override { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + |
| + EnsureSyncWatcherExists(); |
| + sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| + } |
| + |
| + bool SyncWatch(const bool* should_stop) override { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + |
| + EnsureSyncWatcherExists(); |
| + return sync_watcher_->SyncWatch(should_stop); |
| + } |
| + |
| private: |
| friend class base::RefCounted<InterfaceEndpoint>; |
| - ~InterfaceEndpoint() { |
| - router_lock_->AssertAcquired(); |
| + ~InterfaceEndpoint() override { |
| + router_->lock_.AssertAcquired(); |
| DCHECK(!client_); |
| DCHECK(closed_); |
| DCHECK(peer_closed_); |
| + DCHECK(!sync_watcher_); |
| + } |
| + |
| + void OnHandleReady(MojoResult result) { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + scoped_refptr<InterfaceEndpoint> self_protector(this); |
| + scoped_refptr<MultiplexRouter> router_protector(router_); |
| + |
| + // Because we never close |sync_message_event_{sender,receiver}_| before |
| + // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + bool reset_sync_watcher = false; |
| + { |
| + base::AutoLock locker(router_->lock_); |
| + |
| + bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| + |
| + if (!more_to_process) |
| + ResetSyncMessageSignal(); |
| + |
| + // Currently there are no queued sync messages and the peer has closed so |
| + // there won't be incoming sync messages in the future. |
| + reset_sync_watcher = !more_to_process && peer_closed_; |
| + } |
| + if (reset_sync_watcher) { |
| + // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
| + // on the call stack, resetting the sync watcher will allow it to exit |
| + // when the call stack unwinds to that frame. |
| + sync_watcher_.reset(); |
| + } |
| + } |
| + |
| + void EnsureSyncWatcherExists() { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + if (sync_watcher_) |
| + return; |
| + |
| + { |
| + base::AutoLock locker(router_->lock_); |
| + EnsureEventMessagePipeExists(); |
| + |
| + auto iter = router_->sync_message_tasks_.find(id_); |
| + if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| + SignalSyncMessageEvent(); |
| + } |
| + |
| + sync_watcher_.reset(new SyncHandleWatcher( |
| + sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
| + } |
| + |
| + void EnsureEventMessagePipeExists() { |
| + router_->lock_.AssertAcquired(); |
| + |
| + if (sync_message_event_receiver_.is_valid()) |
| + return; |
| + |
| + MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
| + &sync_message_event_receiver_); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + } |
| + |
| + void ResetSyncMessageSignal() { |
| + router_->lock_.AssertAcquired(); |
| + |
| + if (!event_signalled_) |
| + return; |
| + |
| + DCHECK(sync_message_event_receiver_.is_valid()); |
| + char dummy_message = 0; |
| + uint32_t size = 1; |
| + MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It's fine to pass nullptr for the read buffer
yzshen1
2016/03/29 16:19:01
Done.
|
| + &dummy_message, &size, nullptr, nullptr, |
| + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + event_signalled_ = false; |
| } |
| - base::Lock* const router_lock_; |
| + // --------------------------------------------------------------------------- |
| + // The following members are safe to access from any threads. |
| + |
| + MultiplexRouter* const router_; |
| const InterfaceId id_; |
| + // --------------------------------------------------------------------------- |
| + // The following members are accessed under the router's lock. |
| + |
| // Whether the endpoint has been closed. |
| bool closed_; |
| // Whether the peer endpoint has been closed. |
| bool peer_closed_; |
| - // The task runner on which |client_| can be accessed. |
| + // The task runner on which |client_|'s methods can be called. |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| // Not owned. It is null if no client is attached to this endpoint. |
| InterfaceEndpointClient* client_; |
| + // A message pipe used as an event to signal that sync messages are available. |
| + // The message pipe handles are initialized under the router's lock and remain |
| + // unchanged afterwards. They may be accessed outside of the router's lock |
| + // later. |
| + ScopedMessagePipeHandle sync_message_event_sender_; |
| + ScopedMessagePipeHandle sync_message_event_receiver_; |
| + bool event_signalled_; |
| + |
| + // --------------------------------------------------------------------------- |
| + // The following members are only valid while a client is attached. They are |
| + // used exclusively on the client's thread. They may be accessed outside of |
| + // the router's lock. |
| + |
| + scoped_ptr<SyncHandleWatcher> sync_watcher_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| }; |
| struct MultiplexRouter::Task { |
| public: |
| // Doesn't take ownership of |message| but takes its contents. |
| - static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { |
| - Task* task = new Task(); |
| + static scoped_ptr<Task> CreateMessageTask(Message* message) { |
| + Task* task = new Task(MESSAGE); |
| task->message.reset(new Message); |
| message->MoveTo(task->message.get()); |
| return make_scoped_ptr(task); |
| } |
| static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
| - Task* task = new Task(); |
| + Task* task = new Task(NOTIFY_ERROR); |
| task->endpoint_to_notify = endpoint; |
| return make_scoped_ptr(task); |
| } |
| ~Task() {} |
| - bool IsIncomingMessageTask() const { return !!message; } |
| - bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
| + bool IsMessageTask() const { return type == MESSAGE; } |
| + bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
| scoped_ptr<Message> message; |
| scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| + enum Type { MESSAGE, NOTIFY_ERROR }; |
| + Type type; |
| + |
| private: |
| - Task() {} |
| + explicit Task(Type in_type) : type(in_type) {} |
| }; |
| MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| @@ -125,12 +286,16 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| header_validator_(this), |
| connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
| - encountered_error_(false), |
| control_message_handler_(this), |
| control_message_proxy_(&connector_), |
| next_interface_id_value_(1), |
| posted_to_process_tasks_(false), |
| + encountered_error_(false), |
| testing_mode_(false) { |
| + // Always participate in sync handle watching, because even if it doesn't |
| + // expect sync requests during sync handle watching, it may still need to |
| + // dispatch messages to associated endpoints on a different thread. |
| + connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| connector_.set_incoming_receiver(&header_validator_); |
| connector_.set_connection_error_handler( |
| [this]() { OnPipeConnectionError(); }); |
| @@ -139,6 +304,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| MultiplexRouter::~MultiplexRouter() { |
| base::AutoLock locker(lock_); |
| + sync_message_tasks_.clear(); |
| tasks_.clear(); |
| for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| @@ -221,10 +387,10 @@ void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
| if (!IsMasterInterfaceId(id)) |
| control_message_proxy_.NotifyPeerEndpointClosed(id); |
| - ProcessTasks(true); |
| + ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| } |
| -void MultiplexRouter::AttachEndpointClient( |
| +InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| const ScopedInterfaceEndpointHandle& handle, |
| InterfaceEndpointClient* client) { |
| const InterfaceId id = handle.id(); |
| @@ -236,15 +402,13 @@ void MultiplexRouter::AttachEndpointClient( |
| DCHECK(ContainsKey(endpoints_, id)); |
| InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| - DCHECK(!endpoint->client()); |
| - DCHECK(!endpoint->closed()); |
| - |
| - endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); |
| - endpoint->set_client(client); |
| + endpoint->AttachClient(client); |
| if (endpoint->peer_closed()) |
| tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| - ProcessTasks(true); |
| + ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| + |
| + return endpoint; |
| } |
| void MultiplexRouter::DetachEndpointClient( |
| @@ -257,18 +421,7 @@ void MultiplexRouter::DetachEndpointClient( |
| DCHECK(ContainsKey(endpoints_, id)); |
| InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| - DCHECK(endpoint->client()); |
| - DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| - DCHECK(!endpoint->closed()); |
| - |
| - endpoint->set_task_runner(nullptr); |
| - endpoint->set_client(nullptr); |
| -} |
| - |
| -bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
| - Message* message) { |
| - message->set_interface_id(handle.id()); |
| - return connector_.Accept(message); |
| + endpoint->DetachClient(); |
| } |
| void MultiplexRouter::RaiseError() { |
| @@ -291,6 +444,15 @@ MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
| return associated_group->router_.get(); |
| } |
| +void MultiplexRouter::CloseMessagePipe() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + connector_.CloseMessagePipe(); |
| + // CloseMessagePipe() above won't trigger connection error handler. |
| + // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| + // get notified. |
| + OnPipeConnectionError(); |
| +} |
| + |
| bool MultiplexRouter::HasAssociatedEndpoints() const { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| base::AutoLock locker(lock_); |
| @@ -317,17 +479,32 @@ bool MultiplexRouter::Accept(Message* message) { |
| scoped_refptr<MultiplexRouter> protector(this); |
| base::AutoLock locker(lock_); |
| - bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); |
| + ClientCallBehavior client_call_behavior = |
| + connector_.during_sync_handle_watcher_callback() |
| + ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| + : ALLOW_DIRECT_CLIENT_CALLS; |
| + |
| + bool processed = |
| + tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); |
| if (!processed) { |
| // Either the task queue is not empty or we cannot process the message |
| // directly. In both cases, there is no need to call ProcessTasks(). |
| - tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| + tasks_.push_back(Task::CreateMessageTask(message)); |
| + Task* task = tasks_.back().get(); |
| + |
| + if (task->message->has_flag(kMessageIsSync)) { |
| + InterfaceId id = task->message->interface_id(); |
| + sync_message_tasks_[id].push_back(task); |
| + auto iter = endpoints_.find(id); |
| + if (iter != endpoints_.end()) |
| + iter->second->SignalSyncMessageEvent(); |
| + } |
| } else if (!tasks_.empty()) { |
| // Processing the message may result in new tasks (for error notification) |
| // being added to the queue. In this case, we have to attempt to process the |
| // tasks. |
| - ProcessTasks(false); |
| + ProcessTasks(client_call_behavior); |
| } |
| // Always return true. If we see errors during message processing, we will |
| @@ -388,10 +565,12 @@ void MultiplexRouter::OnPipeConnectionError() { |
| UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| } |
| - ProcessTasks(false); |
| + ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| + ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| + : ALLOW_DIRECT_CLIENT_CALLS); |
| } |
| -void MultiplexRouter::ProcessTasks(bool force_async) { |
| +void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
| lock_.AssertAcquired(); |
| if (posted_to_process_tasks_) |
| @@ -401,25 +580,70 @@ void MultiplexRouter::ProcessTasks(bool force_async) { |
| scoped_ptr<Task> task(std::move(tasks_.front())); |
| tasks_.pop_front(); |
| + InterfaceId id = kInvalidInterfaceId; |
| + bool sync_message = task->IsMessageTask() && task->message && |
| + task->message->has_flag(kMessageIsSync); |
| + if (sync_message) { |
| + InterfaceId id = task->message->interface_id(); |
| + auto& sync_message_queue = sync_message_tasks_[id]; |
| + DCHECK_EQ(task.get(), sync_message_queue.front()); |
| + sync_message_queue.pop_front(); |
| + } |
| + |
| bool processed = |
| task->IsNotifyErrorTask() |
| - ? ProcessNotifyErrorTask(task.get(), force_async) |
| - : ProcessIncomingMessage(task->message.get(), force_async); |
| + ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
| + : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
| if (!processed) { |
| tasks_.push_front(std::move(task)); |
| + if (sync_message) { |
| + auto& sync_message_queue = sync_message_tasks_[id]; |
| + sync_message_queue.push_front(task.get()); |
| + } |
| break; |
| + } else { |
| + if (sync_message) { |
| + auto iter = sync_message_tasks_.find(id); |
| + if (iter != sync_message_tasks_.end() && iter->second.empty()) |
| + sync_message_tasks_.erase(iter); |
| + } |
| } |
| } |
| } |
| -bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| +bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| + lock_.AssertAcquired(); |
| + |
| + auto iter = sync_message_tasks_.find(id); |
| + if (iter == sync_message_tasks_.end()) |
| + return false; |
| + |
| + MultiplexRouter::Task* task = iter->second.front(); |
| + iter->second.pop_front(); |
| + |
| + DCHECK(task->IsMessageTask()); |
| + scoped_ptr<Message> message(std::move(task->message)); |
| + |
| + // Note: after this call, |task| and |iter| may be invalidated. |
| + bool processed = ProcessIncomingMessage( |
| + message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); |
| + DCHECK(processed); |
| + |
| + iter = sync_message_tasks_.find(id); |
| + return iter != sync_message_tasks_.end() && !iter->second.empty(); |
| +} |
| + |
| +bool MultiplexRouter::ProcessNotifyErrorTask( |
| + Task* task, |
| + ClientCallBehavior client_call_behavior) { |
| lock_.AssertAcquired(); |
| InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| if (!endpoint->client()) |
| return true; |
| - if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| + if (!endpoint->task_runner()->BelongsToCurrentThread() || |
| + client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { |
| MaybePostToProcessTasks(endpoint->task_runner()); |
| return false; |
| } |
| @@ -437,9 +661,17 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| return true; |
| } |
| -bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
| - bool force_async) { |
| +bool MultiplexRouter::ProcessIncomingMessage( |
| + Message* message, |
| + ClientCallBehavior client_call_behavior) { |
| lock_.AssertAcquired(); |
| + |
| + if (!message) { |
| + // This is a sync message and has been processed during sync handle |
| + // watching. |
| + return true; |
| + } |
| + |
| if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| if (!control_message_handler_.Accept(message)) |
| RaiseErrorInNonTestingMode(); |
| @@ -474,7 +706,12 @@ bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
| return false; |
| } |
| - if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| + bool can_direct_call = |
| + (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || |
| + (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && |
| + message->has_flag(kMessageIsSync)); |
| + |
| + if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { |
| MaybePostToProcessTasks(endpoint->task_runner()); |
| return false; |
| } |
| @@ -513,7 +750,7 @@ void MultiplexRouter::LockAndCallProcessTasks() { |
| // always called using base::Bind(), which holds a ref. |
| base::AutoLock locker(lock_); |
| posted_to_process_tasks_ = false; |
| - ProcessTasks(false); |
| + ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
| } |
| void MultiplexRouter::UpdateEndpointStateMayRemove( |
| @@ -525,6 +762,9 @@ void MultiplexRouter::UpdateEndpointStateMayRemove( |
| break; |
| case PEER_ENDPOINT_CLOSED: |
| endpoint->set_peer_closed(); |
| + // If the interface endpoint is performing a sync watch, this makes sure |
| + // it is notified and eventually exits the sync watch. |
| + endpoint->SignalSyncMessageEvent(); |
| break; |
| } |
| if (endpoint->closed() && endpoint->peer_closed()) |