Chromium Code Reviews| Index: mojo/system/raw_channel_posix.cc |
| diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc |
| index 433ea4f8066dc03595c61db341e041727417f16c..699b327ac274d7b50f84f4f26bc6ad79c9443394 100644 |
| --- a/mojo/system/raw_channel_posix.cc |
| +++ b/mojo/system/raw_channel_posix.cc |
| @@ -5,13 +5,8 @@ |
| #include "mojo/system/raw_channel.h" |
| #include <errno.h> |
| -#include <string.h> |
| #include <unistd.h> |
| -#include <algorithm> |
| -#include <deque> |
| -#include <vector> |
| - |
| #include "base/basictypes.h" |
| #include "base/bind.h" |
| #include "base/compiler_specific.h" |
| @@ -21,18 +16,14 @@ |
| #include "base/memory/weak_ptr.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/posix/eintr_wrapper.h" |
| -#include "base/stl_util.h" |
| #include "base/synchronization/lock.h" |
| #include "mojo/system/embedder/platform_handle.h" |
| -#include "mojo/system/message_in_transit.h" |
| namespace mojo { |
| namespace system { |
| namespace { |
| -const size_t kReadSize = 4096; |
| - |
| class RawChannelPosix : public RawChannel, |
| public base::MessageLoopForIO::Watcher { |
| public: |
| @@ -41,12 +32,17 @@ class RawChannelPosix : public RawChannel, |
| base::MessageLoopForIO* message_loop_for_io); |
| virtual ~RawChannelPosix(); |
| + private: |
| // |RawChannel| implementation: |
| - virtual bool Init() OVERRIDE; |
| - virtual void Shutdown() OVERRIDE; |
| - virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; |
| + virtual IOResult Read(size_t* bytes_read) OVERRIDE; |
| + virtual IOResult ScheduleRead() OVERRIDE; |
| + virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; |
| + virtual IOResult ScheduleWriteNoLock() OVERRIDE; |
| + virtual bool OnInit() OVERRIDE; |
| + virtual void OnShutdownNoLock( |
| + scoped_ptr<ReadBuffer> read_buffer, |
| + scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; |
| - private: |
| // |base::MessageLoopForIO::Watcher| implementation: |
| virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
| virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
| @@ -54,40 +50,18 @@ class RawChannelPosix : public RawChannel, |
| // Watches for |fd_| to become writable. Must be called on the I/O thread. |
| void WaitToWrite(); |
| - // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O |
| - // thread WITHOUT |write_lock_| held. |
| - void CallOnFatalError(Delegate::FatalError fatal_error); |
| - |
| - // Writes the message at the front of |write_message_queue_|, starting at |
| - // |write_message_offset_|. It removes and destroys if the write completes and |
| - // otherwise updates |write_message_offset_|. Returns true on success. Must be |
| - // called under |write_lock_|. |
| - bool WriteFrontMessageNoLock(); |
| - |
| - // Cancels all pending writes and destroys the contents of |
| - // |write_message_queue_|. Should only be called if |write_stopped_| is false; |
| - // sets |write_stopped_| to true. Must be called under |write_lock_|. |
| - void CancelPendingWritesNoLock(); |
| - |
| embedder::ScopedPlatformHandle fd_; |
| - // Only used on the I/O thread: |
| + // The following members are only used on the I/O thread: |
| scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
| scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
| - // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| |
| - // is always aligned with a message boundary (we will copy memory to ensure |
| - // this), but |read_buffer_| may be larger than the actual number of bytes we |
| - // have. |
| - std::vector<char> read_buffer_; |
| - size_t read_buffer_num_valid_bytes_; |
| - |
| - base::Lock write_lock_; // Protects the following members. |
| - bool write_stopped_; |
| - // TODO(vtl): When C++11 is available, switch this to a deque of |
| - // |scoped_ptr|/|unique_ptr|s. |
| - std::deque<MessageInTransit*> write_message_queue_; |
| - size_t write_message_offset_; |
| + bool pending_read_; |
| + |
| + // The following members are used on multiple threads and protected by |
| + // |write_lock()|: |
| + bool pending_write_; |
| + |
| // This is used for posting tasks from write threads to the I/O thread. It |
| // must only be accessed under |write_lock_|. The weak pointers it produces |
| // are only used/invalidated on the I/O thread. |
| @@ -101,20 +75,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
| base::MessageLoopForIO* message_loop_for_io) |
| : RawChannel(delegate, message_loop_for_io), |
| fd_(handle.Pass()), |
| - read_buffer_num_valid_bytes_(0), |
| - write_stopped_(false), |
| - write_message_offset_(0), |
| + pending_read_(false), |
| + pending_write_(false), |
| weak_ptr_factory_(this) { |
| - CHECK_EQ(RawChannel::message_loop_for_io()->type(), |
| - base::MessageLoop::TYPE_IO); |
| DCHECK(fd_.is_valid()); |
| } |
| RawChannelPosix::~RawChannelPosix() { |
| - DCHECK(write_stopped_); |
| - DCHECK(!fd_.is_valid()); |
| + DCHECK(!pending_read_); |
| + DCHECK(!pending_write_); |
| - // No need to take the |write_lock_| here -- if there are still weak pointers |
| + // No need to take the |write_lock()| here -- if there are still weak pointers |
| // outstanding, then we're hosed anyway (since we wouldn't be able to |
| // invalidate them cleanly, since we might not be on the I/O thread). |
| DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| @@ -124,280 +95,173 @@ RawChannelPosix::~RawChannelPosix() { |
| DCHECK(!write_watcher_.get()); |
| } |
| -bool RawChannelPosix::Init() { |
| +RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + DCHECK(!pending_read_); |
| - DCHECK(!read_watcher_.get()); |
| - read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| - DCHECK(!write_watcher_.get()); |
| - write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| + ssize_t read_result = HANDLE_EINTR( |
| + read(fd_.get().fd, read_buffer()->GetPosition(), |
| + read_buffer()->GetBytesToRead())); |
| - // No need to take the lock. No one should be using us yet. |
| - DCHECK(write_message_queue_.empty()); |
| + if (read_result >= 0) { |
| + *bytes_read = static_cast<size_t>(read_result); |
| + return IO_SUCCEEDED; |
| + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| + PLOG(ERROR) << "read"; |
| - if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
| - base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
| - // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
| - // (in the sense of returning the message loop's state to what it was before |
| - // it was called). |
| + // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. |
| read_watcher_.reset(); |
| - write_watcher_.reset(); |
| - return false; |
| + |
| + return IO_FAILED; |
| } |
| - return true; |
| + return ScheduleRead(); |
| } |
| -void RawChannelPosix::Shutdown() { |
| +RawChannel::IOResult RawChannelPosix::ScheduleRead() { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + DCHECK(!pending_read_); |
| - base::AutoLock locker(write_lock_); |
| - if (!write_stopped_) |
| - CancelPendingWritesNoLock(); |
| + pending_read_ = true; |
| - read_watcher_.reset(); // This will stop watching (if necessary). |
| - write_watcher_.reset(); // This will stop watching (if necessary). |
| + return IO_PENDING; |
| +} |
| - DCHECK(fd_.is_valid()); |
| - fd_.reset(); |
| +RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { |
| + write_lock().AssertAcquired(); |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| -} |
| + DCHECK(!pending_write_); |
| -// Reminder: This must be thread-safe, and takes ownership of |message|. |
| -bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| - base::AutoLock locker(write_lock_); |
| - if (write_stopped_) |
| - return false; |
| + ssize_t write_result = HANDLE_EINTR( |
| + write(fd_.get().fd, write_buffer()->GetPosition(), |
| + write_buffer()->GetBytesToWrite())); |
| - if (!write_message_queue_.empty()) { |
| - write_message_queue_.push_back(message.release()); |
| - return true; |
| + if (write_result >= 0) { |
| + *bytes_written = static_cast<size_t>(write_result); |
| + return IO_SUCCEEDED; |
| + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| + PLOG(ERROR) << "write of size " << write_buffer()->GetBytesToWrite(); |
| + return IO_FAILED; |
| } |
| - write_message_queue_.push_front(message.release()); |
| - DCHECK_EQ(write_message_offset_, 0u); |
| - bool result = WriteFrontMessageNoLock(); |
| - DCHECK(result || write_message_queue_.empty()); |
| + return ScheduleWriteNoLock(); |
| +} |
| + |
| +RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { |
| + write_lock().AssertAcquired(); |
| + |
| + DCHECK(!pending_write_); |
| - if (!result) { |
| - // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
| - // nested context. |
| + // Set up to wait for the FD to become writable. |
| + // If we're not on the I/O thread, we have to post a task to do this. |
| + if (base::MessageLoop::current() != message_loop_for_io()) { |
| message_loop_for_io()->PostTask( |
| FROM_HERE, |
| - base::Bind(&RawChannelPosix::CallOnFatalError, |
| - weak_ptr_factory_.GetWeakPtr(), |
| - Delegate::FATAL_ERROR_FAILED_WRITE)); |
| - } else if (!write_message_queue_.empty()) { |
| - // Set up to wait for the FD to become writable. If we're not on the I/O |
| - // thread, we have to post a task to do this. |
| - if (base::MessageLoop::current() == message_loop_for_io()) { |
| - WaitToWrite(); |
| - } else { |
| - message_loop_for_io()->PostTask( |
| - FROM_HERE, |
| - base::Bind(&RawChannelPosix::WaitToWrite, |
| - weak_ptr_factory_.GetWeakPtr())); |
| - } |
| + base::Bind(&RawChannelPosix::WaitToWrite, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + pending_write_ = true; |
| + return IO_PENDING; |
| + } |
| + |
| + if (message_loop_for_io()->WatchFileDescriptor( |
| + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| + write_watcher_.get(), this)) { |
| + pending_write_ = true; |
| + return IO_PENDING; |
| } |
| - return result; |
| + return IO_FAILED; |
| } |
| -void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
| - DCHECK_EQ(fd, fd_.get().fd); |
| +bool RawChannelPosix::OnInit() { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| - bool did_dispatch_message = false; |
| - // Tracks the offset of the first undispatched message in |read_buffer_|. |
| - // Currently, we copy data to ensure that this is zero at the beginning. |
| - size_t read_buffer_start = 0; |
| - for (;;) { |
| - if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) |
| - < kReadSize) { |
| - // Use power-of-2 buffer sizes. |
| - // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
| - // maximum message size to whatever extent necessary). |
| - // TODO(vtl): We may often be able to peek at the header and get the real |
| - // required extra space (which may be much bigger than |kReadSize|). |
| - size_t new_size = std::max(read_buffer_.size(), kReadSize); |
| - while (new_size < |
| - read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) |
| - new_size *= 2; |
| - |
| - // TODO(vtl): It's suboptimal to zero out the fresh memory. |
| - read_buffer_.resize(new_size, 0); |
| - } |
| + DCHECK(!read_watcher_.get()); |
| + read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| + DCHECK(!write_watcher_.get()); |
| + write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| - ssize_t bytes_read = HANDLE_EINTR( |
| - read(fd_.get().fd, |
| - &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], |
| - kReadSize)); |
| - if (bytes_read < 0) { |
| - if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| - PLOG(ERROR) << "read"; |
| + if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
| + base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
| + // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
| + // (in the sense of returning the message loop's state to what it was before |
| + // it was called). |
| + read_watcher_.reset(); |
| + write_watcher_.reset(); |
| + return false; |
| + } |
| - // Make sure that |OnFileCanReadWithoutBlocking()| won't be called |
| - // again. |
| - read_watcher_.reset(); |
| + return true; |
| +} |
| - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
| - return; |
| - } |
| +void RawChannelPosix::OnShutdownNoLock( |
| + scoped_ptr<ReadBuffer> /* read_buffer */, |
|
viettrungluu
2014/02/26 23:03:39
nit: In this directory, I've mostly left out space
yzshen1
2014/02/27 02:00:30
Done.
|
| + scoped_ptr<WriteBuffer> /* write_buffer */) { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + write_lock().AssertAcquired(); |
| - break; |
| - } |
| + read_watcher_.reset(); // This will stop watching (if necessary). |
| + write_watcher_.reset(); // This will stop watching (if necessary). |
| - read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); |
| - |
| - // Dispatch all the messages that we can. |
| - size_t message_size; |
| - // Note that we rely on short-circuit evaluation here: |
| - // - |read_buffer_start| may be an invalid index into |read_buffer_| if |
| - // |read_buffer_num_valid_bytes_| is zero. |
| - // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
| - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| - // next read). |
| - while (read_buffer_num_valid_bytes_ > 0 && |
| - MessageInTransit::GetNextMessageSize( |
| - &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, |
| - &message_size) && |
| - read_buffer_num_valid_bytes_ >= message_size) { |
| - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
| - &read_buffer_[read_buffer_start]); |
| - DCHECK_EQ(message.main_buffer_size(), message_size); |
| - |
| - // Dispatch the message. |
| - delegate()->OnReadMessage(message); |
| - if (!read_watcher_.get()) { |
| - // |Shutdown()| was called in |OnReadMessage()|. |
| - // TODO(vtl): Add test for this case. |
| - return; |
| - } |
| - did_dispatch_message = true; |
| - |
| - // Update our state. |
| - read_buffer_start += message_size; |
| - read_buffer_num_valid_bytes_ -= message_size; |
| - } |
| + pending_read_ = false; |
| + pending_write_ = false; |
| - // If we dispatched any messages, stop reading for now (and let the message |
| - // loop do its thing for another round). |
| - // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
| - // a single message. Risks: slower, more complex if we want to avoid lots of |
| - // copying. ii. Keep reading until there's no more data and dispatch all the |
| - // messages we can. Risks: starvation of other users of the message loop.) |
| - if (did_dispatch_message) |
| - break; |
| + DCHECK(fd_.is_valid()); |
| + fd_.reset(); |
| - // If we didn't max out |kReadSize|, stop reading for now. |
| - if (static_cast<size_t>(bytes_read) < kReadSize) |
| - break; |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| +} |
| - // Else try to read some more.... |
| - } |
| +void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
| + DCHECK_EQ(fd, fd_.get().fd); |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| - // Move data back to start. |
| - if (read_buffer_start > 0) { |
| - if (read_buffer_num_valid_bytes_ > 0) { |
| - memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
| - read_buffer_num_valid_bytes_); |
| - } |
| - read_buffer_start = 0; |
| - } |
| + if (!pending_read_) |
| + return; |
|
viettrungluu
2014/02/26 23:03:39
You should probably DCHECK that read_waiter_ is nu
yzshen1
2014/02/27 02:00:30
Thanks! :)
|
| + |
| + pending_read_ = false; |
| + size_t bytes_read = 0; |
| + IOResult result = Read(&bytes_read); |
| + if (result != IO_PENDING) |
| + OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
| } |
| void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
| DCHECK_EQ(fd, fd_.get().fd); |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| - bool did_fail = false; |
| + IOResult result = IO_FAILED; |
| + size_t bytes_written = 0; |
| { |
| - base::AutoLock locker(write_lock_); |
| - DCHECK_EQ(write_stopped_, write_message_queue_.empty()); |
| + base::AutoLock locker(write_lock()); |
| - if (write_stopped_) { |
| - write_watcher_.reset(); |
| + if (!pending_write_) |
| return; |
|
viettrungluu
2014/02/26 23:03:39
"
(though the situation for pending_write_ isn't
yzshen1
2014/02/27 02:00:30
Right. I made it a DCHECK().
|
| - } |
| - bool result = WriteFrontMessageNoLock(); |
| - DCHECK(result || write_message_queue_.empty()); |
| - |
| - if (!result) { |
| - did_fail = true; |
| - write_watcher_.reset(); |
| - } else if (!write_message_queue_.empty()) { |
| - WaitToWrite(); |
| - } |
| + pending_write_ = false; |
| + result = WriteNoLock(&bytes_written); |
| } |
| - if (did_fail) |
| - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
| + |
| + if (result != IO_PENDING) |
| + OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
| } |
| void RawChannelPosix::WaitToWrite() { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| DCHECK(write_watcher_.get()); |
| - bool result = message_loop_for_io()->WatchFileDescriptor( |
| - fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| - write_watcher_.get(), this); |
| - DCHECK(result); |
| -} |
| -void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { |
| - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
| - delegate()->OnFatalError(fatal_error); |
| -} |
| + if (!message_loop_for_io()->WatchFileDescriptor( |
| + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| + write_watcher_.get(), this)) { |
| + { |
| + base::AutoLock locker(write_lock()); |
| -bool RawChannelPosix::WriteFrontMessageNoLock() { |
| - write_lock_.AssertAcquired(); |
| - |
| - DCHECK(!write_stopped_); |
| - DCHECK(!write_message_queue_.empty()); |
| - |
| - MessageInTransit* message = write_message_queue_.front(); |
| - DCHECK_LT(write_message_offset_, message->main_buffer_size()); |
| - size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; |
| - ssize_t bytes_written = HANDLE_EINTR( |
| - write(fd_.get().fd, |
| - static_cast<const char*>(message->main_buffer()) + |
| - write_message_offset_, |
| - bytes_to_write)); |
| - if (bytes_written < 0) { |
| - if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| - PLOG(ERROR) << "write of size " << bytes_to_write; |
| - CancelPendingWritesNoLock(); |
| - return false; |
| + DCHECK(pending_write_); |
| + pending_write_ = false; |
| } |
| - |
| - // We simply failed to write since we'd block. The logic is the same as if |
| - // we got a partial write. |
| - bytes_written = 0; |
| - } |
| - |
| - DCHECK_GE(bytes_written, 0); |
| - if (static_cast<size_t>(bytes_written) < bytes_to_write) { |
| - // Partial (or no) write. |
| - write_message_offset_ += static_cast<size_t>(bytes_written); |
| - } else { |
| - // Complete write. |
| - DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); |
| - write_message_queue_.pop_front(); |
| - delete message; |
| - write_message_offset_ = 0; |
| + OnWriteCompleted(false, 0); |
| } |
| - |
| - return true; |
| -} |
| - |
| -void RawChannelPosix::CancelPendingWritesNoLock() { |
| - write_lock_.AssertAcquired(); |
| - DCHECK(!write_stopped_); |
| - |
| - write_stopped_ = true; |
| - STLDeleteElements(&write_message_queue_); |
| } |
| } // namespace |