Index: mojo/system/raw_channel_posix.cc |
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc |
index fa1a5ce6c02357c9282df974c364b3d9cb5c98b3..00a5448b4f5facb10bb862953b75f6ff3f516a67 100644 |
--- a/mojo/system/raw_channel_posix.cc |
+++ b/mojo/system/raw_channel_posix.cc |
@@ -5,13 +5,10 @@ |
#include "mojo/system/raw_channel.h" |
#include <errno.h> |
-#include <string.h> |
#include <sys/uio.h> |
#include <unistd.h> |
#include <algorithm> |
-#include <deque> |
-#include <vector> |
#include "base/basictypes.h" |
#include "base/bind.h" |
@@ -22,18 +19,14 @@ |
#include "base/memory/weak_ptr.h" |
#include "base/message_loop/message_loop.h" |
#include "base/posix/eintr_wrapper.h" |
-#include "base/stl_util.h" |
#include "base/synchronization/lock.h" |
#include "mojo/system/embedder/platform_handle.h" |
-#include "mojo/system/message_in_transit.h" |
namespace mojo { |
namespace system { |
namespace { |
-const size_t kReadSize = 4096; |
- |
class RawChannelPosix : public RawChannel, |
public base::MessageLoopForIO::Watcher { |
public: |
@@ -42,12 +35,17 @@ class RawChannelPosix : public RawChannel, |
base::MessageLoopForIO* message_loop_for_io); |
virtual ~RawChannelPosix(); |
+ private: |
// |RawChannel| implementation: |
- virtual bool Init() OVERRIDE; |
- virtual void Shutdown() OVERRIDE; |
- virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; |
+ virtual IOResult Read(size_t* bytes_read) OVERRIDE; |
+ virtual IOResult ScheduleRead() OVERRIDE; |
+ virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; |
+ virtual IOResult ScheduleWriteNoLock() OVERRIDE; |
+ virtual bool OnInit() OVERRIDE; |
+ virtual void OnShutdownNoLock( |
+ scoped_ptr<ReadBuffer> read_buffer, |
+ scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; |
- private: |
// |base::MessageLoopForIO::Watcher| implementation: |
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
@@ -55,40 +53,18 @@ class RawChannelPosix : public RawChannel, |
// Watches for |fd_| to become writable. Must be called on the I/O thread. |
void WaitToWrite(); |
- // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O |
- // thread WITHOUT |write_lock_| held. |
- void CallOnFatalError(Delegate::FatalError fatal_error); |
- |
- // Writes the message at the front of |write_message_queue_|, starting at |
- // |write_message_offset_|. It removes and destroys if the write completes and |
- // otherwise updates |write_message_offset_|. Returns true on success. Must be |
- // called under |write_lock_|. |
- bool WriteFrontMessageNoLock(); |
- |
- // Cancels all pending writes and destroys the contents of |
- // |write_message_queue_|. Should only be called if |write_stopped_| is false; |
- // sets |write_stopped_| to true. Must be called under |write_lock_|. |
- void CancelPendingWritesNoLock(); |
- |
embedder::ScopedPlatformHandle fd_; |
- // Only used on the I/O thread: |
+ // The following members are only used on the I/O thread: |
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
- // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| |
- // is always aligned with a message boundary (we will copy memory to ensure |
- // this), but |read_buffer_| may be larger than the actual number of bytes we |
- // have. |
- std::vector<char> read_buffer_; |
- size_t read_buffer_num_valid_bytes_; |
- |
- base::Lock write_lock_; // Protects the following members. |
- bool write_stopped_; |
- // TODO(vtl): When C++11 is available, switch this to a deque of |
- // |scoped_ptr|/|unique_ptr|s. |
- std::deque<MessageInTransit*> write_message_queue_; |
- size_t write_message_offset_; |
+ bool pending_read_; |
+ |
+ // The following members are used on multiple threads and protected by |
+ // |write_lock()|: |
+ bool pending_write_; |
+ |
// This is used for posting tasks from write threads to the I/O thread. It |
// must only be accessed under |write_lock_|. The weak pointers it produces |
// are only used/invalidated on the I/O thread. |
@@ -102,20 +78,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
base::MessageLoopForIO* message_loop_for_io) |
: RawChannel(delegate, message_loop_for_io), |
fd_(handle.Pass()), |
- read_buffer_num_valid_bytes_(0), |
- write_stopped_(false), |
- write_message_offset_(0), |
+ pending_read_(false), |
+ pending_write_(false), |
weak_ptr_factory_(this) { |
- CHECK_EQ(RawChannel::message_loop_for_io()->type(), |
- base::MessageLoop::TYPE_IO); |
DCHECK(fd_.is_valid()); |
} |
RawChannelPosix::~RawChannelPosix() { |
- DCHECK(write_stopped_); |
- DCHECK(!fd_.is_valid()); |
+ DCHECK(!pending_read_); |
+ DCHECK(!pending_write_); |
- // No need to take the |write_lock_| here -- if there are still weak pointers |
+ // No need to take the |write_lock()| here -- if there are still weak pointers |
// outstanding, then we're hosed anyway (since we wouldn't be able to |
// invalidate them cleanly, since we might not be on the I/O thread). |
DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
@@ -125,324 +98,211 @@ RawChannelPosix::~RawChannelPosix() { |
DCHECK(!write_watcher_.get()); |
} |
-bool RawChannelPosix::Init() { |
+RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
+ DCHECK(!pending_read_); |
- DCHECK(!read_watcher_.get()); |
- read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
- DCHECK(!write_watcher_.get()); |
- write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
+ char* buffer = NULL; |
+ size_t bytes_to_read = 0; |
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read); |
- // No need to take the lock. No one should be using us yet. |
- DCHECK(write_message_queue_.empty()); |
+ ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read)); |
- if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
- base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
- // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
- // (in the sense of returning the message loop's state to what it was before |
- // it was called). |
+ if (read_result >= 0) { |
+ *bytes_read = static_cast<size_t>(read_result); |
+ return IO_SUCCEEDED; |
+ } |
+ |
+ if (errno != EAGAIN && errno != EWOULDBLOCK) { |
+ PLOG(ERROR) << "read"; |
+ |
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. |
read_watcher_.reset(); |
- write_watcher_.reset(); |
- return false; |
+ |
+ return IO_FAILED; |
} |
- return true; |
+ return ScheduleRead(); |
} |
-void RawChannelPosix::Shutdown() { |
+RawChannel::IOResult RawChannelPosix::ScheduleRead() { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
+ DCHECK(!pending_read_); |
- base::AutoLock locker(write_lock_); |
- if (!write_stopped_) |
- CancelPendingWritesNoLock(); |
+ pending_read_ = true; |
- read_watcher_.reset(); // This will stop watching (if necessary). |
- write_watcher_.reset(); // This will stop watching (if necessary). |
+ return IO_PENDING; |
+} |
- DCHECK(fd_.is_valid()); |
- fd_.reset(); |
+RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { |
+ write_lock().AssertAcquired(); |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
-} |
+ DCHECK(!pending_write_); |
-// Reminder: This must be thread-safe, and takes ownership of |message|. |
-bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { |
- base::AutoLock locker(write_lock_); |
- if (write_stopped_) |
- return false; |
+ std::vector<WriteBuffer::Buffer> buffers; |
+ write_buffer_no_lock()->GetBuffers(&buffers); |
+ DCHECK(!buffers.empty()); |
- if (!write_message_queue_.empty()) { |
- write_message_queue_.push_back(message.release()); |
- return true; |
+ ssize_t write_result = -1; |
+ if (buffers.size() == 1) { |
+ write_result = HANDLE_EINTR( |
+ write(fd_.get().fd, buffers[0].addr, buffers[0].size)); |
+ } else { |
+ // Note that using |writev()| is measurably slower than using |write()| -- |
+ // at least in a microbenchmark -- but much faster than using multiple |
+ // |write()|s. |
+ const size_t kMaxBufferCount = 10; |
+ iovec iov[kMaxBufferCount]; |
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); |
+ |
+ for (size_t i = 0; i < buffer_count; ++i) { |
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr); |
+ iov[i].iov_len = buffers[i].size; |
+ } |
+ |
+ write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count)); |
} |
- write_message_queue_.push_front(message.release()); |
- DCHECK_EQ(write_message_offset_, 0u); |
- bool result = WriteFrontMessageNoLock(); |
- DCHECK(result || write_message_queue_.empty()); |
+ if (write_result >= 0) { |
+ *bytes_written = static_cast<size_t>(write_result); |
+ return IO_SUCCEEDED; |
+ } |
+ |
+ if (errno != EAGAIN && errno != EWOULDBLOCK) { |
+ PLOG(ERROR) << "write of size " |
+ << write_buffer_no_lock()->GetTotalBytesToWrite(); |
+ return IO_FAILED; |
+ } |
- if (!result) { |
- // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
- // nested context. |
+ return ScheduleWriteNoLock(); |
+} |
+ |
+RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { |
+ write_lock().AssertAcquired(); |
+ |
+ DCHECK(!pending_write_); |
+ |
+ // Set up to wait for the FD to become writable. |
+ // If we're not on the I/O thread, we have to post a task to do this. |
+ if (base::MessageLoop::current() != message_loop_for_io()) { |
message_loop_for_io()->PostTask( |
FROM_HERE, |
- base::Bind(&RawChannelPosix::CallOnFatalError, |
- weak_ptr_factory_.GetWeakPtr(), |
- Delegate::FATAL_ERROR_FAILED_WRITE)); |
- } else if (!write_message_queue_.empty()) { |
- // Set up to wait for the FD to become writable. If we're not on the I/O |
- // thread, we have to post a task to do this. |
- if (base::MessageLoop::current() == message_loop_for_io()) { |
- WaitToWrite(); |
- } else { |
- message_loop_for_io()->PostTask( |
- FROM_HERE, |
- base::Bind(&RawChannelPosix::WaitToWrite, |
- weak_ptr_factory_.GetWeakPtr())); |
- } |
+ base::Bind(&RawChannelPosix::WaitToWrite, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ pending_write_ = true; |
+ return IO_PENDING; |
+ } |
+ |
+ if (message_loop_for_io()->WatchFileDescriptor( |
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
+ write_watcher_.get(), this)) { |
+ pending_write_ = true; |
+ return IO_PENDING; |
} |
- return result; |
+ return IO_FAILED; |
} |
-void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
- DCHECK_EQ(fd, fd_.get().fd); |
+bool RawChannelPosix::OnInit() { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- bool did_dispatch_message = false; |
- // Tracks the offset of the first undispatched message in |read_buffer_|. |
- // Currently, we copy data to ensure that this is zero at the beginning. |
- size_t read_buffer_start = 0; |
- for (;;) { |
- if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) |
- < kReadSize) { |
- // Use power-of-2 buffer sizes. |
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
- // maximum message size to whatever extent necessary). |
- // TODO(vtl): We may often be able to peek at the header and get the real |
- // required extra space (which may be much bigger than |kReadSize|). |
- size_t new_size = std::max(read_buffer_.size(), kReadSize); |
- while (new_size < |
- read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) |
- new_size *= 2; |
- |
- // TODO(vtl): It's suboptimal to zero out the fresh memory. |
- read_buffer_.resize(new_size, 0); |
- } |
+ DCHECK(!read_watcher_.get()); |
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
+ DCHECK(!write_watcher_.get()); |
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
- ssize_t bytes_read = HANDLE_EINTR( |
- read(fd_.get().fd, |
- &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], |
- kReadSize)); |
- if (bytes_read < 0) { |
- if (errno != EAGAIN && errno != EWOULDBLOCK) { |
- PLOG(ERROR) << "read"; |
+ if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
+ // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
+ // (in the sense of returning the message loop's state to what it was before |
+ // it was called). |
+ read_watcher_.reset(); |
+ write_watcher_.reset(); |
+ return false; |
+ } |
- // Make sure that |OnFileCanReadWithoutBlocking()| won't be called |
- // again. |
- read_watcher_.reset(); |
+ return true; |
+} |
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
- return; |
- } |
+void RawChannelPosix::OnShutdownNoLock( |
+ scoped_ptr<ReadBuffer> /*read_buffer*/, |
+ scoped_ptr<WriteBuffer> /*write_buffer*/) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
+ write_lock().AssertAcquired(); |
- break; |
- } |
+ read_watcher_.reset(); // This will stop watching (if necessary). |
+ write_watcher_.reset(); // This will stop watching (if necessary). |
- read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); |
- |
- // Dispatch all the messages that we can. |
- size_t message_size; |
- // Note that we rely on short-circuit evaluation here: |
- // - |read_buffer_start| may be an invalid index into |read_buffer_| if |
- // |read_buffer_num_valid_bytes_| is zero. |
- // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
- // next read). |
- while (read_buffer_num_valid_bytes_ > 0 && |
- MessageInTransit::GetNextMessageSize( |
- &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, |
- &message_size) && |
- read_buffer_num_valid_bytes_ >= message_size) { |
- // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with |
- // some sort of "view" abstraction. |
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
- &read_buffer_[read_buffer_start]); |
- DCHECK_EQ(message.total_size(), message_size); |
- |
- // Dispatch the message. |
- delegate()->OnReadMessage(message); |
- if (!read_watcher_.get()) { |
- // |Shutdown()| was called in |OnReadMessage()|. |
- // TODO(vtl): Add test for this case. |
- return; |
- } |
- did_dispatch_message = true; |
- |
- // Update our state. |
- read_buffer_start += message_size; |
- read_buffer_num_valid_bytes_ -= message_size; |
- } |
+ pending_read_ = false; |
+ pending_write_ = false; |
- // If we dispatched any messages, stop reading for now (and let the message |
- // loop do its thing for another round). |
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
- // a single message. Risks: slower, more complex if we want to avoid lots of |
- // copying. ii. Keep reading until there's no more data and dispatch all the |
- // messages we can. Risks: starvation of other users of the message loop.) |
- if (did_dispatch_message) |
- break; |
+ DCHECK(fd_.is_valid()); |
+ fd_.reset(); |
- // If we didn't max out |kReadSize|, stop reading for now. |
- if (static_cast<size_t>(bytes_read) < kReadSize) |
- break; |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
+} |
- // Else try to read some more.... |
- } |
+void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
+ DCHECK_EQ(fd, fd_.get().fd); |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- // Move data back to start. |
- if (read_buffer_start > 0) { |
- if (read_buffer_num_valid_bytes_ > 0) { |
- memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
- read_buffer_num_valid_bytes_); |
- } |
- read_buffer_start = 0; |
+ if (!pending_read_) { |
+ NOTREACHED(); |
+ return; |
} |
+ |
+ pending_read_ = false; |
+ size_t bytes_read = 0; |
+ IOResult result = Read(&bytes_read); |
+ if (result != IO_PENDING) |
+ OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
+ |
+ // On failure, |read_watcher_| must have been reset; on success, |
+ // we assume that |OnReadCompleted()| always schedules another read. |
+ // Otherwise, we could end up spinning -- getting |
+ // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual |
+ // read. |
+ // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't |
+ // schedule a new read. But that code won't be reached under the current |
+ // RawChannel implementation. |
+ DCHECK(!read_watcher_.get() || pending_read_); |
} |
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
DCHECK_EQ(fd, fd_.get().fd); |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- bool did_fail = false; |
+ IOResult result = IO_FAILED; |
+ size_t bytes_written = 0; |
{ |
- base::AutoLock locker(write_lock_); |
- DCHECK_EQ(write_stopped_, write_message_queue_.empty()); |
+ base::AutoLock locker(write_lock()); |
- if (write_stopped_) { |
- write_watcher_.reset(); |
- return; |
- } |
- |
- bool result = WriteFrontMessageNoLock(); |
- DCHECK(result || write_message_queue_.empty()); |
+ DCHECK(pending_write_); |
- if (!result) { |
- did_fail = true; |
- write_watcher_.reset(); |
- } else if (!write_message_queue_.empty()) { |
- WaitToWrite(); |
- } |
+ pending_write_ = false; |
+ result = WriteNoLock(&bytes_written); |
} |
- if (did_fail) |
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
+ |
+ if (result != IO_PENDING) |
+ OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
} |
void RawChannelPosix::WaitToWrite() { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
DCHECK(write_watcher_.get()); |
- bool result = message_loop_for_io()->WatchFileDescriptor( |
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
- write_watcher_.get(), this); |
- DCHECK(result); |
-} |
-void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { |
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
- delegate()->OnFatalError(fatal_error); |
-} |
+ if (!message_loop_for_io()->WatchFileDescriptor( |
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
+ write_watcher_.get(), this)) { |
+ { |
+ base::AutoLock locker(write_lock()); |
-// TODO(vtl): This will collide with yzshen's work. This is just a hacky, |
-// minimally-invasive function that does what I want (write/resume writing a |
-// |MessageInTransit| that may consist of more than one buffer). |
-ssize_t WriteMessageInTransit(int fd, |
- MessageInTransit* message, |
- size_t offset, |
- size_t* bytes_to_write) { |
- *bytes_to_write = message->total_size() - offset; |
- if (!message->secondary_buffer_size()) { |
- // Only write from the main buffer. |
- DCHECK_LT(offset, message->main_buffer_size()); |
- DCHECK_LE(*bytes_to_write, message->main_buffer_size()); |
- return HANDLE_EINTR( |
- write(fd, |
- static_cast<const char*>(message->main_buffer()) + offset, |
- *bytes_to_write)); |
- } |
- if (offset >= message->main_buffer_size()) { |
- // Only write from the secondary buffer. |
- DCHECK_LT(offset - message->main_buffer_size(), |
- message->secondary_buffer_size()); |
- DCHECK_LE(*bytes_to_write, message->secondary_buffer_size()); |
- return HANDLE_EINTR( |
- write(fd, |
- static_cast<const char*>(message->secondary_buffer()) + |
- (offset - message->main_buffer_size()), |
- *bytes_to_write)); |
- } |
- // Write from both buffers. (Note that using |writev()| is measurably slower |
- // than using |write()| -- at least in a microbenchmark -- but much faster |
- // than using two |write()|s.) |
- DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset + |
- message->secondary_buffer_size()); |
- struct iovec iov[2] = { |
- { const_cast<char*>( |
- static_cast<const char*>(message->main_buffer()) + offset), |
- message->main_buffer_size() - offset }, |
- { const_cast<void*>(message->secondary_buffer()), |
- message->secondary_buffer_size() } |
- }; |
- return HANDLE_EINTR(writev(fd, iov, 2)); |
-} |
- |
-bool RawChannelPosix::WriteFrontMessageNoLock() { |
- write_lock_.AssertAcquired(); |
- |
- DCHECK(!write_stopped_); |
- DCHECK(!write_message_queue_.empty()); |
- |
- MessageInTransit* message = write_message_queue_.front(); |
- DCHECK_LT(write_message_offset_, message->total_size()); |
- size_t bytes_to_write; |
- ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd, |
- message, |
- write_message_offset_, |
- &bytes_to_write); |
- if (bytes_written < 0) { |
- if (errno != EAGAIN && errno != EWOULDBLOCK) { |
- PLOG(ERROR) << "write of size " << bytes_to_write; |
- CancelPendingWritesNoLock(); |
- return false; |
+ DCHECK(pending_write_); |
+ pending_write_ = false; |
} |
- |
- // We simply failed to write since we'd block. The logic is the same as if |
- // we got a partial write. |
- bytes_written = 0; |
- } |
- |
- DCHECK_GE(bytes_written, 0); |
- if (static_cast<size_t>(bytes_written) < bytes_to_write) { |
- // Partial (or no) write. |
- write_message_offset_ += static_cast<size_t>(bytes_written); |
- } else { |
- // Complete write. |
- DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); |
- write_message_queue_.pop_front(); |
- delete message; |
- write_message_offset_ = 0; |
+ OnWriteCompleted(false, 0); |
} |
- |
- return true; |
-} |
- |
-void RawChannelPosix::CancelPendingWritesNoLock() { |
- write_lock_.AssertAcquired(); |
- DCHECK(!write_stopped_); |
- |
- write_stopped_ = true; |
- STLDeleteElements(&write_message_queue_); |
} |
} // namespace |