Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(902)

Unified Diff: mojo/edk/system/raw_channel.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: convert remaining MP tests and simplify RawChannel destruction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/raw_channel.cc
diff --git a/third_party/mojo/src/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
similarity index 56%
copy from third_party/mojo/src/mojo/edk/system/raw_channel.cc
copy to mojo/edk/system/raw_channel.cc
index ac9f3ffc32a603119d5d2d0fb21173fac00a8960..8b47d338c7854da8f5286f081044200a36a01e50 100644
--- a/third_party/mojo/src/mojo/edk/system/raw_channel.cc
+++ b/mojo/edk/system/raw_channel.cc
@@ -12,6 +12,7 @@
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
+#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/transport_data.h"
@@ -36,8 +37,8 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
// RawChannel::WriteBuffer -----------------------------------------------------
-RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
- : serialized_platform_handle_size_(serialized_platform_handle_size),
+RawChannel::WriteBuffer::WriteBuffer()
+ : serialized_platform_handle_size_(0),
platform_handles_offset_(0),
data_offset_(0) {
}
@@ -115,6 +116,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
Buffer buffer = {
static_cast<const char*>(message->main_buffer()) + data_offset_,
bytes_to_write};
+
buffers->push_back(buffer);
return;
}
@@ -128,6 +130,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
static_cast<const char*>(message->transport_data()->buffer()) +
(data_offset_ - message->main_buffer_size()),
bytes_to_write};
+
buffers->push_back(buffer);
return;
}
@@ -153,41 +156,61 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
RawChannel::RawChannel()
: message_loop_for_io_(nullptr),
- delegate_(nullptr),
set_on_shutdown_(nullptr),
+ delegate_(nullptr),
+ write_ready_(false),
write_stopped_(false),
+ error_occurred_(false),
weak_ptr_factory_(this) {
+ read_buffer_.reset(new ReadBuffer);
+ write_buffer_.reset(new WriteBuffer());
}
RawChannel::~RawChannel() {
DCHECK(!read_buffer_);
DCHECK(!write_buffer_);
- // No need to take the |write_lock_| here -- if there are still weak pointers
- // outstanding, then we're hosed anyway (since we wouldn't be able to
- // invalidate them cleanly, since we might not be on the I/O thread).
- DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+ // Only want to decrement counter if Init was called.
+ if (message_loop_for_io_) {
+ // No need to take the |write_lock_| here -- if there are still weak
+ // pointers outstanding, then we're hosed anyway (since we wouldn't be able
+ // to invalidate them cleanly, since we might not be on the I/O thread).
+ // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+ embedder::internal::ChannelShutdown();
+ }
}
void RawChannel::Init(Delegate* delegate) {
+ embedder::internal::ChannelStarted();
DCHECK(delegate);
+ base::AutoLock read_locker(read_lock_);
+ // solves race where initialiing on io thread while main thread is serializing
+ // this channel and releases handle.
+ base::AutoLock locker(write_lock_);
+
DCHECK(!delegate_);
delegate_ = delegate;
- CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
+ //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
DCHECK(!message_loop_for_io_);
message_loop_for_io_ =
static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
- // No need to take the lock. No one should be using us yet.
- DCHECK(!read_buffer_);
- read_buffer_.reset(new ReadBuffer);
- DCHECK(!write_buffer_);
- write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
-
OnInit();
+ // Although this means that we can call back sync into the caller, that's
+ // easier than posting a task to do this, because there might also be pending
+ // read calls and we can't modify the buffer.
+ if (read_buffer_->num_valid_bytes()) {
+ // We had serialized read buffer data through SetInitialReadBufferData call.
+ // Make sure we read messages out of it now, otherwise the delegate won't
+ // get notified if no other data gets written to the pipe.
+ bool did_dispatch_message = false;
+ bool stop_dispatching = false;
+ DispatchMessages(&did_dispatch_message, &stop_dispatching);
+ }
+
IOResult io_result = ScheduleRead();
if (io_result != IO_PENDING) {
// This will notify the delegate about the read failure. Although we're on
@@ -198,15 +221,40 @@ void RawChannel::Init(Delegate* delegate) {
}
// Note: |ScheduleRead()| failure is treated as a read failure (by notifying
// the delegate), not an initialization failure.
+
+ write_ready_ = true;
+ write_buffer_->serialized_platform_handle_size_ =
+ GetSerializedPlatformHandleSize();
+ if (!write_buffer_->message_queue_.IsEmpty())
+ SendQueuedMessagesNoLock();
}
void RawChannel::Shutdown() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ weak_ptr_factory_.InvalidateWeakPtrs();
+ // Normally, we want to flush any pending writes before shutting down. This
+ // doesn't apply when 1) we don't have a handle (for obvious reasons) or
+ // 2) when the other side already quit and asked us to close the handle to
+ // ensure that we read everything out of the pipe first.
+ if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
+ {
+ base::AutoLock read_locker(read_lock_);
+ base::AutoLock locker(write_lock_);
+ OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
+ }
+ delete this;
+ return;
+ }
+
+ base::AutoLock read_locker(read_lock_);
base::AutoLock locker(write_lock_);
+ DCHECK(read_buffer_->num_valid_bytes() == 0) <<
+ "RawChannel::Shutdown called but there is pending data to be read";
- LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty())
- << "Shutting down RawChannel with write buffer nonempty";
+ // happens on shutdown if didn't call init when doing createduplicate
+ if (message_loop_for_io()) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ }
// Reset the delegate so that it won't receive further calls.
delegate_ = nullptr;
@@ -214,33 +262,79 @@ void RawChannel::Shutdown() {
*set_on_shutdown_ = true;
set_on_shutdown_ = nullptr;
}
- write_stopped_ = true;
- weak_ptr_factory_.InvalidateWeakPtrs();
- OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
+ // TODO(jam): probably remove this since it doesn't make sense now that we
+ // wait and flush pending messages.
+ // write_stopped_ = true;
+
+
+ bool empty = write_buffer_->message_queue_.IsEmpty();
+
+ // We may have no messages to write. However just because our end of the pipe
+ // wrote everything doesn't mean that the other end read it. We don't want to
+ // call FlushFileBuffers since a) that only works for server end of the pipe,
+ // and b) it pauses this thread (which can block a process on another, or
+ // worse hang if both pipes are in the same process).
+ scoped_ptr<MessageInTransit> quit_message(new MessageInTransit(
+ MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr));
+ EnqueueMessageNoLock(quit_message.Pass());
+
+ if (empty)
+ SendQueuedMessagesNoLock();
+}
+
+embedder::ScopedPlatformHandle RawChannel::ReleaseHandle(
+ std::vector<char>* read_buffer) {
+ //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this;
+
+ embedder::ScopedPlatformHandle rv;
+ {
+ base::AutoLock read_locker(read_lock_);
+ base::AutoLock locker(write_lock_);
+ rv = ReleaseHandleNoLock(read_buffer);
+
+ // TODO(jam); if we use these, use nolock versions of these methods that are
+ // copied.
+ if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) {
+ NOTREACHED() << "TODO(JAM)";
+ }
+
+ delegate_ = nullptr;
+
+ // The Unretained is safe because above cancelled IO so we shouldn't get any
+ // channel errors.
+ // |message_loop_for_io_| might not be set yet
+ embedder::internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
+ }
+
+ return rv;
}
// Reminder: This must be thread-safe.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
-
base::AutoLock locker(write_lock_);
if (write_stopped_)
return false;
- if (!write_buffer_->message_queue_.IsEmpty()) {
- EnqueueMessageNoLock(message.Pass());
- return true;
- }
-
+ bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
EnqueueMessageNoLock(message.Pass());
+ if (queue_was_empty && write_ready_)
+ SendQueuedMessagesNoLock();
+
+ return true;
+}
+
+void RawChannel::SendQueuedMessagesNoLock() {
DCHECK_EQ(write_buffer_->data_offset_, 0u);
size_t platform_handles_written = 0;
size_t bytes_written = 0;
IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
if (io_result == IO_PENDING)
- return true;
+ return;
bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
bytes_written);
@@ -249,11 +343,10 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
// context.
message_loop_for_io_->PostTask(
FROM_HERE,
- base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
+ base::Bind(&RawChannel::LockAndCallOnError,
+ weak_ptr_factory_.GetWeakPtr(),
Delegate::ERROR_WRITE));
}
-
- return result;
}
// Reminder: This must be thread-safe.
@@ -262,9 +355,24 @@ bool RawChannel::IsWriteBufferEmpty() {
return write_buffer_->message_queue_.IsEmpty();
}
+bool RawChannel::IsReadBufferEmpty() {
+ base::AutoLock locker(read_lock_);
+ return read_buffer_->num_valid_bytes_ != 0;
+}
+
+void RawChannel::SetInitialReadBufferData(char* data, size_t size) {
+ base::AutoLock locker(read_lock_);
+ // TODO(jam): copy power of 2 algorithm below? or share.
+ read_buffer_->buffer_.resize(size+kReadSize);
+ memcpy(&read_buffer_->buffer_[0], data, size);
+ read_buffer_->num_valid_bytes_ = size;
+}
+
void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ base::AutoLock locker(read_lock_);
+
// Keep reading data in a loop, and dispatch messages if enough data is
// received. Exit the loop if any of the following happens:
// - one or more messages were dispatched;
@@ -288,93 +396,10 @@ void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
// Dispatch all the messages that we can.
bool did_dispatch_message = false;
- // Tracks the offset of the first undispatched message in |read_buffer_|.
- // Currently, we copy data to ensure that this is zero at the beginning.
- size_t read_buffer_start = 0;
- size_t remaining_bytes = read_buffer_->num_valid_bytes_;
- size_t message_size;
- // Note that we rely on short-circuit evaluation here:
- // - |read_buffer_start| may be an invalid index into
- // |read_buffer_->buffer_| if |remaining_bytes| is zero.
- // - |message_size| is only valid if |GetNextMessageSize()| returns true.
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
- // next read).
- // TODO(vtl): Validate that |message_size| is sane.
- while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
- &read_buffer_->buffer_[read_buffer_start],
- remaining_bytes, &message_size) &&
- remaining_bytes >= message_size) {
- MessageInTransit::View message_view(
- message_size, &read_buffer_->buffer_[read_buffer_start]);
- DCHECK_EQ(message_view.total_size(), message_size);
-
- const char* error_message = nullptr;
- if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
- &error_message)) {
- DCHECK(error_message);
- LOG(ERROR) << "Received invalid message: " << error_message;
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return; // |this| may have been destroyed in |CallOnError()|.
- }
-
- if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
- if (!OnReadMessageForRawChannel(message_view)) {
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return; // |this| may have been destroyed in |CallOnError()|.
- }
- } else {
- embedder::ScopedPlatformHandleVectorPtr platform_handles;
- if (message_view.transport_data_buffer()) {
- size_t num_platform_handles;
- const void* platform_handle_table;
- TransportData::GetPlatformHandleTable(
- message_view.transport_data_buffer(), &num_platform_handles,
- &platform_handle_table);
-
- if (num_platform_handles > 0) {
- platform_handles =
- GetReadPlatformHandles(num_platform_handles,
- platform_handle_table).Pass();
- if (!platform_handles) {
- LOG(ERROR) << "Invalid number of platform handles received";
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return; // |this| may have been destroyed in |CallOnError()|.
- }
- }
- }
-
- // TODO(vtl): In the case that we aren't expecting any platform handles,
- // for the POSIX implementation, we should confirm that none are stored.
-
- // Dispatch the message.
- // Detect the case when |Shutdown()| is called; subsequent destruction
- // is also permitted then.
- bool shutdown_called = false;
- DCHECK(!set_on_shutdown_);
- set_on_shutdown_ = &shutdown_called;
- DCHECK(delegate_);
- delegate_->OnReadMessage(message_view, platform_handles.Pass());
- if (shutdown_called)
- return;
- set_on_shutdown_ = nullptr;
- }
-
- did_dispatch_message = true;
-
- // Update our state.
- read_buffer_start += message_size;
- remaining_bytes -= message_size;
- }
-
- if (read_buffer_start > 0) {
- // Move data back to start.
- read_buffer_->num_valid_bytes_ = remaining_bytes;
- if (read_buffer_->num_valid_bytes_ > 0) {
- memmove(&read_buffer_->buffer_[0],
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
- }
- read_buffer_start = 0;
- }
+ bool stop_dispatching = false;
+ DispatchMessages(&did_dispatch_message, &stop_dispatching);
+ if (stop_dispatching)
+ return;
if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
kReadSize) {
@@ -425,6 +450,7 @@ void RawChannel::OnWriteCompleted(IOResult io_result,
}
if (did_fail) {
+ base::AutoLock locker(read_lock_);
yzshen1 2015/09/23 22:47:09 nit: use LoadAndCallOnError instead?
CallOnError(Delegate::ERROR_WRITE);
return; // |this| may have been destroyed in |CallOnError()|.
}
@@ -432,18 +458,26 @@ void RawChannel::OnWriteCompleted(IOResult io_result,
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
write_lock_.AssertAcquired();
+ DCHECK(HandleForDebuggingNoLock().is_valid());
write_buffer_->message_queue_.AddMessage(message.Pass());
}
bool RawChannel::OnReadMessageForRawChannel(
const MessageInTransit::View& message_view) {
+ if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
+ message_loop_for_io_->PostTask(
+ FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
+ weak_ptr_factory_.GetWeakPtr(),
+ Delegate::ERROR_READ_SHUTDOWN));
+ return true;
+ }
+
// No non-implementation specific |RawChannel| control messages.
- LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
+ LOG(ERROR) << "Invalid control message (type " << message_view.type()
<< ")";
return false;
}
-// static
RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
IOResult io_result) {
switch (io_result) {
@@ -463,13 +497,24 @@ RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
void RawChannel::CallOnError(Delegate::Error error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+ read_lock_.AssertAcquired();
+ error_occurred_ = true;
if (delegate_) {
delegate_->OnError(error);
- return; // |this| may have been destroyed in |OnError()|.
+ } else {
+ // We depend on delegate to delete since it could be waiting to call
+ // ReleaseHandle.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
}
}
+void RawChannel::LockAndCallOnError(Delegate::Error error) {
+ base::AutoLock locker(read_lock_);
+ CallOnError(error);
+}
+
bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
size_t platform_handles_written,
size_t bytes_written) {
@@ -508,5 +553,107 @@ bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
return false;
}
+void RawChannel::DispatchMessages(bool* did_dispatch_message,
+ bool* stop_dispatching) {
+ *did_dispatch_message = false;
+ *stop_dispatching = false;
+ // Tracks the offset of the first undispatched message in |read_buffer_|.
+ // Currently, we copy data to ensure that this is zero at the beginning.
+ size_t read_buffer_start = 0;
+ size_t remaining_bytes = read_buffer_->num_valid_bytes_;
+ size_t message_size;
+ // Note that we rely on short-circuit evaluation here:
+ // - |read_buffer_start| may be an invalid index into
+ // |read_buffer_->buffer_| if |remaining_bytes| is zero.
+ // - |message_size| is only valid if |GetNextMessageSize()| returns true.
+ // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
+ // next read).
+ // TODO(vtl): Validate that |message_size| is sane.
+ while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
+ &read_buffer_->buffer_[read_buffer_start],
+ remaining_bytes, &message_size) &&
+ remaining_bytes >= message_size) {
+ MessageInTransit::View message_view(
+ message_size, &read_buffer_->buffer_[read_buffer_start]);
+ DCHECK_EQ(message_view.total_size(), message_size);
+
+ const char* error_message = nullptr;
+ if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
+ &error_message)) {
+ DCHECK(error_message);
+ LOG(ERROR) << "Received invalid message: " << error_message;
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+ *stop_dispatching = true;
+ return; // |this| may have been destroyed in |CallOnError()|.
+ }
+
+ if (message_view.type() != MessageInTransit::Type::MESSAGE) {
+ if (!OnReadMessageForRawChannel(message_view)) {
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+ *stop_dispatching = true;
+ return; // |this| may have been destroyed in |CallOnError()|.
+ }
+ } else {
+ embedder::ScopedPlatformHandleVectorPtr platform_handles;
+ if (message_view.transport_data_buffer()) {
+ size_t num_platform_handles;
+ const void* platform_handle_table;
+ TransportData::GetPlatformHandleTable(
+ message_view.transport_data_buffer(), &num_platform_handles,
+ &platform_handle_table);
+
+ if (num_platform_handles > 0) {
+ platform_handles =
+ GetReadPlatformHandles(num_platform_handles,
+ platform_handle_table).Pass();
+ if (!platform_handles) {
+ LOG(ERROR) << "Invalid number of platform handles received";
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+ *stop_dispatching = true;
+ return; // |this| may have been destroyed in |CallOnError()|.
+ }
+ }
+ }
+
+ // TODO(vtl): In the case that we aren't expecting any platform handles,
+ // for the POSIX implementation, we should confirm that none are stored.
+
+ // Dispatch the message.
+ // Detect the case when |Shutdown()| is called; subsequent destruction
+ // is also permitted then.
+ bool shutdown_called = false;
yzshen1 2015/09/23 22:47:09 Shutdown() is no longer possible to be called from
+ DCHECK(!set_on_shutdown_);
+ set_on_shutdown_ = &shutdown_called;
+ // Note: it's valid to get here without a delegate. i.e. after Shutdown
+ // is called, if this object still has a valid handle we keep it alive
+ // until the other side closes it in response to the RAW_CHANNEL_QUIT
+ // message. In the meantime the sender could have sent us a message.
+ if (delegate_)
+ delegate_->OnReadMessage(message_view, platform_handles.Pass());
+ if (shutdown_called) {
+ *stop_dispatching = true;
+ return;
+ }
+ set_on_shutdown_ = nullptr;
+ }
+
+ *did_dispatch_message = true;
+
+ // Update our state.
+ read_buffer_start += message_size;
+ remaining_bytes -= message_size;
+ }
+
+ if (read_buffer_start > 0) {
+ // Move data back to start.
+ read_buffer_->num_valid_bytes_ = remaining_bytes;
+ if (read_buffer_->num_valid_bytes_ > 0) {
+ memmove(&read_buffer_->buffer_[0],
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
+ }
+ read_buffer_start = 0;
+ }
+}
+
} // namespace system
} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698