| Index: mojo/edk/system/raw_channel.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
|
| similarity index 55%
|
| copy from third_party/mojo/src/mojo/edk/system/raw_channel.cc
|
| copy to mojo/edk/system/raw_channel.cc
|
| index ac9f3ffc32a603119d5d2d0fb21173fac00a8960..a5b191be579f6abced8da986cc63ae1a9994120f 100644
|
| --- a/third_party/mojo/src/mojo/edk/system/raw_channel.cc
|
| +++ b/mojo/edk/system/raw_channel.cc
|
| @@ -12,11 +12,12 @@
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| #include "base/message_loop/message_loop.h"
|
| +#include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/system/message_in_transit.h"
|
| #include "mojo/edk/system/transport_data.h"
|
|
|
| namespace mojo {
|
| -namespace system {
|
| +namespace edk {
|
|
|
| const size_t kReadSize = 4096;
|
|
|
| @@ -36,8 +37,8 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
|
|
|
| // RawChannel::WriteBuffer -----------------------------------------------------
|
|
|
| -RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
|
| - : serialized_platform_handle_size_(serialized_platform_handle_size),
|
| +RawChannel::WriteBuffer::WriteBuffer()
|
| + : serialized_platform_handle_size_(0),
|
| platform_handles_offset_(0),
|
| data_offset_(0) {
|
| }
|
| @@ -55,7 +56,7 @@ bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
|
| if (!transport_data)
|
| return false;
|
|
|
| - const embedder::PlatformHandleVector* all_platform_handles =
|
| + const PlatformHandleVector* all_platform_handles =
|
| transport_data->platform_handles();
|
| if (!all_platform_handles) {
|
| DCHECK_EQ(platform_handles_offset_, 0u);
|
| @@ -71,13 +72,13 @@ bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
|
|
|
| void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
|
| size_t* num_platform_handles,
|
| - embedder::PlatformHandle** platform_handles,
|
| + PlatformHandle** platform_handles,
|
| void** serialization_data) {
|
| DCHECK(HavePlatformHandlesToSend());
|
|
|
| MessageInTransit* message = message_queue_.PeekMessage();
|
| TransportData* transport_data = message->transport_data();
|
| - embedder::PlatformHandleVector* all_platform_handles =
|
| + PlatformHandleVector* all_platform_handles =
|
| transport_data->platform_handles();
|
| *num_platform_handles =
|
| all_platform_handles->size() - platform_handles_offset_;
|
| @@ -115,6 +116,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
|
| Buffer buffer = {
|
| static_cast<const char*>(message->main_buffer()) + data_offset_,
|
| bytes_to_write};
|
| +
|
| buffers->push_back(buffer);
|
| return;
|
| }
|
| @@ -128,6 +130,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
|
| static_cast<const char*>(message->transport_data()->buffer()) +
|
| (data_offset_ - message->main_buffer_size()),
|
| bytes_to_write};
|
| +
|
| buffers->push_back(buffer);
|
| return;
|
| }
|
| @@ -153,41 +156,61 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
|
|
|
| RawChannel::RawChannel()
|
| : message_loop_for_io_(nullptr),
|
| - delegate_(nullptr),
|
| set_on_shutdown_(nullptr),
|
| + delegate_(nullptr),
|
| + write_ready_(false),
|
| write_stopped_(false),
|
| + error_occurred_(false),
|
| weak_ptr_factory_(this) {
|
| + read_buffer_.reset(new ReadBuffer);
|
| + write_buffer_.reset(new WriteBuffer());
|
| }
|
|
|
| RawChannel::~RawChannel() {
|
| DCHECK(!read_buffer_);
|
| DCHECK(!write_buffer_);
|
|
|
| - // No need to take the |write_lock_| here -- if there are still weak pointers
|
| - // outstanding, then we're hosed anyway (since we wouldn't be able to
|
| - // invalidate them cleanly, since we might not be on the I/O thread).
|
| - DCHECK(!weak_ptr_factory_.HasWeakPtrs());
|
| + // Only want to decrement counter if Init was called.
|
| + if (message_loop_for_io_) {
|
| + // No need to take the |write_lock_| here -- if there are still weak
|
| + // pointers outstanding, then we're hosed anyway (since we wouldn't be able
|
| + // to invalidate them cleanly, since we might not be on the I/O thread).
|
| + // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
|
| + internal::ChannelShutdown();
|
| + }
|
| }
|
|
|
| void RawChannel::Init(Delegate* delegate) {
|
| + internal::ChannelStarted();
|
| DCHECK(delegate);
|
|
|
| + base::AutoLock read_locker(read_lock_);
|
| + // solves race where initialiing on io thread while main thread is serializing
|
| + // this channel and releases handle.
|
| + base::AutoLock locker(write_lock_);
|
| +
|
| DCHECK(!delegate_);
|
| delegate_ = delegate;
|
|
|
| - CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
|
| + //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
|
| DCHECK(!message_loop_for_io_);
|
| message_loop_for_io_ =
|
| static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
|
|
|
| - // No need to take the lock. No one should be using us yet.
|
| - DCHECK(!read_buffer_);
|
| - read_buffer_.reset(new ReadBuffer);
|
| - DCHECK(!write_buffer_);
|
| - write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
|
| -
|
| OnInit();
|
|
|
| + // Although this means that we can call back sync into the caller, that's
|
| + // easier than posting a task to do this, because there might also be pending
|
| + // read calls and we can't modify the buffer.
|
| + if (read_buffer_->num_valid_bytes()) {
|
| + // We had serialized read buffer data through SetInitialReadBufferData call.
|
| + // Make sure we read messages out of it now, otherwise the delegate won't
|
| + // get notified if no other data gets written to the pipe.
|
| + bool did_dispatch_message = false;
|
| + bool stop_dispatching = false;
|
| + DispatchMessages(&did_dispatch_message, &stop_dispatching);
|
| + }
|
| +
|
| IOResult io_result = ScheduleRead();
|
| if (io_result != IO_PENDING) {
|
| // This will notify the delegate about the read failure. Although we're on
|
| @@ -198,15 +221,40 @@ void RawChannel::Init(Delegate* delegate) {
|
| }
|
| // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
|
| // the delegate), not an initialization failure.
|
| +
|
| + write_ready_ = true;
|
| + write_buffer_->serialized_platform_handle_size_ =
|
| + GetSerializedPlatformHandleSize();
|
| + if (!write_buffer_->message_queue_.IsEmpty())
|
| + SendQueuedMessagesNoLock();
|
| }
|
|
|
| void RawChannel::Shutdown() {
|
| - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
|
|
| + // Normally, we want to flush any pending writes before shutting down. This
|
| + // doesn't apply when 1) we don't have a handle (for obvious reasons) or
|
| + // 2) when the other side already quit and asked us to close the handle to
|
| + // ensure that we read everything out of the pipe first.
|
| + if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
|
| + {
|
| + base::AutoLock read_locker(read_lock_);
|
| + base::AutoLock locker(write_lock_);
|
| + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
|
| + }
|
| + delete this;
|
| + return;
|
| + }
|
| +
|
| + base::AutoLock read_locker(read_lock_);
|
| base::AutoLock locker(write_lock_);
|
| + DCHECK(read_buffer_->num_valid_bytes() == 0) <<
|
| + "RawChannel::Shutdown called but there is pending data to be read";
|
|
|
| - LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty())
|
| - << "Shutting down RawChannel with write buffer nonempty";
|
| + // happens on shutdown if didn't call init when doing createduplicate
|
| + if (message_loop_for_io()) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| + }
|
|
|
| // Reset the delegate so that it won't receive further calls.
|
| delegate_ = nullptr;
|
| @@ -214,33 +262,79 @@ void RawChannel::Shutdown() {
|
| *set_on_shutdown_ = true;
|
| set_on_shutdown_ = nullptr;
|
| }
|
| - write_stopped_ = true;
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
|
|
| - OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
|
| + // TODO(jam): probably remove this since it doesn't make sense now that we
|
| + // wait and flush pending messages.
|
| + // write_stopped_ = true;
|
| +
|
| +
|
| + bool empty = write_buffer_->message_queue_.IsEmpty();
|
| +
|
| + // We may have no messages to write. However just because our end of the pipe
|
| + // wrote everything doesn't mean that the other end read it. We don't want to
|
| + // call FlushFileBuffers since a) that only works for server end of the pipe,
|
| + // and b) it pauses this thread (which can block a process on another, or
|
| + // worse hang if both pipes are in the same process).
|
| + scoped_ptr<MessageInTransit> quit_message(new MessageInTransit(
|
| + MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr));
|
| + EnqueueMessageNoLock(quit_message.Pass());
|
| +
|
| + if (empty)
|
| + SendQueuedMessagesNoLock();
|
| +}
|
| +
|
| +ScopedPlatformHandle RawChannel::ReleaseHandle(
|
| + std::vector<char>* read_buffer) {
|
| + //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this;
|
| +
|
| + ScopedPlatformHandle rv;
|
| + {
|
| + base::AutoLock read_locker(read_lock_);
|
| + base::AutoLock locker(write_lock_);
|
| + rv = ReleaseHandleNoLock(read_buffer);
|
| +
|
| + // TODO(jam); if we use these, use nolock versions of these methods that are
|
| + // copied.
|
| + if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) {
|
| + NOTREACHED() << "TODO(JAM)";
|
| + }
|
| +
|
| + delegate_ = nullptr;
|
| +
|
| + // The Unretained is safe because above cancelled IO so we shouldn't get any
|
| + // channel errors.
|
| + // |message_loop_for_io_| might not be set yet
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
|
| + }
|
| +
|
| + return rv;
|
| }
|
|
|
| // Reminder: This must be thread-safe.
|
| bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| DCHECK(message);
|
| -
|
| base::AutoLock locker(write_lock_);
|
| if (write_stopped_)
|
| return false;
|
|
|
| - if (!write_buffer_->message_queue_.IsEmpty()) {
|
| - EnqueueMessageNoLock(message.Pass());
|
| - return true;
|
| - }
|
| -
|
| + bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
|
| EnqueueMessageNoLock(message.Pass());
|
| + if (queue_was_empty && write_ready_)
|
| + SendQueuedMessagesNoLock();
|
| +
|
| + return true;
|
| +}
|
| +
|
| +void RawChannel::SendQueuedMessagesNoLock() {
|
| DCHECK_EQ(write_buffer_->data_offset_, 0u);
|
|
|
| size_t platform_handles_written = 0;
|
| size_t bytes_written = 0;
|
| IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
|
| if (io_result == IO_PENDING)
|
| - return true;
|
| + return;
|
|
|
| bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
|
| bytes_written);
|
| @@ -249,11 +343,10 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| // context.
|
| message_loop_for_io_->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
|
| + base::Bind(&RawChannel::LockAndCallOnError,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| Delegate::ERROR_WRITE));
|
| }
|
| -
|
| - return result;
|
| }
|
|
|
| // Reminder: This must be thread-safe.
|
| @@ -262,9 +355,24 @@ bool RawChannel::IsWriteBufferEmpty() {
|
| return write_buffer_->message_queue_.IsEmpty();
|
| }
|
|
|
| +bool RawChannel::IsReadBufferEmpty() {
|
| + base::AutoLock locker(read_lock_);
|
| + return read_buffer_->num_valid_bytes_ != 0;
|
| +}
|
| +
|
| +void RawChannel::SetInitialReadBufferData(char* data, size_t size) {
|
| + base::AutoLock locker(read_lock_);
|
| + // TODO(jam): copy power of 2 algorithm below? or share.
|
| + read_buffer_->buffer_.resize(size+kReadSize);
|
| + memcpy(&read_buffer_->buffer_[0], data, size);
|
| + read_buffer_->num_valid_bytes_ = size;
|
| +}
|
| +
|
| void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
|
|
| + base::AutoLock locker(read_lock_);
|
| +
|
| // Keep reading data in a loop, and dispatch messages if enough data is
|
| // received. Exit the loop if any of the following happens:
|
| // - one or more messages were dispatched;
|
| @@ -288,93 +396,10 @@ void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
|
|
|
| // Dispatch all the messages that we can.
|
| bool did_dispatch_message = false;
|
| - // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| - // Currently, we copy data to ensure that this is zero at the beginning.
|
| - size_t read_buffer_start = 0;
|
| - size_t remaining_bytes = read_buffer_->num_valid_bytes_;
|
| - size_t message_size;
|
| - // Note that we rely on short-circuit evaluation here:
|
| - // - |read_buffer_start| may be an invalid index into
|
| - // |read_buffer_->buffer_| if |remaining_bytes| is zero.
|
| - // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| - // next read).
|
| - // TODO(vtl): Validate that |message_size| is sane.
|
| - while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
|
| - &read_buffer_->buffer_[read_buffer_start],
|
| - remaining_bytes, &message_size) &&
|
| - remaining_bytes >= message_size) {
|
| - MessageInTransit::View message_view(
|
| - message_size, &read_buffer_->buffer_[read_buffer_start]);
|
| - DCHECK_EQ(message_view.total_size(), message_size);
|
| -
|
| - const char* error_message = nullptr;
|
| - if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
|
| - &error_message)) {
|
| - DCHECK(error_message);
|
| - LOG(ERROR) << "Received invalid message: " << error_message;
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| -
|
| - if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
|
| - if (!OnReadMessageForRawChannel(message_view)) {
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| - } else {
|
| - embedder::ScopedPlatformHandleVectorPtr platform_handles;
|
| - if (message_view.transport_data_buffer()) {
|
| - size_t num_platform_handles;
|
| - const void* platform_handle_table;
|
| - TransportData::GetPlatformHandleTable(
|
| - message_view.transport_data_buffer(), &num_platform_handles,
|
| - &platform_handle_table);
|
| -
|
| - if (num_platform_handles > 0) {
|
| - platform_handles =
|
| - GetReadPlatformHandles(num_platform_handles,
|
| - platform_handle_table).Pass();
|
| - if (!platform_handles) {
|
| - LOG(ERROR) << "Invalid number of platform handles received";
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| - }
|
| - }
|
| -
|
| - // TODO(vtl): In the case that we aren't expecting any platform handles,
|
| - // for the POSIX implementation, we should confirm that none are stored.
|
| -
|
| - // Dispatch the message.
|
| - // Detect the case when |Shutdown()| is called; subsequent destruction
|
| - // is also permitted then.
|
| - bool shutdown_called = false;
|
| - DCHECK(!set_on_shutdown_);
|
| - set_on_shutdown_ = &shutdown_called;
|
| - DCHECK(delegate_);
|
| - delegate_->OnReadMessage(message_view, platform_handles.Pass());
|
| - if (shutdown_called)
|
| - return;
|
| - set_on_shutdown_ = nullptr;
|
| - }
|
| -
|
| - did_dispatch_message = true;
|
| -
|
| - // Update our state.
|
| - read_buffer_start += message_size;
|
| - remaining_bytes -= message_size;
|
| - }
|
| -
|
| - if (read_buffer_start > 0) {
|
| - // Move data back to start.
|
| - read_buffer_->num_valid_bytes_ = remaining_bytes;
|
| - if (read_buffer_->num_valid_bytes_ > 0) {
|
| - memmove(&read_buffer_->buffer_[0],
|
| - &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
|
| - }
|
| - read_buffer_start = 0;
|
| - }
|
| + bool stop_dispatching = false;
|
| + DispatchMessages(&did_dispatch_message, &stop_dispatching);
|
| + if (stop_dispatching)
|
| + return;
|
|
|
| if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
|
| kReadSize) {
|
| @@ -425,6 +450,7 @@ void RawChannel::OnWriteCompleted(IOResult io_result,
|
| }
|
|
|
| if (did_fail) {
|
| + base::AutoLock locker(read_lock_);
|
| CallOnError(Delegate::ERROR_WRITE);
|
| return; // |this| may have been destroyed in |CallOnError()|.
|
| }
|
| @@ -432,18 +458,26 @@ void RawChannel::OnWriteCompleted(IOResult io_result,
|
|
|
| void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
|
| write_lock_.AssertAcquired();
|
| + DCHECK(HandleForDebuggingNoLock().is_valid());
|
| write_buffer_->message_queue_.AddMessage(message.Pass());
|
| }
|
|
|
| bool RawChannel::OnReadMessageForRawChannel(
|
| const MessageInTransit::View& message_view) {
|
| + if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
|
| + message_loop_for_io_->PostTask(
|
| + FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + Delegate::ERROR_READ_SHUTDOWN));
|
| + return true;
|
| + }
|
| +
|
| // No non-implementation specific |RawChannel| control messages.
|
| - LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
|
| + LOG(ERROR) << "Invalid control message (type " << message_view.type()
|
| << ")";
|
| return false;
|
| }
|
|
|
| -// static
|
| RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
|
| IOResult io_result) {
|
| switch (io_result) {
|
| @@ -463,13 +497,24 @@ RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
|
|
|
| void RawChannel::CallOnError(Delegate::Error error) {
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
|
| + read_lock_.AssertAcquired();
|
| + error_occurred_ = true;
|
| if (delegate_) {
|
| delegate_->OnError(error);
|
| - return; // |this| may have been destroyed in |OnError()|.
|
| + } else {
|
| + // We depend on delegate to delete since it could be waiting to call
|
| + // ReleaseHandle.
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
|
| }
|
| }
|
|
|
| +void RawChannel::LockAndCallOnError(Delegate::Error error) {
|
| + base::AutoLock locker(read_lock_);
|
| + CallOnError(error);
|
| +}
|
| +
|
| bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
|
| size_t platform_handles_written,
|
| size_t bytes_written) {
|
| @@ -508,5 +553,107 @@ bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
|
| return false;
|
| }
|
|
|
| -} // namespace system
|
| +void RawChannel::DispatchMessages(bool* did_dispatch_message,
|
| + bool* stop_dispatching) {
|
| + *did_dispatch_message = false;
|
| + *stop_dispatching = false;
|
| + // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| + // Currently, we copy data to ensure that this is zero at the beginning.
|
| + size_t read_buffer_start = 0;
|
| + size_t remaining_bytes = read_buffer_->num_valid_bytes_;
|
| + size_t message_size;
|
| + // Note that we rely on short-circuit evaluation here:
|
| + // - |read_buffer_start| may be an invalid index into
|
| + // |read_buffer_->buffer_| if |remaining_bytes| is zero.
|
| + // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| + // next read).
|
| + // TODO(vtl): Validate that |message_size| is sane.
|
| + while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
|
| + &read_buffer_->buffer_[read_buffer_start],
|
| + remaining_bytes, &message_size) &&
|
| + remaining_bytes >= message_size) {
|
| + MessageInTransit::View message_view(
|
| + message_size, &read_buffer_->buffer_[read_buffer_start]);
|
| + DCHECK_EQ(message_view.total_size(), message_size);
|
| +
|
| + const char* error_message = nullptr;
|
| + if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
|
| + &error_message)) {
|
| + DCHECK(error_message);
|
| + LOG(ERROR) << "Received invalid message: " << error_message;
|
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| + *stop_dispatching = true;
|
| + return; // |this| may have been destroyed in |CallOnError()|.
|
| + }
|
| +
|
| + if (message_view.type() != MessageInTransit::Type::MESSAGE) {
|
| + if (!OnReadMessageForRawChannel(message_view)) {
|
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| + *stop_dispatching = true;
|
| + return; // |this| may have been destroyed in |CallOnError()|.
|
| + }
|
| + } else {
|
| + ScopedPlatformHandleVectorPtr platform_handles;
|
| + if (message_view.transport_data_buffer()) {
|
| + size_t num_platform_handles;
|
| + const void* platform_handle_table;
|
| + TransportData::GetPlatformHandleTable(
|
| + message_view.transport_data_buffer(), &num_platform_handles,
|
| + &platform_handle_table);
|
| +
|
| + if (num_platform_handles > 0) {
|
| + platform_handles =
|
| + GetReadPlatformHandles(num_platform_handles,
|
| + platform_handle_table).Pass();
|
| + if (!platform_handles) {
|
| + LOG(ERROR) << "Invalid number of platform handles received";
|
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| + *stop_dispatching = true;
|
| + return; // |this| may have been destroyed in |CallOnError()|.
|
| + }
|
| + }
|
| + }
|
| +
|
| + // TODO(vtl): In the case that we aren't expecting any platform handles,
|
| + // for the POSIX implementation, we should confirm that none are stored.
|
| +
|
| + // Dispatch the message.
|
| + // Detect the case when |Shutdown()| is called; subsequent destruction
|
| + // is also permitted then.
|
| + bool shutdown_called = false;
|
| + DCHECK(!set_on_shutdown_);
|
| + set_on_shutdown_ = &shutdown_called;
|
| + // Note: it's valid to get here without a delegate. i.e. after Shutdown
|
| + // is called, if this object still has a valid handle we keep it alive
|
| + // until the other side closes it in response to the RAW_CHANNEL_QUIT
|
| + // message. In the meantime the sender could have sent us a message.
|
| + if (delegate_)
|
| + delegate_->OnReadMessage(message_view, platform_handles.Pass());
|
| + if (shutdown_called) {
|
| + *stop_dispatching = true;
|
| + return;
|
| + }
|
| + set_on_shutdown_ = nullptr;
|
| + }
|
| +
|
| + *did_dispatch_message = true;
|
| +
|
| + // Update our state.
|
| + read_buffer_start += message_size;
|
| + remaining_bytes -= message_size;
|
| + }
|
| +
|
| + if (read_buffer_start > 0) {
|
| + // Move data back to start.
|
| + read_buffer_->num_valid_bytes_ = remaining_bytes;
|
| + if (read_buffer_->num_valid_bytes_ > 0) {
|
| + memmove(&read_buffer_->buffer_[0],
|
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
|
| + }
|
| + read_buffer_start = 0;
|
| + }
|
| +}
|
| +
|
| +} // namespace edk
|
| } // namespace mojo
|
|
|