Index: mojo/edk/system/raw_channel.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc |
similarity index 56% |
copy from third_party/mojo/src/mojo/edk/system/raw_channel.cc |
copy to mojo/edk/system/raw_channel.cc |
index ac9f3ffc32a603119d5d2d0fb21173fac00a8960..8b47d338c7854da8f5286f081044200a36a01e50 100644 |
--- a/third_party/mojo/src/mojo/edk/system/raw_channel.cc |
+++ b/mojo/edk/system/raw_channel.cc |
@@ -12,6 +12,7 @@ |
#include "base/location.h" |
#include "base/logging.h" |
#include "base/message_loop/message_loop.h" |
+#include "mojo/edk/embedder/embedder_internal.h" |
#include "mojo/edk/system/message_in_transit.h" |
#include "mojo/edk/system/transport_data.h" |
@@ -36,8 +37,8 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
// RawChannel::WriteBuffer ----------------------------------------------------- |
-RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) |
- : serialized_platform_handle_size_(serialized_platform_handle_size), |
+RawChannel::WriteBuffer::WriteBuffer() |
+ : serialized_platform_handle_size_(0), |
platform_handles_offset_(0), |
data_offset_(0) { |
} |
@@ -115,6 +116,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
Buffer buffer = { |
static_cast<const char*>(message->main_buffer()) + data_offset_, |
bytes_to_write}; |
+ |
buffers->push_back(buffer); |
return; |
} |
@@ -128,6 +130,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
static_cast<const char*>(message->transport_data()->buffer()) + |
(data_offset_ - message->main_buffer_size()), |
bytes_to_write}; |
+ |
buffers->push_back(buffer); |
return; |
} |
@@ -153,41 +156,61 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
RawChannel::RawChannel() |
: message_loop_for_io_(nullptr), |
- delegate_(nullptr), |
set_on_shutdown_(nullptr), |
+ delegate_(nullptr), |
+ write_ready_(false), |
write_stopped_(false), |
+ error_occurred_(false), |
weak_ptr_factory_(this) { |
+ read_buffer_.reset(new ReadBuffer); |
+ write_buffer_.reset(new WriteBuffer()); |
} |
RawChannel::~RawChannel() { |
DCHECK(!read_buffer_); |
DCHECK(!write_buffer_); |
- // No need to take the |write_lock_| here -- if there are still weak pointers |
- // outstanding, then we're hosed anyway (since we wouldn't be able to |
- // invalidate them cleanly, since we might not be on the I/O thread). |
- DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
+ // Only want to decrement counter if Init was called. |
+ if (message_loop_for_io_) { |
+ // No need to take the |write_lock_| here -- if there are still weak |
+ // pointers outstanding, then we're hosed anyway (since we wouldn't be able |
+ // to invalidate them cleanly, since we might not be on the I/O thread). |
+ // DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
+ embedder::internal::ChannelShutdown(); |
+ } |
} |
void RawChannel::Init(Delegate* delegate) { |
+ embedder::internal::ChannelStarted(); |
DCHECK(delegate); |
+ base::AutoLock read_locker(read_lock_); |
+ // solves race where initialiing on io thread while main thread is serializing |
+ // this channel and releases handle. |
+ base::AutoLock locker(write_lock_); |
+ |
DCHECK(!delegate_); |
delegate_ = delegate; |
- CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
+ //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
DCHECK(!message_loop_for_io_); |
message_loop_for_io_ = |
static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
- // No need to take the lock. No one should be using us yet. |
- DCHECK(!read_buffer_); |
- read_buffer_.reset(new ReadBuffer); |
- DCHECK(!write_buffer_); |
- write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); |
- |
OnInit(); |
+ // Although this means that we can call back sync into the caller, that's |
+ // easier than posting a task to do this, because there might also be pending |
+ // read calls and we can't modify the buffer. |
+ if (read_buffer_->num_valid_bytes()) { |
+ // We had serialized read buffer data through SetInitialReadBufferData call. |
+ // Make sure we read messages out of it now, otherwise the delegate won't |
+ // get notified if no other data gets written to the pipe. |
+ bool did_dispatch_message = false; |
+ bool stop_dispatching = false; |
+ DispatchMessages(&did_dispatch_message, &stop_dispatching); |
+ } |
+ |
IOResult io_result = ScheduleRead(); |
if (io_result != IO_PENDING) { |
// This will notify the delegate about the read failure. Although we're on |
@@ -198,15 +221,40 @@ void RawChannel::Init(Delegate* delegate) { |
} |
// Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
// the delegate), not an initialization failure. |
+ |
+ write_ready_ = true; |
+ write_buffer_->serialized_platform_handle_size_ = |
+ GetSerializedPlatformHandleSize(); |
+ if (!write_buffer_->message_queue_.IsEmpty()) |
+ SendQueuedMessagesNoLock(); |
} |
void RawChannel::Shutdown() { |
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
+ // Normally, we want to flush any pending writes before shutting down. This |
+ // doesn't apply when 1) we don't have a handle (for obvious reasons) or |
+ // 2) when the other side already quit and asked us to close the handle to |
+ // ensure that we read everything out of the pipe first. |
+ if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) { |
+ { |
+ base::AutoLock read_locker(read_lock_); |
+ base::AutoLock locker(write_lock_); |
+ OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
+ } |
+ delete this; |
+ return; |
+ } |
+ |
+ base::AutoLock read_locker(read_lock_); |
base::AutoLock locker(write_lock_); |
+ DCHECK(read_buffer_->num_valid_bytes() == 0) << |
+ "RawChannel::Shutdown called but there is pending data to be read"; |
- LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) |
- << "Shutting down RawChannel with write buffer nonempty"; |
+ // happens on shutdown if didn't call init when doing createduplicate |
+ if (message_loop_for_io()) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ } |
// Reset the delegate so that it won't receive further calls. |
delegate_ = nullptr; |
@@ -214,33 +262,79 @@ void RawChannel::Shutdown() { |
*set_on_shutdown_ = true; |
set_on_shutdown_ = nullptr; |
} |
- write_stopped_ = true; |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
+ // TODO(jam): probably remove this since it doesn't make sense now that we |
+ // wait and flush pending messages. |
+ // write_stopped_ = true; |
+ |
+ |
+ bool empty = write_buffer_->message_queue_.IsEmpty(); |
+ |
+ // We may have no messages to write. However just because our end of the pipe |
+ // wrote everything doesn't mean that the other end read it. We don't want to |
+ // call FlushFileBuffers since a) that only works for server end of the pipe, |
+ // and b) it pauses this thread (which can block a process on another, or |
+ // worse hang if both pipes are in the same process). |
+ scoped_ptr<MessageInTransit> quit_message(new MessageInTransit( |
+ MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr)); |
+ EnqueueMessageNoLock(quit_message.Pass()); |
+ |
+ if (empty) |
+ SendQueuedMessagesNoLock(); |
+} |
+ |
+embedder::ScopedPlatformHandle RawChannel::ReleaseHandle( |
+ std::vector<char>* read_buffer) { |
+ //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this; |
+ |
+ embedder::ScopedPlatformHandle rv; |
+ { |
+ base::AutoLock read_locker(read_lock_); |
+ base::AutoLock locker(write_lock_); |
+ rv = ReleaseHandleNoLock(read_buffer); |
+ |
+ // TODO(jam); if we use these, use nolock versions of these methods that are |
+ // copied. |
+ if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) { |
+ NOTREACHED() << "TODO(JAM)"; |
+ } |
+ |
+ delegate_ = nullptr; |
+ |
+ // The Unretained is safe because above cancelled IO so we shouldn't get any |
+ // channel errors. |
+ // |message_loop_for_io_| might not be set yet |
+ embedder::internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::Shutdown, base::Unretained(this))); |
+ } |
+ |
+ return rv; |
} |
// Reminder: This must be thread-safe. |
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
DCHECK(message); |
- |
base::AutoLock locker(write_lock_); |
if (write_stopped_) |
return false; |
- if (!write_buffer_->message_queue_.IsEmpty()) { |
- EnqueueMessageNoLock(message.Pass()); |
- return true; |
- } |
- |
+ bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); |
EnqueueMessageNoLock(message.Pass()); |
+ if (queue_was_empty && write_ready_) |
+ SendQueuedMessagesNoLock(); |
+ |
+ return true; |
+} |
+ |
+void RawChannel::SendQueuedMessagesNoLock() { |
DCHECK_EQ(write_buffer_->data_offset_, 0u); |
size_t platform_handles_written = 0; |
size_t bytes_written = 0; |
IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
if (io_result == IO_PENDING) |
- return true; |
+ return; |
bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, |
bytes_written); |
@@ -249,11 +343,10 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
// context. |
message_loop_for_io_->PostTask( |
FROM_HERE, |
- base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), |
+ base::Bind(&RawChannel::LockAndCallOnError, |
+ weak_ptr_factory_.GetWeakPtr(), |
Delegate::ERROR_WRITE)); |
} |
- |
- return result; |
} |
// Reminder: This must be thread-safe. |
@@ -262,9 +355,24 @@ bool RawChannel::IsWriteBufferEmpty() { |
return write_buffer_->message_queue_.IsEmpty(); |
} |
+bool RawChannel::IsReadBufferEmpty() { |
+ base::AutoLock locker(read_lock_); |
+ return read_buffer_->num_valid_bytes_ != 0; |
+} |
+ |
+void RawChannel::SetInitialReadBufferData(char* data, size_t size) { |
+ base::AutoLock locker(read_lock_); |
+ // TODO(jam): copy power of 2 algorithm below? or share. |
+ read_buffer_->buffer_.resize(size+kReadSize); |
+ memcpy(&read_buffer_->buffer_[0], data, size); |
+ read_buffer_->num_valid_bytes_ = size; |
+} |
+ |
void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ base::AutoLock locker(read_lock_); |
+ |
// Keep reading data in a loop, and dispatch messages if enough data is |
// received. Exit the loop if any of the following happens: |
// - one or more messages were dispatched; |
@@ -288,93 +396,10 @@ void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
// Dispatch all the messages that we can. |
bool did_dispatch_message = false; |
- // Tracks the offset of the first undispatched message in |read_buffer_|. |
- // Currently, we copy data to ensure that this is zero at the beginning. |
- size_t read_buffer_start = 0; |
- size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
- size_t message_size; |
- // Note that we rely on short-circuit evaluation here: |
- // - |read_buffer_start| may be an invalid index into |
- // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
- // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
- // next read). |
- // TODO(vtl): Validate that |message_size| is sane. |
- while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
- &read_buffer_->buffer_[read_buffer_start], |
- remaining_bytes, &message_size) && |
- remaining_bytes >= message_size) { |
- MessageInTransit::View message_view( |
- message_size, &read_buffer_->buffer_[read_buffer_start]); |
- DCHECK_EQ(message_view.total_size(), message_size); |
- |
- const char* error_message = nullptr; |
- if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
- &error_message)) { |
- DCHECK(error_message); |
- LOG(ERROR) << "Received invalid message: " << error_message; |
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
- return; // |this| may have been destroyed in |CallOnError()|. |
- } |
- |
- if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) { |
- if (!OnReadMessageForRawChannel(message_view)) { |
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
- return; // |this| may have been destroyed in |CallOnError()|. |
- } |
- } else { |
- embedder::ScopedPlatformHandleVectorPtr platform_handles; |
- if (message_view.transport_data_buffer()) { |
- size_t num_platform_handles; |
- const void* platform_handle_table; |
- TransportData::GetPlatformHandleTable( |
- message_view.transport_data_buffer(), &num_platform_handles, |
- &platform_handle_table); |
- |
- if (num_platform_handles > 0) { |
- platform_handles = |
- GetReadPlatformHandles(num_platform_handles, |
- platform_handle_table).Pass(); |
- if (!platform_handles) { |
- LOG(ERROR) << "Invalid number of platform handles received"; |
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
- return; // |this| may have been destroyed in |CallOnError()|. |
- } |
- } |
- } |
- |
- // TODO(vtl): In the case that we aren't expecting any platform handles, |
- // for the POSIX implementation, we should confirm that none are stored. |
- |
- // Dispatch the message. |
- // Detect the case when |Shutdown()| is called; subsequent destruction |
- // is also permitted then. |
- bool shutdown_called = false; |
- DCHECK(!set_on_shutdown_); |
- set_on_shutdown_ = &shutdown_called; |
- DCHECK(delegate_); |
- delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
- if (shutdown_called) |
- return; |
- set_on_shutdown_ = nullptr; |
- } |
- |
- did_dispatch_message = true; |
- |
- // Update our state. |
- read_buffer_start += message_size; |
- remaining_bytes -= message_size; |
- } |
- |
- if (read_buffer_start > 0) { |
- // Move data back to start. |
- read_buffer_->num_valid_bytes_ = remaining_bytes; |
- if (read_buffer_->num_valid_bytes_ > 0) { |
- memmove(&read_buffer_->buffer_[0], |
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
- } |
- read_buffer_start = 0; |
- } |
+ bool stop_dispatching = false; |
+ DispatchMessages(&did_dispatch_message, &stop_dispatching); |
+ if (stop_dispatching) |
+ return; |
if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
kReadSize) { |
@@ -425,6 +450,7 @@ void RawChannel::OnWriteCompleted(IOResult io_result, |
} |
if (did_fail) { |
+ base::AutoLock locker(read_lock_); |
yzshen1
2015/09/23 22:47:09
nit: use LoadAndCallOnError instead?
|
CallOnError(Delegate::ERROR_WRITE); |
return; // |this| may have been destroyed in |CallOnError()|. |
} |
@@ -432,18 +458,26 @@ void RawChannel::OnWriteCompleted(IOResult io_result, |
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
write_lock_.AssertAcquired(); |
+ DCHECK(HandleForDebuggingNoLock().is_valid()); |
write_buffer_->message_queue_.AddMessage(message.Pass()); |
} |
bool RawChannel::OnReadMessageForRawChannel( |
const MessageInTransit::View& message_view) { |
+ if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { |
+ message_loop_for_io_->PostTask( |
+ FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ Delegate::ERROR_READ_SHUTDOWN)); |
+ return true; |
+ } |
+ |
// No non-implementation specific |RawChannel| control messages. |
- LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() |
+ LOG(ERROR) << "Invalid control message (type " << message_view.type() |
<< ")"; |
return false; |
} |
-// static |
RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
IOResult io_result) { |
switch (io_result) { |
@@ -463,13 +497,24 @@ RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
void RawChannel::CallOnError(Delegate::Error error) { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
+ read_lock_.AssertAcquired(); |
+ error_occurred_ = true; |
if (delegate_) { |
delegate_->OnError(error); |
- return; // |this| may have been destroyed in |OnError()|. |
+ } else { |
+ // We depend on delegate to delete since it could be waiting to call |
+ // ReleaseHandle. |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); |
} |
} |
+void RawChannel::LockAndCallOnError(Delegate::Error error) { |
+ base::AutoLock locker(read_lock_); |
+ CallOnError(error); |
+} |
+ |
bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
size_t platform_handles_written, |
size_t bytes_written) { |
@@ -508,5 +553,107 @@ bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
return false; |
} |
+void RawChannel::DispatchMessages(bool* did_dispatch_message, |
+ bool* stop_dispatching) { |
+ *did_dispatch_message = false; |
+ *stop_dispatching = false; |
+ // Tracks the offset of the first undispatched message in |read_buffer_|. |
+ // Currently, we copy data to ensure that this is zero at the beginning. |
+ size_t read_buffer_start = 0; |
+ size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
+ size_t message_size; |
+ // Note that we rely on short-circuit evaluation here: |
+ // - |read_buffer_start| may be an invalid index into |
+ // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
+ // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
+ // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
+ // next read). |
+ // TODO(vtl): Validate that |message_size| is sane. |
+ while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
+ &read_buffer_->buffer_[read_buffer_start], |
+ remaining_bytes, &message_size) && |
+ remaining_bytes >= message_size) { |
+ MessageInTransit::View message_view( |
+ message_size, &read_buffer_->buffer_[read_buffer_start]); |
+ DCHECK_EQ(message_view.total_size(), message_size); |
+ |
+ const char* error_message = nullptr; |
+ if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
+ &error_message)) { |
+ DCHECK(error_message); |
+ LOG(ERROR) << "Received invalid message: " << error_message; |
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
+ *stop_dispatching = true; |
+ return; // |this| may have been destroyed in |CallOnError()|. |
+ } |
+ |
+ if (message_view.type() != MessageInTransit::Type::MESSAGE) { |
+ if (!OnReadMessageForRawChannel(message_view)) { |
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
+ *stop_dispatching = true; |
+ return; // |this| may have been destroyed in |CallOnError()|. |
+ } |
+ } else { |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles; |
+ if (message_view.transport_data_buffer()) { |
+ size_t num_platform_handles; |
+ const void* platform_handle_table; |
+ TransportData::GetPlatformHandleTable( |
+ message_view.transport_data_buffer(), &num_platform_handles, |
+ &platform_handle_table); |
+ |
+ if (num_platform_handles > 0) { |
+ platform_handles = |
+ GetReadPlatformHandles(num_platform_handles, |
+ platform_handle_table).Pass(); |
+ if (!platform_handles) { |
+ LOG(ERROR) << "Invalid number of platform handles received"; |
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
+ *stop_dispatching = true; |
+ return; // |this| may have been destroyed in |CallOnError()|. |
+ } |
+ } |
+ } |
+ |
+ // TODO(vtl): In the case that we aren't expecting any platform handles, |
+ // for the POSIX implementation, we should confirm that none are stored. |
+ |
+ // Dispatch the message. |
+ // Detect the case when |Shutdown()| is called; subsequent destruction |
+ // is also permitted then. |
+ bool shutdown_called = false; |
yzshen1
2015/09/23 22:47:09
Shutdown() is no longer possible to be called from
|
+ DCHECK(!set_on_shutdown_); |
+ set_on_shutdown_ = &shutdown_called; |
+ // Note: it's valid to get here without a delegate. i.e. after Shutdown |
+ // is called, if this object still has a valid handle we keep it alive |
+ // until the other side closes it in response to the RAW_CHANNEL_QUIT |
+ // message. In the meantime the sender could have sent us a message. |
+ if (delegate_) |
+ delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
+ if (shutdown_called) { |
+ *stop_dispatching = true; |
+ return; |
+ } |
+ set_on_shutdown_ = nullptr; |
+ } |
+ |
+ *did_dispatch_message = true; |
+ |
+ // Update our state. |
+ read_buffer_start += message_size; |
+ remaining_bytes -= message_size; |
+ } |
+ |
+ if (read_buffer_start > 0) { |
+ // Move data back to start. |
+ read_buffer_->num_valid_bytes_ = remaining_bytes; |
+ if (read_buffer_->num_valid_bytes_ > 0) { |
+ memmove(&read_buffer_->buffer_[0], |
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
+ } |
+ read_buffer_start = 0; |
+ } |
+} |
+ |
} // namespace system |
} // namespace mojo |