| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index c630b8b928176d6861289479cdbc032f8181e43d..29a6c69575e9ad911398b0c579f98833950db08c 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -99,6 +99,47 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
|
| make_scoped_refptr(new PortObserverThunk(this)));
|
| }
|
|
|
| +bool MessagePipeDispatcher::BeginFuse() {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| + return false;
|
| + is_fusing_ = true;
|
| + return true;
|
| +}
|
| +
|
| +void MessagePipeDispatcher::CancelFuse() {
|
| + base::AutoLock lock(signal_lock_);
|
| + CHECK(is_fusing_);
|
| + is_fusing_ = false;
|
| +}
|
| +
|
| +bool MessagePipeDispatcher::CompleteFuse(MessagePipeDispatcher* other) {
|
| + node_controller_->SetPortObserver(port_, nullptr);
|
| + node_controller_->SetPortObserver(other->port_, nullptr);
|
| +
|
| + ports::PortRef port0;
|
| + {
|
| + base::AutoLock lock(signal_lock_);
|
| + CHECK(is_fusing_);
|
| + port0 = port_;
|
| + port_closed_ = true;
|
| + awakables_.CancelAll();
|
| + }
|
| +
|
| + ports::PortRef port1;
|
| + {
|
| + base::AutoLock lock(other->signal_lock_);
|
| + CHECK(other->is_fusing_);
|
| + port1 = other->port_;
|
| + other->port_closed_ = true;
|
| + other->awakables_.CancelAll();
|
| + }
|
| +
|
| + // Both ports are always closed by this call.
|
| + int rv = node_controller_->MergeLocalPorts(port0, port1);
|
| + return rv == ports::OK;
|
| +}
|
| +
|
| Dispatcher::Type MessagePipeDispatcher::GetType() const {
|
| return Type::MESSAGE_PIPE;
|
| }
|
| @@ -115,7 +156,7 @@ MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
|
| uintptr_t context) {
|
| base::AutoLock lock(signal_lock_);
|
|
|
| - if (port_closed_ || in_transit_)
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| return awakables_.AddWatcher(
|
| @@ -125,7 +166,7 @@ MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
|
| MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
|
| base::AutoLock lock(signal_lock_);
|
|
|
| - if (port_closed_ || in_transit_)
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| return awakables_.RemoveWatcher(context);
|
| @@ -140,7 +181,7 @@ MojoResult MessagePipeDispatcher::WriteMessage(
|
|
|
| {
|
| base::AutoLock lock(signal_lock_);
|
| - if (port_closed_ || in_transit_)
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| @@ -294,7 +335,7 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| {
|
| base::AutoLock lock(signal_lock_);
|
| // We can't read from a port that's closed or in transit!
|
| - if (port_closed_ || in_transit_)
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| @@ -449,7 +490,7 @@ MojoResult MessagePipeDispatcher::AddAwakable(
|
| HandleSignalsState* signals_state) {
|
| base::AutoLock lock(signal_lock_);
|
|
|
| - if (port_closed_ || in_transit_) {
|
| + if (port_closed_ || in_transit_ || is_fusing_) {
|
| if (signals_state)
|
| *signals_state = HandleSignalsState();
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| @@ -487,7 +528,7 @@ MojoResult MessagePipeDispatcher::AddAwakable(
|
| void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
|
| HandleSignalsState* signals_state) {
|
| base::AutoLock lock(signal_lock_);
|
| - if (port_closed_ || in_transit_) {
|
| + if (port_closed_ || in_transit_ || is_fusing_) {
|
| if (signals_state)
|
| *signals_state = HandleSignalsState();
|
| } else if (signals_state) {
|
| @@ -522,7 +563,7 @@ bool MessagePipeDispatcher::EndSerialize(void* destination,
|
|
|
| bool MessagePipeDispatcher::BeginTransit() {
|
| base::AutoLock lock(signal_lock_);
|
| - if (in_transit_ || port_closed_)
|
| + if (in_transit_ || port_closed_ || is_fusing_)
|
| return false;
|
| in_transit_ = true;
|
| return in_transit_;
|
| @@ -573,7 +614,7 @@ MessagePipeDispatcher::~MessagePipeDispatcher() {
|
|
|
| MojoResult MessagePipeDispatcher::CloseNoLock() {
|
| signal_lock_.AssertAcquired();
|
| - if (port_closed_ || in_transit_)
|
| + if (port_closed_ || in_transit_ || is_fusing_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| port_closed_ = true;
|
|
|