Index: mojo/system/local_message_pipe_endpoint.cc |
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2dafe0a6d2ab1b802d1b8b3348c3790e2046031b |
--- /dev/null |
+++ b/mojo/system/local_message_pipe_endpoint.cc |
@@ -0,0 +1,160 @@ |
+// Copyright 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/system/local_message_pipe_endpoint.h" |
+ |
+#include <string.h> |
+ |
+#include "base/logging.h" |
+#include "base/stl_util.h" |
+#include "mojo/system/message_in_transit.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() |
+ : is_open_(true), |
+ is_peer_open_(true) { |
+} |
+ |
+LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { |
+ DCHECK(!is_open_); |
+} |
+ |
+void LocalMessagePipeEndpoint::OnPeerClose() { |
+ DCHECK(is_open_); |
+ DCHECK(is_peer_open_); |
+ |
+ MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); |
+ MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); |
+ is_peer_open_ = false; |
+ MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); |
+ MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); |
+ |
+ if (new_satisfied_flags != old_satisfied_flags || |
+ new_satisfiable_flags != old_satisfiable_flags) { |
+ waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, |
+ new_satisfiable_flags); |
+ } |
+} |
+ |
+MojoResult LocalMessagePipeEndpoint::EnqueueMessage( |
+ const void* bytes, uint32_t num_bytes, |
+ const MojoHandle* handles, uint32_t num_handles, |
+ MojoWriteMessageFlags /*flags*/) { |
+ DCHECK(is_open_); |
+ DCHECK(is_peer_open_); |
+ |
+ bool was_empty = message_queue_.empty(); |
+ |
+ // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. |
+ message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes)); |
+ // TODO(vtl): Support sending handles. |
+ |
+ if (was_empty) { |
+ waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), |
+ SatisfiableFlags()); |
+ } |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void LocalMessagePipeEndpoint::CancelAllWaiters() { |
+ DCHECK(is_open_); |
+ waiter_list_.CancelAllWaiters(); |
+} |
+ |
+void LocalMessagePipeEndpoint::Close() { |
+ DCHECK(is_open_); |
+ is_open_ = false; |
+ STLDeleteElements(&message_queue_); |
+} |
+ |
+MojoResult LocalMessagePipeEndpoint::ReadMessage( |
+ void* bytes, uint32_t* num_bytes, |
+ MojoHandle* handles, uint32_t* num_handles, |
+ MojoReadMessageFlags flags) { |
+ DCHECK(is_open_); |
+ |
+ const uint32_t max_bytes = num_bytes ? *num_bytes : 0; |
+ // TODO(vtl): We'll need this later: |
+ // const uint32_t max_handles = num_handles ? *num_handles : 0; |
+ |
+ if (message_queue_.empty()) |
+ return MOJO_RESULT_NOT_FOUND; |
+ |
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
+ // and release the lock immediately. |
+ bool not_enough_space = false; |
+ MessageInTransit* const message = message_queue_.front(); |
+ if (num_bytes) |
+ *num_bytes = message->data_size(); |
+ if (message->data_size() <= max_bytes) |
+ memcpy(bytes, message->data(), message->data_size()); |
+ else |
+ not_enough_space = true; |
+ |
+ // TODO(vtl): Support receiving handles. |
+ if (num_handles) |
+ *num_handles = 0; |
+ |
+ if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
+ message_queue_.pop_front(); |
+ message->Destroy(); |
+ |
+ // Now it's empty, thus no longer readable. |
+ if (message_queue_.empty()) { |
+ // It's currently not possible to wait for non-readability, but we should |
+ // do the state change anyway. |
+ waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), |
+ SatisfiableFlags()); |
+ } |
+ } |
+ |
+ if (not_enough_space) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, |
+ MojoWaitFlags flags, |
+ MojoResult wake_result) { |
+ DCHECK(is_open_); |
+ |
+ if ((flags & SatisfiedFlags())) |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ if (!(flags & SatisfiableFlags())) |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ |
+ waiter_list_.AddWaiter(waiter, flags, wake_result); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { |
+ DCHECK(is_open_); |
+ waiter_list_.RemoveWaiter(waiter); |
+} |
+ |
+MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() { |
+ MojoWaitFlags satisfied_flags = 0; |
+ if (!message_queue_.empty()) |
+ satisfied_flags |= MOJO_WAIT_FLAG_READABLE; |
+ if (is_peer_open_) |
+ satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; |
+ return satisfied_flags; |
+} |
+ |
+MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() { |
+ MojoWaitFlags satisfiable_flags = 0; |
+ if (!message_queue_.empty() || is_peer_open_) |
+ satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; |
+ if (is_peer_open_) |
+ satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; |
+ return satisfiable_flags; |
+} |
+ |
+} // namespace system |
+} // namespace mojo |
+ |