Chromium Code Reviews| Index: mojo/system/message_pipe.cc |
| diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc |
| index 8cd94cbf632f153654dcd5c59061f6ba173396f0..974effef5d9e535ed6c12f479aa92083ba04d0e5 100644 |
| --- a/mojo/system/message_pipe.cc |
| +++ b/mojo/system/message_pipe.cc |
| @@ -6,7 +6,9 @@ |
| #include "base/logging.h" |
| #include "base/stl_util.h" |
| +#include "mojo/system/local_message_pipe_endpoint.h" |
| #include "mojo/system/message_in_transit.h" |
| +#include "mojo/system/message_pipe_endpoint.h" |
| namespace mojo { |
| namespace system { |
| @@ -21,16 +23,16 @@ unsigned DestinationPortFromSourcePort(unsigned port) { |
| } // namespace |
| MessagePipe::MessagePipe() { |
| - is_open_[0] = is_open_[1] = true; |
| + endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
|
darin (slow to review)
2013/10/15 06:50:36
I see. This is the connected pair case.
|
| + endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| } |
| void MessagePipe::CancelAllWaiters(unsigned port) { |
| DCHECK(port == 0 || port == 1); |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| - |
| - waiter_lists_[port].CancelAllWaiters(); |
| + DCHECK(endpoints_[port].get()); |
| + endpoints_[port]->CancelAllWaiters(); |
| } |
| void MessagePipe::Close(unsigned port) { |
| @@ -39,35 +41,13 @@ void MessagePipe::Close(unsigned port) { |
| unsigned destination_port = DestinationPortFromSourcePort(port); |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| - |
| - // Record the old state of the other (destination) port, so we can tell if it |
| - // changes. |
| - // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we |
| - // don't have to do this. |
| - MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; |
| - MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; |
| - bool dest_is_open = is_open_[destination_port]; |
| - if (dest_is_open) { |
| - old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); |
| - old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); |
| - } |
| - |
| - is_open_[port] = false; |
| - STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. |
| - |
| - // Notify the other (destination) port if its state has changed. |
| - if (dest_is_open) { |
| - MojoWaitFlags new_dest_satisfied_flags = |
| - SatisfiedFlagsNoLock(destination_port); |
| - MojoWaitFlags new_dest_satisfiable_flags = |
| - SatisfiableFlagsNoLock(destination_port); |
| - if (new_dest_satisfied_flags != old_dest_satisfied_flags || |
| - new_dest_satisfiable_flags != old_dest_satisfiable_flags) { |
| - waiter_lists_[destination_port].AwakeWaitersForStateChange( |
| - new_dest_satisfied_flags, new_dest_satisfiable_flags); |
| - } |
| - } |
| + DCHECK(endpoints_[port].get()); |
| + |
| + endpoints_[port]->Close(); |
| + if (endpoints_[destination_port].get()) |
| + endpoints_[destination_port]->OnPeerClose(); |
| + |
| + endpoints_[port].reset(); |
| } |
| // TODO(vtl): Handle flags. |
| @@ -75,34 +55,21 @@ MojoResult MessagePipe::WriteMessage( |
| unsigned port, |
| const void* bytes, uint32_t num_bytes, |
| const MojoHandle* handles, uint32_t num_handles, |
| - MojoWriteMessageFlags /*flags*/) { |
| + MojoWriteMessageFlags flags) { |
| DCHECK(port == 0 || port == 1); |
| unsigned destination_port = DestinationPortFromSourcePort(port); |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| + DCHECK(endpoints_[port].get()); |
| // The destination port need not be open, unlike the source port. |
| - if (!is_open_[destination_port]) |
| + if (!endpoints_[destination_port].get()) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| - bool dest_was_empty = message_queues_[destination_port].empty(); |
| - |
| - // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. |
| - message_queues_[destination_port].push_back( |
| - MessageInTransit::Create(bytes, num_bytes)); |
| - // TODO(vtl): Support sending handles. |
| - |
| - // The other (destination) port was empty and now isn't, so it should now be |
| - // readable. Wake up anyone waiting on this. |
| - if (dest_was_empty) { |
| - waiter_lists_[destination_port].AwakeWaitersForStateChange( |
| - SatisfiedFlagsNoLock(destination_port), |
| - SatisfiableFlagsNoLock(destination_port)); |
| - } |
| - |
| - return MOJO_RESULT_OK; |
| + return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, |
| + handles, num_handles, |
| + flags); |
| } |
| MojoResult MessagePipe::ReadMessage(unsigned port, |
| @@ -111,48 +78,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port, |
| MojoReadMessageFlags flags) { |
| DCHECK(port == 0 || port == 1); |
| - const uint32_t max_bytes = num_bytes ? *num_bytes : 0; |
| - // TODO(vtl): We'll need this later: |
| - // const uint32_t max_handles = num_handles ? *num_handles : 0; |
| - |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| - |
| - if (message_queues_[port].empty()) |
| - return MOJO_RESULT_NOT_FOUND; |
| - |
| - // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
| - // and release the lock immediately. |
| - bool not_enough_space = false; |
| - MessageInTransit* const message = message_queues_[port].front(); |
| - if (num_bytes) |
| - *num_bytes = message->data_size(); |
| - if (message->data_size() <= max_bytes) |
| - memcpy(bytes, message->data(), message->data_size()); |
| - else |
| - not_enough_space = true; |
| - |
| - // TODO(vtl): Support receiving handles. |
| - if (num_handles) |
| - *num_handles = 0; |
| - |
| - if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
| - message_queues_[port].pop_front(); |
| - message->Destroy(); |
| - |
| - // Now it's empty, thus no longer readable. |
| - if (message_queues_[port].empty()) { |
| - // It's currently not possible to wait for non-readability, but we should |
| - // do the state change anyway. |
| - waiter_lists_[port].AwakeWaitersForStateChange( |
| - SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); |
| - } |
| - } |
| - |
| - if (not_enough_space) |
| - return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| - |
| - return MOJO_RESULT_OK; |
| + DCHECK(endpoints_[port].get()); |
| + |
| + return endpoints_[port]->ReadMessage(bytes, num_bytes, |
| + handles, num_handles, |
| + flags); |
| } |
| MojoResult MessagePipe::AddWaiter(unsigned port, |
| @@ -162,64 +93,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port, |
| DCHECK(port == 0 || port == 1); |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| - |
| - if ((flags & SatisfiedFlagsNoLock(port))) |
| - return MOJO_RESULT_ALREADY_EXISTS; |
| - if (!(flags & SatisfiableFlagsNoLock(port))) |
| - return MOJO_RESULT_FAILED_PRECONDITION; |
| + DCHECK(endpoints_[port].get()); |
| - waiter_lists_[port].AddWaiter(waiter, flags, wake_result); |
| - return MOJO_RESULT_OK; |
| + return endpoints_[port]->AddWaiter(waiter, flags, wake_result); |
| } |
| void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
| DCHECK(port == 0 || port == 1); |
| base::AutoLock locker(lock_); |
| - DCHECK(is_open_[port]); |
| + DCHECK(endpoints_[port].get()); |
| - waiter_lists_[port].RemoveWaiter(waiter); |
| + endpoints_[port]->RemoveWaiter(waiter); |
| } |
| MessagePipe::~MessagePipe() { |
| // Owned by the dispatchers. The owning dispatchers should only release us via |
| // their |Close()| method, which should inform us of being closed via our |
| // |Close()|. Thus these should already be null. |
| - DCHECK(!is_open_[0]); |
| - DCHECK(!is_open_[1]); |
| -} |
| - |
| -MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { |
| - DCHECK(port == 0 || port == 1); |
| - |
| - unsigned destination_port = DestinationPortFromSourcePort(port); |
| - |
| - lock_.AssertAcquired(); |
| - |
| - MojoWaitFlags satisfied_flags = 0; |
| - if (!message_queues_[port].empty()) |
| - satisfied_flags |= MOJO_WAIT_FLAG_READABLE; |
| - if (is_open_[destination_port]) |
| - satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; |
| - |
| - return satisfied_flags; |
| -} |
| - |
| -MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { |
| - DCHECK(port == 0 || port == 1); |
| - |
| - unsigned destination_port = DestinationPortFromSourcePort(port); |
| - |
| - lock_.AssertAcquired(); |
| - |
| - MojoWaitFlags satisfiable_flags = 0; |
| - if (!message_queues_[port].empty() || is_open_[destination_port]) |
| - satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; |
| - if (is_open_[destination_port]) |
| - satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; |
| - |
| - return satisfiable_flags; |
| + DCHECK(!endpoints_[0].get()); |
| + DCHECK(!endpoints_[1].get()); |
| } |
| } // namespace system |