| Index: mojo/system/local_message_pipe_endpoint.cc
|
| diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..2dafe0a6d2ab1b802d1b8b3348c3790e2046031b
|
| --- /dev/null
|
| +++ b/mojo/system/local_message_pipe_endpoint.cc
|
| @@ -0,0 +1,160 @@
|
| +// Copyright 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/system/local_message_pipe_endpoint.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include "base/logging.h"
|
| +#include "base/stl_util.h"
|
| +#include "mojo/system/message_in_transit.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
|
| + : is_open_(true),
|
| + is_peer_open_(true) {
|
| +}
|
| +
|
| +LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
|
| + DCHECK(!is_open_);
|
| +}
|
| +
|
| +void LocalMessagePipeEndpoint::OnPeerClose() {
|
| + DCHECK(is_open_);
|
| + DCHECK(is_peer_open_);
|
| +
|
| + MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
|
| + MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
|
| + is_peer_open_ = false;
|
| + MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
|
| + MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
|
| +
|
| + if (new_satisfied_flags != old_satisfied_flags ||
|
| + new_satisfiable_flags != old_satisfiable_flags) {
|
| + waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
|
| + new_satisfiable_flags);
|
| + }
|
| +}
|
| +
|
| +MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
|
| + const void* bytes, uint32_t num_bytes,
|
| + const MojoHandle* handles, uint32_t num_handles,
|
| + MojoWriteMessageFlags /*flags*/) {
|
| + DCHECK(is_open_);
|
| + DCHECK(is_peer_open_);
|
| +
|
| + bool was_empty = message_queue_.empty();
|
| +
|
| + // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
|
| + message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes));
|
| + // TODO(vtl): Support sending handles.
|
| +
|
| + if (was_empty) {
|
| + waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
|
| + SatisfiableFlags());
|
| + }
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +void LocalMessagePipeEndpoint::CancelAllWaiters() {
|
| + DCHECK(is_open_);
|
| + waiter_list_.CancelAllWaiters();
|
| +}
|
| +
|
| +void LocalMessagePipeEndpoint::Close() {
|
| + DCHECK(is_open_);
|
| + is_open_ = false;
|
| + STLDeleteElements(&message_queue_);
|
| +}
|
| +
|
| +MojoResult LocalMessagePipeEndpoint::ReadMessage(
|
| + void* bytes, uint32_t* num_bytes,
|
| + MojoHandle* handles, uint32_t* num_handles,
|
| + MojoReadMessageFlags flags) {
|
| + DCHECK(is_open_);
|
| +
|
| + const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
|
| + // TODO(vtl): We'll need this later:
|
| + // const uint32_t max_handles = num_handles ? *num_handles : 0;
|
| +
|
| + if (message_queue_.empty())
|
| + return MOJO_RESULT_NOT_FOUND;
|
| +
|
| + // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
|
| + // and release the lock immediately.
|
| + bool not_enough_space = false;
|
| + MessageInTransit* const message = message_queue_.front();
|
| + if (num_bytes)
|
| + *num_bytes = message->data_size();
|
| + if (message->data_size() <= max_bytes)
|
| + memcpy(bytes, message->data(), message->data_size());
|
| + else
|
| + not_enough_space = true;
|
| +
|
| + // TODO(vtl): Support receiving handles.
|
| + if (num_handles)
|
| + *num_handles = 0;
|
| +
|
| + if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
|
| + message_queue_.pop_front();
|
| + message->Destroy();
|
| +
|
| + // Now it's empty, thus no longer readable.
|
| + if (message_queue_.empty()) {
|
| + // It's currently not possible to wait for non-readability, but we should
|
| + // do the state change anyway.
|
| + waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
|
| + SatisfiableFlags());
|
| + }
|
| + }
|
| +
|
| + if (not_enough_space)
|
| + return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
|
| + MojoWaitFlags flags,
|
| + MojoResult wake_result) {
|
| + DCHECK(is_open_);
|
| +
|
| + if ((flags & SatisfiedFlags()))
|
| + return MOJO_RESULT_ALREADY_EXISTS;
|
| + if (!(flags & SatisfiableFlags()))
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| +
|
| + waiter_list_.AddWaiter(waiter, flags, wake_result);
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
|
| + DCHECK(is_open_);
|
| + waiter_list_.RemoveWaiter(waiter);
|
| +}
|
| +
|
| +MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
|
| + MojoWaitFlags satisfied_flags = 0;
|
| + if (!message_queue_.empty())
|
| + satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
|
| + if (is_peer_open_)
|
| + satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| + return satisfied_flags;
|
| +}
|
| +
|
| +MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
|
| + MojoWaitFlags satisfiable_flags = 0;
|
| + if (!message_queue_.empty() || is_peer_open_)
|
| + satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
|
| + if (is_peer_open_)
|
| + satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
|
| + return satisfiable_flags;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
| +
|
|
|