Chromium Code Reviews| Index: mojo/edk/system/raw_channel.cc |
| diff --git a/third_party/mojo/src/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc |
| similarity index 56% |
| copy from third_party/mojo/src/mojo/edk/system/raw_channel.cc |
| copy to mojo/edk/system/raw_channel.cc |
| index ac9f3ffc32a603119d5d2d0fb21173fac00a8960..8b47d338c7854da8f5286f081044200a36a01e50 100644 |
| --- a/third_party/mojo/src/mojo/edk/system/raw_channel.cc |
| +++ b/mojo/edk/system/raw_channel.cc |
| @@ -12,6 +12,7 @@ |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/message_loop/message_loop.h" |
| +#include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/system/message_in_transit.h" |
| #include "mojo/edk/system/transport_data.h" |
| @@ -36,8 +37,8 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
| // RawChannel::WriteBuffer ----------------------------------------------------- |
| -RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) |
| - : serialized_platform_handle_size_(serialized_platform_handle_size), |
| +RawChannel::WriteBuffer::WriteBuffer() |
| + : serialized_platform_handle_size_(0), |
| platform_handles_offset_(0), |
| data_offset_(0) { |
| } |
| @@ -115,6 +116,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
| Buffer buffer = { |
| static_cast<const char*>(message->main_buffer()) + data_offset_, |
| bytes_to_write}; |
| + |
| buffers->push_back(buffer); |
| return; |
| } |
| @@ -128,6 +130,7 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
| static_cast<const char*>(message->transport_data()->buffer()) + |
| (data_offset_ - message->main_buffer_size()), |
| bytes_to_write}; |
| + |
| buffers->push_back(buffer); |
| return; |
| } |
| @@ -153,41 +156,61 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
| RawChannel::RawChannel() |
| : message_loop_for_io_(nullptr), |
| - delegate_(nullptr), |
| set_on_shutdown_(nullptr), |
| + delegate_(nullptr), |
| + write_ready_(false), |
| write_stopped_(false), |
| + error_occurred_(false), |
| weak_ptr_factory_(this) { |
| + read_buffer_.reset(new ReadBuffer); |
| + write_buffer_.reset(new WriteBuffer()); |
| } |
| RawChannel::~RawChannel() { |
| DCHECK(!read_buffer_); |
| DCHECK(!write_buffer_); |
| - // No need to take the |write_lock_| here -- if there are still weak pointers |
| - // outstanding, then we're hosed anyway (since we wouldn't be able to |
| - // invalidate them cleanly, since we might not be on the I/O thread). |
| - DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| + // Only want to decrement counter if Init was called. |
| + if (message_loop_for_io_) { |
| + // No need to take the |write_lock_| here -- if there are still weak |
| + // pointers outstanding, then we're hosed anyway (since we wouldn't be able |
| + // to invalidate them cleanly, since we might not be on the I/O thread). |
| + // DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| + embedder::internal::ChannelShutdown(); |
| + } |
| } |
| void RawChannel::Init(Delegate* delegate) { |
| + embedder::internal::ChannelStarted(); |
| DCHECK(delegate); |
| + base::AutoLock read_locker(read_lock_); |
| + // solves race where initialiing on io thread while main thread is serializing |
| + // this channel and releases handle. |
| + base::AutoLock locker(write_lock_); |
| + |
| DCHECK(!delegate_); |
| delegate_ = delegate; |
| - CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
| + //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
| DCHECK(!message_loop_for_io_); |
| message_loop_for_io_ = |
| static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
| - // No need to take the lock. No one should be using us yet. |
| - DCHECK(!read_buffer_); |
| - read_buffer_.reset(new ReadBuffer); |
| - DCHECK(!write_buffer_); |
| - write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); |
| - |
| OnInit(); |
| + // Although this means that we can call back sync into the caller, that's |
| + // easier than posting a task to do this, because there might also be pending |
| + // read calls and we can't modify the buffer. |
| + if (read_buffer_->num_valid_bytes()) { |
| + // We had serialized read buffer data through SetInitialReadBufferData call. |
| + // Make sure we read messages out of it now, otherwise the delegate won't |
| + // get notified if no other data gets written to the pipe. |
| + bool did_dispatch_message = false; |
| + bool stop_dispatching = false; |
| + DispatchMessages(&did_dispatch_message, &stop_dispatching); |
| + } |
| + |
| IOResult io_result = ScheduleRead(); |
| if (io_result != IO_PENDING) { |
| // This will notify the delegate about the read failure. Although we're on |
| @@ -198,15 +221,40 @@ void RawChannel::Init(Delegate* delegate) { |
| } |
| // Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
| // the delegate), not an initialization failure. |
| + |
| + write_ready_ = true; |
| + write_buffer_->serialized_platform_handle_size_ = |
| + GetSerializedPlatformHandleSize(); |
| + if (!write_buffer_->message_queue_.IsEmpty()) |
| + SendQueuedMessagesNoLock(); |
| } |
| void RawChannel::Shutdown() { |
| - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| + // Normally, we want to flush any pending writes before shutting down. This |
| + // doesn't apply when 1) we don't have a handle (for obvious reasons) or |
| + // 2) when the other side already quit and asked us to close the handle to |
| + // ensure that we read everything out of the pipe first. |
| + if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) { |
| + { |
| + base::AutoLock read_locker(read_lock_); |
| + base::AutoLock locker(write_lock_); |
| + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
| + } |
| + delete this; |
| + return; |
| + } |
| + |
| + base::AutoLock read_locker(read_lock_); |
| base::AutoLock locker(write_lock_); |
| + DCHECK(read_buffer_->num_valid_bytes() == 0) << |
| + "RawChannel::Shutdown called but there is pending data to be read"; |
| - LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) |
| - << "Shutting down RawChannel with write buffer nonempty"; |
| + // happens on shutdown if didn't call init when doing createduplicate |
| + if (message_loop_for_io()) { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + } |
| // Reset the delegate so that it won't receive further calls. |
| delegate_ = nullptr; |
| @@ -214,33 +262,79 @@ void RawChannel::Shutdown() { |
| *set_on_shutdown_ = true; |
| set_on_shutdown_ = nullptr; |
| } |
| - write_stopped_ = true; |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| - OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
| + // TODO(jam): probably remove this since it doesn't make sense now that we |
| + // wait and flush pending messages. |
| + // write_stopped_ = true; |
| + |
| + |
| + bool empty = write_buffer_->message_queue_.IsEmpty(); |
| + |
| + // We may have no messages to write. However just because our end of the pipe |
| + // wrote everything doesn't mean that the other end read it. We don't want to |
| + // call FlushFileBuffers since a) that only works for server end of the pipe, |
| + // and b) it pauses this thread (which can block a process on another, or |
| + // worse hang if both pipes are in the same process). |
| + scoped_ptr<MessageInTransit> quit_message(new MessageInTransit( |
| + MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr)); |
| + EnqueueMessageNoLock(quit_message.Pass()); |
| + |
| + if (empty) |
| + SendQueuedMessagesNoLock(); |
| +} |
| + |
| +embedder::ScopedPlatformHandle RawChannel::ReleaseHandle( |
| + std::vector<char>* read_buffer) { |
| + //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this; |
| + |
| + embedder::ScopedPlatformHandle rv; |
| + { |
| + base::AutoLock read_locker(read_lock_); |
| + base::AutoLock locker(write_lock_); |
| + rv = ReleaseHandleNoLock(read_buffer); |
| + |
| + // TODO(jam); if we use these, use nolock versions of these methods that are |
| + // copied. |
| + if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) { |
| + NOTREACHED() << "TODO(JAM)"; |
| + } |
| + |
| + delegate_ = nullptr; |
| + |
| + // The Unretained is safe because above cancelled IO so we shouldn't get any |
| + // channel errors. |
| + // |message_loop_for_io_| might not be set yet |
| + embedder::internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::Shutdown, base::Unretained(this))); |
| + } |
| + |
| + return rv; |
| } |
| // Reminder: This must be thread-safe. |
| bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| DCHECK(message); |
| - |
| base::AutoLock locker(write_lock_); |
| if (write_stopped_) |
| return false; |
| - if (!write_buffer_->message_queue_.IsEmpty()) { |
| - EnqueueMessageNoLock(message.Pass()); |
| - return true; |
| - } |
| - |
| + bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); |
| EnqueueMessageNoLock(message.Pass()); |
| + if (queue_was_empty && write_ready_) |
| + SendQueuedMessagesNoLock(); |
| + |
| + return true; |
| +} |
| + |
| +void RawChannel::SendQueuedMessagesNoLock() { |
| DCHECK_EQ(write_buffer_->data_offset_, 0u); |
| size_t platform_handles_written = 0; |
| size_t bytes_written = 0; |
| IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
| if (io_result == IO_PENDING) |
| - return true; |
| + return; |
| bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, |
| bytes_written); |
| @@ -249,11 +343,10 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| // context. |
| message_loop_for_io_->PostTask( |
| FROM_HERE, |
| - base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), |
| + base::Bind(&RawChannel::LockAndCallOnError, |
| + weak_ptr_factory_.GetWeakPtr(), |
| Delegate::ERROR_WRITE)); |
| } |
| - |
| - return result; |
| } |
| // Reminder: This must be thread-safe. |
| @@ -262,9 +355,24 @@ bool RawChannel::IsWriteBufferEmpty() { |
| return write_buffer_->message_queue_.IsEmpty(); |
| } |
| +bool RawChannel::IsReadBufferEmpty() { |
| + base::AutoLock locker(read_lock_); |
| + return read_buffer_->num_valid_bytes_ != 0; |
| +} |
| + |
| +void RawChannel::SetInitialReadBufferData(char* data, size_t size) { |
| + base::AutoLock locker(read_lock_); |
| + // TODO(jam): copy power of 2 algorithm below? or share. |
| + read_buffer_->buffer_.resize(size+kReadSize); |
| + memcpy(&read_buffer_->buffer_[0], data, size); |
| + read_buffer_->num_valid_bytes_ = size; |
| +} |
| + |
| void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + base::AutoLock locker(read_lock_); |
| + |
| // Keep reading data in a loop, and dispatch messages if enough data is |
| // received. Exit the loop if any of the following happens: |
| // - one or more messages were dispatched; |
| @@ -288,93 +396,10 @@ void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
| // Dispatch all the messages that we can. |
| bool did_dispatch_message = false; |
| - // Tracks the offset of the first undispatched message in |read_buffer_|. |
| - // Currently, we copy data to ensure that this is zero at the beginning. |
| - size_t read_buffer_start = 0; |
| - size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
| - size_t message_size; |
| - // Note that we rely on short-circuit evaluation here: |
| - // - |read_buffer_start| may be an invalid index into |
| - // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
| - // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
| - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| - // next read). |
| - // TODO(vtl): Validate that |message_size| is sane. |
| - while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
| - &read_buffer_->buffer_[read_buffer_start], |
| - remaining_bytes, &message_size) && |
| - remaining_bytes >= message_size) { |
| - MessageInTransit::View message_view( |
| - message_size, &read_buffer_->buffer_[read_buffer_start]); |
| - DCHECK_EQ(message_view.total_size(), message_size); |
| - |
| - const char* error_message = nullptr; |
| - if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
| - &error_message)) { |
| - DCHECK(error_message); |
| - LOG(ERROR) << "Received invalid message: " << error_message; |
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| - return; // |this| may have been destroyed in |CallOnError()|. |
| - } |
| - |
| - if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) { |
| - if (!OnReadMessageForRawChannel(message_view)) { |
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| - return; // |this| may have been destroyed in |CallOnError()|. |
| - } |
| - } else { |
| - embedder::ScopedPlatformHandleVectorPtr platform_handles; |
| - if (message_view.transport_data_buffer()) { |
| - size_t num_platform_handles; |
| - const void* platform_handle_table; |
| - TransportData::GetPlatformHandleTable( |
| - message_view.transport_data_buffer(), &num_platform_handles, |
| - &platform_handle_table); |
| - |
| - if (num_platform_handles > 0) { |
| - platform_handles = |
| - GetReadPlatformHandles(num_platform_handles, |
| - platform_handle_table).Pass(); |
| - if (!platform_handles) { |
| - LOG(ERROR) << "Invalid number of platform handles received"; |
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| - return; // |this| may have been destroyed in |CallOnError()|. |
| - } |
| - } |
| - } |
| - |
| - // TODO(vtl): In the case that we aren't expecting any platform handles, |
| - // for the POSIX implementation, we should confirm that none are stored. |
| - |
| - // Dispatch the message. |
| - // Detect the case when |Shutdown()| is called; subsequent destruction |
| - // is also permitted then. |
| - bool shutdown_called = false; |
| - DCHECK(!set_on_shutdown_); |
| - set_on_shutdown_ = &shutdown_called; |
| - DCHECK(delegate_); |
| - delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
| - if (shutdown_called) |
| - return; |
| - set_on_shutdown_ = nullptr; |
| - } |
| - |
| - did_dispatch_message = true; |
| - |
| - // Update our state. |
| - read_buffer_start += message_size; |
| - remaining_bytes -= message_size; |
| - } |
| - |
| - if (read_buffer_start > 0) { |
| - // Move data back to start. |
| - read_buffer_->num_valid_bytes_ = remaining_bytes; |
| - if (read_buffer_->num_valid_bytes_ > 0) { |
| - memmove(&read_buffer_->buffer_[0], |
| - &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
| - } |
| - read_buffer_start = 0; |
| - } |
| + bool stop_dispatching = false; |
| + DispatchMessages(&did_dispatch_message, &stop_dispatching); |
| + if (stop_dispatching) |
| + return; |
| if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
| kReadSize) { |
| @@ -425,6 +450,7 @@ void RawChannel::OnWriteCompleted(IOResult io_result, |
| } |
| if (did_fail) { |
| + base::AutoLock locker(read_lock_); |
|
yzshen1
2015/09/23 22:47:09
nit: use LoadAndCallOnError instead?
|
| CallOnError(Delegate::ERROR_WRITE); |
| return; // |this| may have been destroyed in |CallOnError()|. |
| } |
| @@ -432,18 +458,26 @@ void RawChannel::OnWriteCompleted(IOResult io_result, |
| void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
| write_lock_.AssertAcquired(); |
| + DCHECK(HandleForDebuggingNoLock().is_valid()); |
| write_buffer_->message_queue_.AddMessage(message.Pass()); |
| } |
| bool RawChannel::OnReadMessageForRawChannel( |
| const MessageInTransit::View& message_view) { |
| + if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { |
| + message_loop_for_io_->PostTask( |
| + FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + Delegate::ERROR_READ_SHUTDOWN)); |
| + return true; |
| + } |
| + |
| // No non-implementation specific |RawChannel| control messages. |
| - LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() |
| + LOG(ERROR) << "Invalid control message (type " << message_view.type() |
| << ")"; |
| return false; |
| } |
| -// static |
| RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
| IOResult io_result) { |
| switch (io_result) { |
| @@ -463,13 +497,24 @@ RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
| void RawChannel::CallOnError(Delegate::Error error) { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
| + read_lock_.AssertAcquired(); |
| + error_occurred_ = true; |
| if (delegate_) { |
| delegate_->OnError(error); |
| - return; // |this| may have been destroyed in |OnError()|. |
| + } else { |
| + // We depend on delegate to delete since it could be waiting to call |
| + // ReleaseHandle. |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); |
| } |
| } |
| +void RawChannel::LockAndCallOnError(Delegate::Error error) { |
| + base::AutoLock locker(read_lock_); |
| + CallOnError(error); |
| +} |
| + |
| bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
| size_t platform_handles_written, |
| size_t bytes_written) { |
| @@ -508,5 +553,107 @@ bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
| return false; |
| } |
| +void RawChannel::DispatchMessages(bool* did_dispatch_message, |
| + bool* stop_dispatching) { |
| + *did_dispatch_message = false; |
| + *stop_dispatching = false; |
| + // Tracks the offset of the first undispatched message in |read_buffer_|. |
| + // Currently, we copy data to ensure that this is zero at the beginning. |
| + size_t read_buffer_start = 0; |
| + size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
| + size_t message_size; |
| + // Note that we rely on short-circuit evaluation here: |
| + // - |read_buffer_start| may be an invalid index into |
| + // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
| + // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
| + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| + // next read). |
| + // TODO(vtl): Validate that |message_size| is sane. |
| + while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
| + &read_buffer_->buffer_[read_buffer_start], |
| + remaining_bytes, &message_size) && |
| + remaining_bytes >= message_size) { |
| + MessageInTransit::View message_view( |
| + message_size, &read_buffer_->buffer_[read_buffer_start]); |
| + DCHECK_EQ(message_view.total_size(), message_size); |
| + |
| + const char* error_message = nullptr; |
| + if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
| + &error_message)) { |
| + DCHECK(error_message); |
| + LOG(ERROR) << "Received invalid message: " << error_message; |
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| + *stop_dispatching = true; |
| + return; // |this| may have been destroyed in |CallOnError()|. |
| + } |
| + |
| + if (message_view.type() != MessageInTransit::Type::MESSAGE) { |
| + if (!OnReadMessageForRawChannel(message_view)) { |
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| + *stop_dispatching = true; |
| + return; // |this| may have been destroyed in |CallOnError()|. |
| + } |
| + } else { |
| + embedder::ScopedPlatformHandleVectorPtr platform_handles; |
| + if (message_view.transport_data_buffer()) { |
| + size_t num_platform_handles; |
| + const void* platform_handle_table; |
| + TransportData::GetPlatformHandleTable( |
| + message_view.transport_data_buffer(), &num_platform_handles, |
| + &platform_handle_table); |
| + |
| + if (num_platform_handles > 0) { |
| + platform_handles = |
| + GetReadPlatformHandles(num_platform_handles, |
| + platform_handle_table).Pass(); |
| + if (!platform_handles) { |
| + LOG(ERROR) << "Invalid number of platform handles received"; |
| + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
| + *stop_dispatching = true; |
| + return; // |this| may have been destroyed in |CallOnError()|. |
| + } |
| + } |
| + } |
| + |
| + // TODO(vtl): In the case that we aren't expecting any platform handles, |
| + // for the POSIX implementation, we should confirm that none are stored. |
| + |
| + // Dispatch the message. |
| + // Detect the case when |Shutdown()| is called; subsequent destruction |
| + // is also permitted then. |
| + bool shutdown_called = false; |
|
yzshen1
2015/09/23 22:47:09
Shutdown() is no longer possible to be called from
|
| + DCHECK(!set_on_shutdown_); |
| + set_on_shutdown_ = &shutdown_called; |
| + // Note: it's valid to get here without a delegate. i.e. after Shutdown |
| + // is called, if this object still has a valid handle we keep it alive |
| + // until the other side closes it in response to the RAW_CHANNEL_QUIT |
| + // message. In the meantime the sender could have sent us a message. |
| + if (delegate_) |
| + delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
| + if (shutdown_called) { |
| + *stop_dispatching = true; |
| + return; |
| + } |
| + set_on_shutdown_ = nullptr; |
| + } |
| + |
| + *did_dispatch_message = true; |
| + |
| + // Update our state. |
| + read_buffer_start += message_size; |
| + remaining_bytes -= message_size; |
| + } |
| + |
| + if (read_buffer_start > 0) { |
| + // Move data back to start. |
| + read_buffer_->num_valid_bytes_ = remaining_bytes; |
| + if (read_buffer_->num_valid_bytes_ > 0) { |
| + memmove(&read_buffer_->buffer_[0], |
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
| + } |
| + read_buffer_start = 0; |
| + } |
| +} |
| + |
| } // namespace system |
| } // namespace mojo |