Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Unified Diff: ipc/mojo/ipc_message_pipe_reader.cc

Issue 382333002: Introduce ChannelMojo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: ipc/mojo/ipc_message_pipe_reader.cc
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
new file mode 100644
index 0000000000000000000000000000000000000000..91022ac7f7be7d5617beec9b67fa0b8b8dd3cc9a
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -0,0 +1,144 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "mojo/public/cpp/environment/environment.h"
+
+namespace IPC {
+namespace internal {
+
+MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
+ : pipe_wait_id_(0),
+ pipe_(handle.Pass()) {
+ StartWaiting();
+}
+
+MessagePipeReader::~MessagePipeReader() {
+ CHECK(!IsValid());
+}
+
+void MessagePipeReader::Close() {
+ StopWaiting();
+ pipe_.reset();
+ OnPipeClosed();
+}
+
+void MessagePipeReader::CloseWithError(MojoResult error) {
+ OnPipeError(error);
+ Close();
+}
+
+// static
+void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
+ reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
+}
+
+void MessagePipeReader::StartWaiting() {
+ DCHECK(pipe_.is_valid());
+ DCHECK(!pipe_wait_id_);
+ // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
+ // MessagePipe.
+ //
+ // TODO(morrita): Should we re-set the signal when we get new
+ // message to send?
+ pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
+ pipe_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ &InvokePipeIsReady,
+ this);
+}
+
+void MessagePipeReader::StopWaiting() {
+ if (!pipe_wait_id_)
+ return;
+ mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
+ pipe_wait_id_ = 0;
+}
+
+void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+ pipe_wait_id_ = 0;
+
+ if (wait_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION happens when the pipe is
+ // closed before the waiter is scheduled in a backend thread.
+ if (wait_result != MOJO_RESULT_ABORTED &&
+ wait_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING) << "Pipe got error from the waiter. Closing: "
+ << wait_result;
+ OnPipeError(wait_result);
+ }
+
+ Close();
+ return;
+ }
+
+ while (pipe_.is_valid()) {
+ MojoResult read_result = ReadMessageBytes();
+ if (read_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ if (read_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION means that all the received messages
+ // got consumed and the peer is already closed.
+ if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING)
+ << "Pipe got error from ReadMessage(). Closing: " << read_result;
+ OnPipeError(read_result);
+ }
+
+ Close();
+ break;
+ }
+
+ OnMessageReceived();
+ }
+
+ if (pipe_.is_valid())
+ StartWaiting();
+}
+
+MojoResult MessagePipeReader::ReadMessageBytes() {
+ DCHECK(handle_buffer_.empty());
+
+ uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
+ uint32_t num_handles = 0;
+ MojoResult result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ data_buffer_.resize(num_bytes);
+ handle_buffer_.resize(num_handles);
+ if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
+ // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
+ // it needs more bufer. So we re-read it with resized buffers.
+ result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ num_handles ? &handle_buffer_[0] : NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ }
+
+ DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
+ DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
+ return result;
+}
+
+void MessagePipeReader::DelayedDeleter::operator()(
+ MessagePipeReader* ptr) const {
+ ptr->Close();
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE, base::Bind(&DeleteNow, ptr));
+}
+
+} // namespace internal
+} // namespace IPC

Powered by Google App Engine
This is Rietveld 408576698