| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index f27336b56f4d074a0e5e32ec2a592fcccff363a3..76f412306d042ce7d072b4bcf8c2eb7e4cb43de7 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -164,7 +164,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
|
| : node_controller_(node_controller),
|
| port_(port),
|
| pipe_id_(pipe_id),
|
| - endpoint_(endpoint) {
|
| + endpoint_(endpoint),
|
| + watchers_(this) {
|
| DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
|
| << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
|
|
|
| @@ -183,6 +184,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
|
| port0 = port_;
|
| port_closed_.Set(true);
|
| awakables_.CancelAll();
|
| + watchers_.NotifyClosed();
|
| }
|
|
|
| ports::PortRef port1;
|
| @@ -191,6 +193,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
|
| port1 = other->port_;
|
| other->port_closed_.Set(true);
|
| other->awakables_.CancelAll();
|
| + other->watchers_.NotifyClosed();
|
| }
|
|
|
| // Both ports are always closed by this call.
|
| @@ -209,27 +212,6 @@ MojoResult MessagePipeDispatcher::Close() {
|
| return CloseNoLock();
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
|
| - const Watcher::WatchCallback& callback,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(signal_lock_);
|
| -
|
| - if (port_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakables_.AddWatcher(
|
| - signals, callback, context, GetHandleSignalsStateNoLock());
|
| -}
|
| -
|
| -MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
|
| - base::AutoLock lock(signal_lock_);
|
| -
|
| - if (port_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakables_.RemoveWatcher(context);
|
| -}
|
| -
|
| MojoResult MessagePipeDispatcher::WriteMessage(
|
| std::unique_ptr<MessageForTransit> message,
|
| MojoWriteMessageFlags flags) {
|
| @@ -299,6 +281,12 @@ MojoResult MessagePipeDispatcher::ReadMessage(
|
| }
|
|
|
| if (no_space) {
|
| + if (may_discard) {
|
| + // May have been the last message on the pipe. Need to update signals just
|
| + // in case.
|
| + base::AutoLock lock(signal_lock_);
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| + }
|
| // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
|
| // sufficient to hold this message's data. The message will still be in
|
| // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
|
| @@ -319,6 +307,13 @@ MojoResult MessagePipeDispatcher::ReadMessage(
|
| // Alright! We have a message and the caller has provided sufficient storage
|
| // in which to receive it.
|
|
|
| + {
|
| + // We need to update anyone watching our signals in case that was the last
|
| + // available message.
|
| + base::AutoLock lock(signal_lock_);
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| + }
|
| +
|
| std::unique_ptr<PortsMessage> msg(
|
| static_cast<PortsMessage*>(ports_message.release()));
|
|
|
| @@ -396,6 +391,23 @@ MessagePipeDispatcher::GetHandleSignalsState() const {
|
| return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| +MojoResult MessagePipeDispatcher::AddWatcherRef(
|
| + const scoped_refptr<WatcherDispatcher>& watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Remove(watcher, context);
|
| +}
|
| +
|
| MojoResult MessagePipeDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| @@ -496,7 +508,9 @@ void MessagePipeDispatcher::CancelTransit() {
|
| in_transit_.Set(false);
|
|
|
| // Something may have happened while we were waiting for potential transit.
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakables_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
|
|
| // static
|
| @@ -532,6 +546,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() {
|
|
|
| port_closed_.Set(true);
|
| awakables_.CancelAll();
|
| + watchers_.NotifyClosed();
|
|
|
| if (!port_transferred_) {
|
| base::AutoUnlock unlock(signal_lock_);
|
| @@ -596,7 +611,9 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
|
| }
|
| #endif
|
|
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakables_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
|
|
| } // namespace edk
|
|
|