| Index: mojo/system/message_pipe.cc
|
| diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
|
| index 8cd94cbf632f153654dcd5c59061f6ba173396f0..bfb06466efedc59b6e1a20c7d12f02e0d9231841 100644
|
| --- a/mojo/system/message_pipe.cc
|
| +++ b/mojo/system/message_pipe.cc
|
| @@ -6,7 +6,9 @@
|
|
|
| #include "base/logging.h"
|
| #include "base/stl_util.h"
|
| +#include "mojo/system/local_message_pipe_endpoint.h"
|
| #include "mojo/system/message_in_transit.h"
|
| +#include "mojo/system/message_pipe_endpoint.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
| @@ -20,17 +22,23 @@ unsigned DestinationPortFromSourcePort(unsigned port) {
|
|
|
| } // namespace
|
|
|
| +MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
|
| + scoped_ptr<MessagePipeEndpoint> endpoint_1) {
|
| + endpoints_[0].reset(endpoint_0.release());
|
| + endpoints_[1].reset(endpoint_1.release());
|
| +}
|
| +
|
| MessagePipe::MessagePipe() {
|
| - is_open_[0] = is_open_[1] = true;
|
| + endpoints_[0].reset(new LocalMessagePipeEndpoint());
|
| + endpoints_[1].reset(new LocalMessagePipeEndpoint());
|
| }
|
|
|
| void MessagePipe::CancelAllWaiters(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| -
|
| - waiter_lists_[port].CancelAllWaiters();
|
| + DCHECK(endpoints_[port].get());
|
| + endpoints_[port]->CancelAllWaiters();
|
| }
|
|
|
| void MessagePipe::Close(unsigned port) {
|
| @@ -39,35 +47,13 @@ void MessagePipe::Close(unsigned port) {
|
| unsigned destination_port = DestinationPortFromSourcePort(port);
|
|
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| -
|
| - // Record the old state of the other (destination) port, so we can tell if it
|
| - // changes.
|
| - // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
|
| - // don't have to do this.
|
| - MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
|
| - MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
|
| - bool dest_is_open = is_open_[destination_port];
|
| - if (dest_is_open) {
|
| - old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
|
| - old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
|
| - }
|
| -
|
| - is_open_[port] = false;
|
| - STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
|
| -
|
| - // Notify the other (destination) port if its state has changed.
|
| - if (dest_is_open) {
|
| - MojoWaitFlags new_dest_satisfied_flags =
|
| - SatisfiedFlagsNoLock(destination_port);
|
| - MojoWaitFlags new_dest_satisfiable_flags =
|
| - SatisfiableFlagsNoLock(destination_port);
|
| - if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
|
| - new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
|
| - waiter_lists_[destination_port].AwakeWaitersForStateChange(
|
| - new_dest_satisfied_flags, new_dest_satisfiable_flags);
|
| - }
|
| - }
|
| + DCHECK(endpoints_[port].get());
|
| +
|
| + endpoints_[port]->Close();
|
| + if (endpoints_[destination_port].get())
|
| + endpoints_[destination_port]->OnPeerClose();
|
| +
|
| + endpoints_[port].reset();
|
| }
|
|
|
| // TODO(vtl): Handle flags.
|
| @@ -75,34 +61,21 @@ MojoResult MessagePipe::WriteMessage(
|
| unsigned port,
|
| const void* bytes, uint32_t num_bytes,
|
| const MojoHandle* handles, uint32_t num_handles,
|
| - MojoWriteMessageFlags /*flags*/) {
|
| + MojoWriteMessageFlags flags) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| unsigned destination_port = DestinationPortFromSourcePort(port);
|
|
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| + DCHECK(endpoints_[port].get());
|
|
|
| // The destination port need not be open, unlike the source port.
|
| - if (!is_open_[destination_port])
|
| + if (!endpoints_[destination_port].get())
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| - bool dest_was_empty = message_queues_[destination_port].empty();
|
| -
|
| - // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
|
| - message_queues_[destination_port].push_back(
|
| - MessageInTransit::Create(bytes, num_bytes));
|
| - // TODO(vtl): Support sending handles.
|
| -
|
| - // The other (destination) port was empty and now isn't, so it should now be
|
| - // readable. Wake up anyone waiting on this.
|
| - if (dest_was_empty) {
|
| - waiter_lists_[destination_port].AwakeWaitersForStateChange(
|
| - SatisfiedFlagsNoLock(destination_port),
|
| - SatisfiableFlagsNoLock(destination_port));
|
| - }
|
| -
|
| - return MOJO_RESULT_OK;
|
| + return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
|
| + handles, num_handles,
|
| + flags);
|
| }
|
|
|
| MojoResult MessagePipe::ReadMessage(unsigned port,
|
| @@ -111,48 +84,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port,
|
| MojoReadMessageFlags flags) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| - const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
|
| - // TODO(vtl): We'll need this later:
|
| - // const uint32_t max_handles = num_handles ? *num_handles : 0;
|
| -
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| -
|
| - if (message_queues_[port].empty())
|
| - return MOJO_RESULT_NOT_FOUND;
|
| -
|
| - // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
|
| - // and release the lock immediately.
|
| - bool not_enough_space = false;
|
| - MessageInTransit* const message = message_queues_[port].front();
|
| - if (num_bytes)
|
| - *num_bytes = message->data_size();
|
| - if (message->data_size() <= max_bytes)
|
| - memcpy(bytes, message->data(), message->data_size());
|
| - else
|
| - not_enough_space = true;
|
| -
|
| - // TODO(vtl): Support receiving handles.
|
| - if (num_handles)
|
| - *num_handles = 0;
|
| -
|
| - if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
|
| - message_queues_[port].pop_front();
|
| - message->Destroy();
|
| -
|
| - // Now it's empty, thus no longer readable.
|
| - if (message_queues_[port].empty()) {
|
| - // It's currently not possible to wait for non-readability, but we should
|
| - // do the state change anyway.
|
| - waiter_lists_[port].AwakeWaitersForStateChange(
|
| - SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
|
| - }
|
| - }
|
| -
|
| - if (not_enough_space)
|
| - return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| -
|
| - return MOJO_RESULT_OK;
|
| + DCHECK(endpoints_[port].get());
|
| +
|
| + return endpoints_[port]->ReadMessage(bytes, num_bytes,
|
| + handles, num_handles,
|
| + flags);
|
| }
|
|
|
| MojoResult MessagePipe::AddWaiter(unsigned port,
|
| @@ -162,64 +99,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port,
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| + DCHECK(endpoints_[port].get());
|
|
|
| - if ((flags & SatisfiedFlagsNoLock(port)))
|
| - return MOJO_RESULT_ALREADY_EXISTS;
|
| - if (!(flags & SatisfiableFlagsNoLock(port)))
|
| - return MOJO_RESULT_FAILED_PRECONDITION;
|
| -
|
| - waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
|
| - return MOJO_RESULT_OK;
|
| + return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
|
| }
|
|
|
| void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| - DCHECK(is_open_[port]);
|
| + DCHECK(endpoints_[port].get());
|
|
|
| - waiter_lists_[port].RemoveWaiter(waiter);
|
| + endpoints_[port]->RemoveWaiter(waiter);
|
| }
|
|
|
| MessagePipe::~MessagePipe() {
|
| // Owned by the dispatchers. The owning dispatchers should only release us via
|
| // their |Close()| method, which should inform us of being closed via our
|
| // |Close()|. Thus these should already be null.
|
| - DCHECK(!is_open_[0]);
|
| - DCHECK(!is_open_[1]);
|
| -}
|
| -
|
| -MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
|
| - DCHECK(port == 0 || port == 1);
|
| -
|
| - unsigned destination_port = DestinationPortFromSourcePort(port);
|
| -
|
| - lock_.AssertAcquired();
|
| -
|
| - MojoWaitFlags satisfied_flags = 0;
|
| - if (!message_queues_[port].empty())
|
| - satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
|
| - if (is_open_[destination_port])
|
| - satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| -
|
| - return satisfied_flags;
|
| -}
|
| -
|
| -MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
|
| - DCHECK(port == 0 || port == 1);
|
| -
|
| - unsigned destination_port = DestinationPortFromSourcePort(port);
|
| -
|
| - lock_.AssertAcquired();
|
| -
|
| - MojoWaitFlags satisfiable_flags = 0;
|
| - if (!message_queues_[port].empty() || is_open_[destination_port])
|
| - satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
|
| - if (is_open_[destination_port])
|
| - satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| -
|
| - return satisfiable_flags;
|
| + DCHECK(!endpoints_[0].get());
|
| + DCHECK(!endpoints_[1].get());
|
| }
|
|
|
| } // namespace system
|
|
|