Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1938)

Unified Diff: mojo/system/message_pipe.cc

Issue 27060003: Mojo: Abstract out the endpoints of MessagePipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/system/message_pipe.cc
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index 8cd94cbf632f153654dcd5c59061f6ba173396f0..bfb06466efedc59b6e1a20c7d12f02e0d9231841 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -6,7 +6,9 @@
#include "base/logging.h"
#include "base/stl_util.h"
+#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h"
+#include "mojo/system/message_pipe_endpoint.h"
namespace mojo {
namespace system {
@@ -20,17 +22,23 @@ unsigned DestinationPortFromSourcePort(unsigned port) {
} // namespace
+MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
+ scoped_ptr<MessagePipeEndpoint> endpoint_1) {
+ endpoints_[0].reset(endpoint_0.release());
+ endpoints_[1].reset(endpoint_1.release());
+}
+
MessagePipe::MessagePipe() {
- is_open_[0] = is_open_[1] = true;
+ endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ endpoints_[1].reset(new LocalMessagePipeEndpoint());
}
void MessagePipe::CancelAllWaiters(unsigned port) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- waiter_lists_[port].CancelAllWaiters();
+ DCHECK(endpoints_[port].get());
+ endpoints_[port]->CancelAllWaiters();
}
void MessagePipe::Close(unsigned port) {
@@ -39,35 +47,13 @@ void MessagePipe::Close(unsigned port) {
unsigned destination_port = DestinationPortFromSourcePort(port);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- // Record the old state of the other (destination) port, so we can tell if it
- // changes.
- // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
- // don't have to do this.
- MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
- MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
- bool dest_is_open = is_open_[destination_port];
- if (dest_is_open) {
- old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
- old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
- }
-
- is_open_[port] = false;
- STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
-
- // Notify the other (destination) port if its state has changed.
- if (dest_is_open) {
- MojoWaitFlags new_dest_satisfied_flags =
- SatisfiedFlagsNoLock(destination_port);
- MojoWaitFlags new_dest_satisfiable_flags =
- SatisfiableFlagsNoLock(destination_port);
- if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
- new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
- waiter_lists_[destination_port].AwakeWaitersForStateChange(
- new_dest_satisfied_flags, new_dest_satisfiable_flags);
- }
- }
+ DCHECK(endpoints_[port].get());
+
+ endpoints_[port]->Close();
+ if (endpoints_[destination_port].get())
+ endpoints_[destination_port]->OnPeerClose();
+
+ endpoints_[port].reset();
}
// TODO(vtl): Handle flags.
@@ -75,34 +61,21 @@ MojoResult MessagePipe::WriteMessage(
unsigned port,
const void* bytes, uint32_t num_bytes,
const MojoHandle* handles, uint32_t num_handles,
- MojoWriteMessageFlags /*flags*/) {
+ MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
unsigned destination_port = DestinationPortFromSourcePort(port);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
// The destination port need not be open, unlike the source port.
- if (!is_open_[destination_port])
+ if (!endpoints_[destination_port].get())
return MOJO_RESULT_FAILED_PRECONDITION;
- bool dest_was_empty = message_queues_[destination_port].empty();
-
- // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
- message_queues_[destination_port].push_back(
- MessageInTransit::Create(bytes, num_bytes));
- // TODO(vtl): Support sending handles.
-
- // The other (destination) port was empty and now isn't, so it should now be
- // readable. Wake up anyone waiting on this.
- if (dest_was_empty) {
- waiter_lists_[destination_port].AwakeWaitersForStateChange(
- SatisfiedFlagsNoLock(destination_port),
- SatisfiableFlagsNoLock(destination_port));
- }
-
- return MOJO_RESULT_OK;
+ return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
}
MojoResult MessagePipe::ReadMessage(unsigned port,
@@ -111,48 +84,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port,
MojoReadMessageFlags flags) {
DCHECK(port == 0 || port == 1);
- const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
- // TODO(vtl): We'll need this later:
- // const uint32_t max_handles = num_handles ? *num_handles : 0;
-
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- if (message_queues_[port].empty())
- return MOJO_RESULT_NOT_FOUND;
-
- // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
- // and release the lock immediately.
- bool not_enough_space = false;
- MessageInTransit* const message = message_queues_[port].front();
- if (num_bytes)
- *num_bytes = message->data_size();
- if (message->data_size() <= max_bytes)
- memcpy(bytes, message->data(), message->data_size());
- else
- not_enough_space = true;
-
- // TODO(vtl): Support receiving handles.
- if (num_handles)
- *num_handles = 0;
-
- if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
- message_queues_[port].pop_front();
- message->Destroy();
-
- // Now it's empty, thus no longer readable.
- if (message_queues_[port].empty()) {
- // It's currently not possible to wait for non-readability, but we should
- // do the state change anyway.
- waiter_lists_[port].AwakeWaitersForStateChange(
- SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
- }
- }
-
- if (not_enough_space)
- return MOJO_RESULT_RESOURCE_EXHAUSTED;
-
- return MOJO_RESULT_OK;
+ DCHECK(endpoints_[port].get());
+
+ return endpoints_[port]->ReadMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
}
MojoResult MessagePipe::AddWaiter(unsigned port,
@@ -162,64 +99,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port,
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
- if ((flags & SatisfiedFlagsNoLock(port)))
- return MOJO_RESULT_ALREADY_EXISTS;
- if (!(flags & SatisfiableFlagsNoLock(port)))
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
- return MOJO_RESULT_OK;
+ return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
}
void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
- waiter_lists_[port].RemoveWaiter(waiter);
+ endpoints_[port]->RemoveWaiter(waiter);
}
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
// |Close()|. Thus these should already be null.
- DCHECK(!is_open_[0]);
- DCHECK(!is_open_[1]);
-}
-
-MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- lock_.AssertAcquired();
-
- MojoWaitFlags satisfied_flags = 0;
- if (!message_queues_[port].empty())
- satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
- if (is_open_[destination_port])
- satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
-
- return satisfied_flags;
-}
-
-MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- lock_.AssertAcquired();
-
- MojoWaitFlags satisfiable_flags = 0;
- if (!message_queues_[port].empty() || is_open_[destination_port])
- satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
- if (is_open_[destination_port])
- satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
-
- return satisfiable_flags;
+ DCHECK(!endpoints_[0].get());
+ DCHECK(!endpoints_[1].get());
}
} // namespace system
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698