| Index: mojo/system/message_pipe.cc
|
| diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..8461705e4ebae51b2b66d9c4303ea1872e2310ad
|
| --- /dev/null
|
| +++ b/mojo/system/message_pipe.cc
|
| @@ -0,0 +1,248 @@
|
| +// Copyright 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/system/message_pipe.h"
|
| +
|
| +#include "base/logging.h"
|
| +#include "base/stl_util.h"
|
| +#include "mojo/system/limits.h"
|
| +#include "mojo/system/memory.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +namespace {
|
| +
|
| +unsigned DestinationPortFromSourcePort(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| + return port ^ 1;
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +MessagePipe::MessagePipe() {
|
| + is_open_[0] = is_open_[1] = true;
|
| +}
|
| +
|
| +void MessagePipe::CancelAllWaiters(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + waiter_lists_[port].CancelAllWaiters();
|
| +}
|
| +
|
| +void MessagePipe::Close(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + unsigned destination_port = DestinationPortFromSourcePort(port);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + // Record the old state of the other (destination) port, so we can tell if it
|
| + // changes.
|
| + // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
|
| + // don't have to do this.
|
| + MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
|
| + MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
|
| + bool dest_is_open = is_open_[destination_port];
|
| + if (dest_is_open) {
|
| + old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
|
| + old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
|
| + }
|
| +
|
| + is_open_[port] = false;
|
| + STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
|
| +
|
| + // Notify the other (destination) port if its state has changed.
|
| + if (dest_is_open) {
|
| + MojoWaitFlags new_dest_satisfied_flags =
|
| + SatisfiedFlagsNoLock(destination_port);
|
| + MojoWaitFlags new_dest_satisfiable_flags =
|
| + SatisfiableFlagsNoLock(destination_port);
|
| + if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
|
| + new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
|
| + waiter_lists_[destination_port].AwakeWaitersForStateChange(
|
| + new_dest_satisfied_flags, new_dest_satisfiable_flags);
|
| + }
|
| + }
|
| +}
|
| +
|
| +MojoResult MessagePipe::WriteMessage(
|
| + unsigned port,
|
| + const void* bytes, uint32_t num_bytes,
|
| + const MojoHandle* handles, uint32_t num_handles,
|
| + MojoWriteMessageFlags /*flags*/) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + unsigned destination_port = DestinationPortFromSourcePort(port);
|
| +
|
| + if (!VerifyUserPointer(bytes, num_bytes, 1))
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + if (num_bytes > kMaxMessageNumBytes)
|
| + return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| +
|
| + if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + if (num_handles > kMaxMessageNumHandles)
|
| + return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| + if (num_handles > 0) {
|
| + // TODO(vtl): Verify each handle.
|
| + NOTIMPLEMENTED();
|
| + return MOJO_RESULT_UNIMPLEMENTED;
|
| + }
|
| +
|
| + // TODO(vtl): Handle flags.
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + // The destination port need not be open, unlike the source port.
|
| + if (!is_open_[destination_port])
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| +
|
| + bool dest_was_empty = message_queues_[destination_port].empty();
|
| +
|
| + // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
|
| + message_queues_[destination_port].push_back(
|
| + new MessageInTransit(bytes, num_bytes));
|
| + // TODO(vtl): Support sending handles.
|
| +
|
| + // The other (destination) port was empty and now isn't, so it should now be
|
| + // readable. Wake up anyone waiting on this.
|
| + if (dest_was_empty) {
|
| + waiter_lists_[destination_port].AwakeWaitersForStateChange(
|
| + SatisfiedFlagsNoLock(destination_port),
|
| + SatisfiableFlagsNoLock(destination_port));
|
| + }
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult MessagePipe::ReadMessage(unsigned port,
|
| + void* bytes, uint32_t* num_bytes,
|
| + MojoHandle* handles, uint32_t* num_handles,
|
| + MojoReadMessageFlags flags) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + const size_t max_bytes = num_bytes ? *num_bytes : 0;
|
| + if (!VerifyUserPointer(bytes, max_bytes, 1))
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + const size_t max_handles = num_handles ? *num_handles : 0;
|
| + if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0])))
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + if (message_queues_[port].empty())
|
| + return MOJO_RESULT_NOT_FOUND;
|
| +
|
| + // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
|
| + // and release the lock immediately.
|
| + bool not_enough_space = false;
|
| + MessageInTransit* const message = message_queues_[port].front();
|
| + const size_t message_size = message->data.size();
|
| + if (num_bytes)
|
| + *num_bytes = static_cast<uint32_t>(message_size);
|
| + if (message_size <= max_bytes)
|
| + memcpy(bytes, message->data.data(), message_size);
|
| + else
|
| + not_enough_space = true;
|
| +
|
| + // TODO(vtl): Support receiving handles.
|
| + if (num_handles)
|
| + *num_handles = 0;
|
| +
|
| + if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
|
| + message_queues_[port].pop_front();
|
| + delete message;
|
| +
|
| + // Now it's empty, thus no longer readable.
|
| + if (message_queues_[port].empty()) {
|
| + // It's currently not possible to wait for non-readability, but we should
|
| + // do the state change anyway.
|
| + waiter_lists_[port].AwakeWaitersForStateChange(
|
| + SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
|
| + }
|
| + }
|
| +
|
| + if (not_enough_space)
|
| + return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult MessagePipe::AddWaiter(unsigned port,
|
| + Waiter* waiter,
|
| + MojoWaitFlags flags,
|
| + MojoResult wake_result) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + if ((flags & SatisfiedFlagsNoLock(port)))
|
| + return MOJO_RESULT_ALREADY_EXISTS;
|
| + if (!(flags & SatisfiableFlagsNoLock(port)))
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| +
|
| + waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(is_open_[port]);
|
| +
|
| + waiter_lists_[port].RemoveWaiter(waiter);
|
| +}
|
| +
|
| +MessagePipe::~MessagePipe() {
|
| + // Owned by the dispatchers. The owning dispatchers should only release us via
|
| + // their |Close()| method, which should inform us of being closed via our
|
| + // |Close()|. Thus these should already be null.
|
| + DCHECK(!is_open_[0]);
|
| + DCHECK(!is_open_[1]);
|
| +}
|
| +
|
| +MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + unsigned destination_port = DestinationPortFromSourcePort(port);
|
| +
|
| + lock_.AssertAcquired();
|
| +
|
| + MojoWaitFlags satisfied_flags = 0;
|
| + if (!message_queues_[port].empty())
|
| + satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
|
| + if (is_open_[destination_port])
|
| + satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| +
|
| + return satisfied_flags;
|
| +}
|
| +
|
| +MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| +
|
| + unsigned destination_port = DestinationPortFromSourcePort(port);
|
| +
|
| + lock_.AssertAcquired();
|
| +
|
| + MojoWaitFlags satisfiable_flags = 0;
|
| + if (!message_queues_[port].empty() || is_open_[destination_port])
|
| + satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
|
| + if (is_open_[destination_port])
|
| + satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| +
|
| + return satisfiable_flags;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|