Index: mojo/system/message_pipe.cc |
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc |
index 8cd94cbf632f153654dcd5c59061f6ba173396f0..bfb06466efedc59b6e1a20c7d12f02e0d9231841 100644 |
--- a/mojo/system/message_pipe.cc |
+++ b/mojo/system/message_pipe.cc |
@@ -6,7 +6,9 @@ |
#include "base/logging.h" |
#include "base/stl_util.h" |
+#include "mojo/system/local_message_pipe_endpoint.h" |
#include "mojo/system/message_in_transit.h" |
+#include "mojo/system/message_pipe_endpoint.h" |
namespace mojo { |
namespace system { |
@@ -20,17 +22,23 @@ unsigned DestinationPortFromSourcePort(unsigned port) { |
} // namespace |
+MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, |
+ scoped_ptr<MessagePipeEndpoint> endpoint_1) { |
+ endpoints_[0].reset(endpoint_0.release()); |
+ endpoints_[1].reset(endpoint_1.release()); |
+} |
+ |
MessagePipe::MessagePipe() { |
- is_open_[0] = is_open_[1] = true; |
+ endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
+ endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
} |
void MessagePipe::CancelAllWaiters(unsigned port) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
- |
- waiter_lists_[port].CancelAllWaiters(); |
+ DCHECK(endpoints_[port].get()); |
+ endpoints_[port]->CancelAllWaiters(); |
} |
void MessagePipe::Close(unsigned port) { |
@@ -39,35 +47,13 @@ void MessagePipe::Close(unsigned port) { |
unsigned destination_port = DestinationPortFromSourcePort(port); |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
- |
- // Record the old state of the other (destination) port, so we can tell if it |
- // changes. |
- // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we |
- // don't have to do this. |
- MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; |
- MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; |
- bool dest_is_open = is_open_[destination_port]; |
- if (dest_is_open) { |
- old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); |
- old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); |
- } |
- |
- is_open_[port] = false; |
- STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. |
- |
- // Notify the other (destination) port if its state has changed. |
- if (dest_is_open) { |
- MojoWaitFlags new_dest_satisfied_flags = |
- SatisfiedFlagsNoLock(destination_port); |
- MojoWaitFlags new_dest_satisfiable_flags = |
- SatisfiableFlagsNoLock(destination_port); |
- if (new_dest_satisfied_flags != old_dest_satisfied_flags || |
- new_dest_satisfiable_flags != old_dest_satisfiable_flags) { |
- waiter_lists_[destination_port].AwakeWaitersForStateChange( |
- new_dest_satisfied_flags, new_dest_satisfiable_flags); |
- } |
- } |
+ DCHECK(endpoints_[port].get()); |
+ |
+ endpoints_[port]->Close(); |
+ if (endpoints_[destination_port].get()) |
+ endpoints_[destination_port]->OnPeerClose(); |
+ |
+ endpoints_[port].reset(); |
} |
// TODO(vtl): Handle flags. |
@@ -75,34 +61,21 @@ MojoResult MessagePipe::WriteMessage( |
unsigned port, |
const void* bytes, uint32_t num_bytes, |
const MojoHandle* handles, uint32_t num_handles, |
- MojoWriteMessageFlags /*flags*/) { |
+ MojoWriteMessageFlags flags) { |
DCHECK(port == 0 || port == 1); |
unsigned destination_port = DestinationPortFromSourcePort(port); |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
+ DCHECK(endpoints_[port].get()); |
// The destination port need not be open, unlike the source port. |
- if (!is_open_[destination_port]) |
+ if (!endpoints_[destination_port].get()) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- bool dest_was_empty = message_queues_[destination_port].empty(); |
- |
- // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. |
- message_queues_[destination_port].push_back( |
- MessageInTransit::Create(bytes, num_bytes)); |
- // TODO(vtl): Support sending handles. |
- |
- // The other (destination) port was empty and now isn't, so it should now be |
- // readable. Wake up anyone waiting on this. |
- if (dest_was_empty) { |
- waiter_lists_[destination_port].AwakeWaitersForStateChange( |
- SatisfiedFlagsNoLock(destination_port), |
- SatisfiableFlagsNoLock(destination_port)); |
- } |
- |
- return MOJO_RESULT_OK; |
+ return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, |
+ handles, num_handles, |
+ flags); |
} |
MojoResult MessagePipe::ReadMessage(unsigned port, |
@@ -111,48 +84,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port, |
MojoReadMessageFlags flags) { |
DCHECK(port == 0 || port == 1); |
- const uint32_t max_bytes = num_bytes ? *num_bytes : 0; |
- // TODO(vtl): We'll need this later: |
- // const uint32_t max_handles = num_handles ? *num_handles : 0; |
- |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
- |
- if (message_queues_[port].empty()) |
- return MOJO_RESULT_NOT_FOUND; |
- |
- // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
- // and release the lock immediately. |
- bool not_enough_space = false; |
- MessageInTransit* const message = message_queues_[port].front(); |
- if (num_bytes) |
- *num_bytes = message->data_size(); |
- if (message->data_size() <= max_bytes) |
- memcpy(bytes, message->data(), message->data_size()); |
- else |
- not_enough_space = true; |
- |
- // TODO(vtl): Support receiving handles. |
- if (num_handles) |
- *num_handles = 0; |
- |
- if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
- message_queues_[port].pop_front(); |
- message->Destroy(); |
- |
- // Now it's empty, thus no longer readable. |
- if (message_queues_[port].empty()) { |
- // It's currently not possible to wait for non-readability, but we should |
- // do the state change anyway. |
- waiter_lists_[port].AwakeWaitersForStateChange( |
- SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); |
- } |
- } |
- |
- if (not_enough_space) |
- return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- |
- return MOJO_RESULT_OK; |
+ DCHECK(endpoints_[port].get()); |
+ |
+ return endpoints_[port]->ReadMessage(bytes, num_bytes, |
+ handles, num_handles, |
+ flags); |
} |
MojoResult MessagePipe::AddWaiter(unsigned port, |
@@ -162,64 +99,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port, |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
+ DCHECK(endpoints_[port].get()); |
- if ((flags & SatisfiedFlagsNoLock(port))) |
- return MOJO_RESULT_ALREADY_EXISTS; |
- if (!(flags & SatisfiableFlagsNoLock(port))) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- |
- waiter_lists_[port].AddWaiter(waiter, flags, wake_result); |
- return MOJO_RESULT_OK; |
+ return endpoints_[port]->AddWaiter(waiter, flags, wake_result); |
} |
void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
- DCHECK(is_open_[port]); |
+ DCHECK(endpoints_[port].get()); |
- waiter_lists_[port].RemoveWaiter(waiter); |
+ endpoints_[port]->RemoveWaiter(waiter); |
} |
MessagePipe::~MessagePipe() { |
// Owned by the dispatchers. The owning dispatchers should only release us via |
// their |Close()| method, which should inform us of being closed via our |
// |Close()|. Thus these should already be null. |
- DCHECK(!is_open_[0]); |
- DCHECK(!is_open_[1]); |
-} |
- |
-MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { |
- DCHECK(port == 0 || port == 1); |
- |
- unsigned destination_port = DestinationPortFromSourcePort(port); |
- |
- lock_.AssertAcquired(); |
- |
- MojoWaitFlags satisfied_flags = 0; |
- if (!message_queues_[port].empty()) |
- satisfied_flags |= MOJO_WAIT_FLAG_READABLE; |
- if (is_open_[destination_port]) |
- satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; |
- |
- return satisfied_flags; |
-} |
- |
-MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { |
- DCHECK(port == 0 || port == 1); |
- |
- unsigned destination_port = DestinationPortFromSourcePort(port); |
- |
- lock_.AssertAcquired(); |
- |
- MojoWaitFlags satisfiable_flags = 0; |
- if (!message_queues_[port].empty() || is_open_[destination_port]) |
- satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; |
- if (is_open_[destination_port]) |
- satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; |
- |
- return satisfiable_flags; |
+ DCHECK(!endpoints_[0].get()); |
+ DCHECK(!endpoints_[1].get()); |
} |
} // namespace system |