Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Unified Diff: mojo/system/message_pipe.cc

Issue 23621056: Initial in-process implementation of some Mojo primitives. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: build fix Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/system/message_pipe.cc
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8461705e4ebae51b2b66d9c4303ea1872e2310ad
--- /dev/null
+++ b/mojo/system/message_pipe.cc
@@ -0,0 +1,248 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/message_pipe.h"
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "mojo/system/limits.h"
+#include "mojo/system/memory.h"
+
+namespace mojo {
+namespace system {
+
+namespace {
+
+unsigned DestinationPortFromSourcePort(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+ return port ^ 1;
+}
+
+} // namespace
+
+MessagePipe::MessagePipe() {
+ is_open_[0] = is_open_[1] = true;
+}
+
+void MessagePipe::CancelAllWaiters(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ waiter_lists_[port].CancelAllWaiters();
+}
+
+void MessagePipe::Close(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ // Record the old state of the other (destination) port, so we can tell if it
+ // changes.
+ // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
+ // don't have to do this.
+ MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
+ MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
+ bool dest_is_open = is_open_[destination_port];
+ if (dest_is_open) {
+ old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
+ old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
+ }
+
+ is_open_[port] = false;
+ STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
+
+ // Notify the other (destination) port if its state has changed.
+ if (dest_is_open) {
+ MojoWaitFlags new_dest_satisfied_flags =
+ SatisfiedFlagsNoLock(destination_port);
+ MojoWaitFlags new_dest_satisfiable_flags =
+ SatisfiableFlagsNoLock(destination_port);
+ if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
+ new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
+ waiter_lists_[destination_port].AwakeWaitersForStateChange(
+ new_dest_satisfied_flags, new_dest_satisfiable_flags);
+ }
+ }
+}
+
+MojoResult MessagePipe::WriteMessage(
+ unsigned port,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags /*flags*/) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ if (!VerifyUserPointer(bytes, num_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_bytes > kMaxMessageNumBytes)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_handles > kMaxMessageNumHandles)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ if (num_handles > 0) {
+ // TODO(vtl): Verify each handle.
+ NOTIMPLEMENTED();
+ return MOJO_RESULT_UNIMPLEMENTED;
+ }
+
+ // TODO(vtl): Handle flags.
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ // The destination port need not be open, unlike the source port.
+ if (!is_open_[destination_port])
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ bool dest_was_empty = message_queues_[destination_port].empty();
+
+ // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
+ message_queues_[destination_port].push_back(
+ new MessageInTransit(bytes, num_bytes));
+ // TODO(vtl): Support sending handles.
+
+ // The other (destination) port was empty and now isn't, so it should now be
+ // readable. Wake up anyone waiting on this.
+ if (dest_was_empty) {
+ waiter_lists_[destination_port].AwakeWaitersForStateChange(
+ SatisfiedFlagsNoLock(destination_port),
+ SatisfiableFlagsNoLock(destination_port));
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipe::ReadMessage(unsigned port,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ DCHECK(port == 0 || port == 1);
+
+ const size_t max_bytes = num_bytes ? *num_bytes : 0;
+ if (!VerifyUserPointer(bytes, max_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ const size_t max_handles = num_handles ? *num_handles : 0;
+ if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ if (message_queues_[port].empty())
+ return MOJO_RESULT_NOT_FOUND;
+
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
+ // and release the lock immediately.
+ bool not_enough_space = false;
+ MessageInTransit* const message = message_queues_[port].front();
+ const size_t message_size = message->data.size();
+ if (num_bytes)
+ *num_bytes = static_cast<uint32_t>(message_size);
+ if (message_size <= max_bytes)
+ memcpy(bytes, message->data.data(), message_size);
+ else
+ not_enough_space = true;
+
+ // TODO(vtl): Support receiving handles.
+ if (num_handles)
+ *num_handles = 0;
+
+ if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
+ message_queues_[port].pop_front();
+ delete message;
+
+ // Now it's empty, thus no longer readable.
+ if (message_queues_[port].empty()) {
+ // It's currently not possible to wait for non-readability, but we should
+ // do the state change anyway.
+ waiter_lists_[port].AwakeWaitersForStateChange(
+ SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
+ }
+ }
+
+ if (not_enough_space)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipe::AddWaiter(unsigned port,
+ Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ if ((flags & SatisfiedFlagsNoLock(port)))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ if (!(flags & SatisfiableFlagsNoLock(port)))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
+ return MOJO_RESULT_OK;
+}
+
+void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ waiter_lists_[port].RemoveWaiter(waiter);
+}
+
+MessagePipe::~MessagePipe() {
+ // Owned by the dispatchers. The owning dispatchers should only release us via
+ // their |Close()| method, which should inform us of being closed via our
+ // |Close()|. Thus these should already be null.
+ DCHECK(!is_open_[0]);
+ DCHECK(!is_open_[1]);
+}
+
+MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ lock_.AssertAcquired();
+
+ MojoWaitFlags satisfied_flags = 0;
+ if (!message_queues_[port].empty())
+ satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_open_[destination_port])
+ satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
+
+ return satisfied_flags;
+}
+
+MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ lock_.AssertAcquired();
+
+ MojoWaitFlags satisfiable_flags = 0;
+ if (!message_queues_[port].empty() || is_open_[destination_port])
+ satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_open_[destination_port])
+ satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
+
+ return satisfiable_flags;
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698