| Index: mojo/system/raw_channel_posix.cc
|
| diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
|
| index 433ea4f8066dc03595c61db341e041727417f16c..5c53ec58c16c46221dc2935340a59be3a7b8a0a7 100644
|
| --- a/mojo/system/raw_channel_posix.cc
|
| +++ b/mojo/system/raw_channel_posix.cc
|
| @@ -5,13 +5,8 @@
|
| #include "mojo/system/raw_channel.h"
|
|
|
| #include <errno.h>
|
| -#include <string.h>
|
| #include <unistd.h>
|
|
|
| -#include <algorithm>
|
| -#include <deque>
|
| -#include <vector>
|
| -
|
| #include "base/basictypes.h"
|
| #include "base/bind.h"
|
| #include "base/compiler_specific.h"
|
| @@ -21,18 +16,14 @@
|
| #include "base/memory/weak_ptr.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "base/posix/eintr_wrapper.h"
|
| -#include "base/stl_util.h"
|
| #include "base/synchronization/lock.h"
|
| #include "mojo/system/embedder/platform_handle.h"
|
| -#include "mojo/system/message_in_transit.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
|
|
| namespace {
|
|
|
| -const size_t kReadSize = 4096;
|
| -
|
| class RawChannelPosix : public RawChannel,
|
| public base::MessageLoopForIO::Watcher {
|
| public:
|
| @@ -41,12 +32,20 @@ class RawChannelPosix : public RawChannel,
|
| base::MessageLoopForIO* message_loop_for_io);
|
| virtual ~RawChannelPosix();
|
|
|
| + private:
|
| // |RawChannel| implementation:
|
| - virtual bool Init() OVERRIDE;
|
| - virtual void Shutdown() OVERRIDE;
|
| - virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
|
| + virtual IOResult Read(bool schedule_for_later,
|
| + char* buffer,
|
| + size_t bytes_to_read,
|
| + size_t* bytes_read) OVERRIDE;
|
| + virtual IOResult WriteNoLock(bool schedule_for_later,
|
| + const char* buffer,
|
| + size_t bytes_to_write,
|
| + size_t* bytes_written) OVERRIDE;
|
| + virtual bool OnInit() OVERRIDE;
|
| + virtual void OnShutdownNoLock(
|
| + scoped_ptr<IOBufferPreserver> buffer_preserver) OVERRIDE;
|
|
|
| - private:
|
| // |base::MessageLoopForIO::Watcher| implementation:
|
| virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
|
| virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
|
| @@ -54,40 +53,20 @@ class RawChannelPosix : public RawChannel,
|
| // Watches for |fd_| to become writable. Must be called on the I/O thread.
|
| void WaitToWrite();
|
|
|
| - // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
|
| - // thread WITHOUT |write_lock_| held.
|
| - void CallOnFatalError(Delegate::FatalError fatal_error);
|
| -
|
| - // Writes the message at the front of |write_message_queue_|, starting at
|
| - // |write_message_offset_|. It removes and destroys if the write completes and
|
| - // otherwise updates |write_message_offset_|. Returns true on success. Must be
|
| - // called under |write_lock_|.
|
| - bool WriteFrontMessageNoLock();
|
| -
|
| - // Cancels all pending writes and destroys the contents of
|
| - // |write_message_queue_|. Should only be called if |write_stopped_| is false;
|
| - // sets |write_stopped_| to true. Must be called under |write_lock_|.
|
| - void CancelPendingWritesNoLock();
|
| -
|
| embedder::ScopedPlatformHandle fd_;
|
|
|
| - // Only used on the I/O thread:
|
| + // The following members are only used on the I/O thread:
|
| scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
|
| scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
|
|
|
| - // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
|
| - // is always aligned with a message boundary (we will copy memory to ensure
|
| - // this), but |read_buffer_| may be larger than the actual number of bytes we
|
| - // have.
|
| - std::vector<char> read_buffer_;
|
| - size_t read_buffer_num_valid_bytes_;
|
| -
|
| - base::Lock write_lock_; // Protects the following members.
|
| - bool write_stopped_;
|
| - // TODO(vtl): When C++11 is available, switch this to a deque of
|
| - // |scoped_ptr|/|unique_ptr|s.
|
| - std::deque<MessageInTransit*> write_message_queue_;
|
| - size_t write_message_offset_;
|
| + char* pending_read_buffer_;
|
| + size_t pending_bytes_to_read_;
|
| +
|
| + // The following members are used on multiple threads and protected by
|
| + // |write_lock()|:
|
| + const char* pending_write_buffer_;
|
| + size_t pending_bytes_to_write_;
|
| +
|
| // This is used for posting tasks from write threads to the I/O thread. It
|
| // must only be accessed under |write_lock_|. The weak pointers it produces
|
| // are only used/invalidated on the I/O thread.
|
| @@ -101,20 +80,19 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
|
| base::MessageLoopForIO* message_loop_for_io)
|
| : RawChannel(delegate, message_loop_for_io),
|
| fd_(handle.Pass()),
|
| - read_buffer_num_valid_bytes_(0),
|
| - write_stopped_(false),
|
| - write_message_offset_(0),
|
| + pending_read_buffer_(NULL),
|
| + pending_bytes_to_read_(0),
|
| + pending_write_buffer_(NULL),
|
| + pending_bytes_to_write_(0),
|
| weak_ptr_factory_(this) {
|
| - CHECK_EQ(RawChannel::message_loop_for_io()->type(),
|
| - base::MessageLoop::TYPE_IO);
|
| DCHECK(fd_.is_valid());
|
| }
|
|
|
| RawChannelPosix::~RawChannelPosix() {
|
| - DCHECK(write_stopped_);
|
| - DCHECK(!fd_.is_valid());
|
| + DCHECK(!pending_read_buffer_);
|
| + DCHECK(!pending_write_buffer_);
|
|
|
| - // No need to take the |write_lock_| here -- if there are still weak pointers
|
| + // No need to take the |write_lock()| here -- if there are still weak pointers
|
| // outstanding, then we're hosed anyway (since we wouldn't be able to
|
| // invalidate them cleanly, since we might not be on the I/O thread).
|
| DCHECK(!weak_ptr_factory_.HasWeakPtrs());
|
| @@ -124,7 +102,79 @@ RawChannelPosix::~RawChannelPosix() {
|
| DCHECK(!write_watcher_.get());
|
| }
|
|
|
| -bool RawChannelPosix::Init() {
|
| +RawChannel::IOResult RawChannelPosix::Read(bool schedule_for_later,
|
| + char* buffer,
|
| + size_t bytes_to_read,
|
| + size_t* bytes_read) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
| + DCHECK(!pending_read_buffer_);
|
| +
|
| + if (!schedule_for_later) {
|
| + ssize_t read_result = HANDLE_EINTR(
|
| + read(fd_.get().fd, buffer, bytes_to_read));
|
| +
|
| + if (read_result >= 0) {
|
| + *bytes_read = static_cast<size_t>(read_result);
|
| + return IO_SUCCEEDED;
|
| + } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
| + PLOG(ERROR) << "read";
|
| +
|
| + // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
|
| + read_watcher_.reset();
|
| +
|
| + return IO_FAILED;
|
| + }
|
| + }
|
| +
|
| + // Either |schedule_for_later| is true or |read()| above would block.
|
| + pending_read_buffer_ = buffer;
|
| + pending_bytes_to_read_ = bytes_to_read;
|
| +
|
| + return IO_PENDING;
|
| +}
|
| +
|
| +RawChannel::IOResult RawChannelPosix::WriteNoLock(bool schedule_for_later,
|
| + const char* buffer,
|
| + size_t bytes_to_write,
|
| + size_t* bytes_written) {
|
| + write_lock().AssertAcquired();
|
| +
|
| + DCHECK(!pending_write_buffer_);
|
| +
|
| + if (!schedule_for_later) {
|
| + ssize_t write_result = HANDLE_EINTR(
|
| + write(fd_.get().fd, buffer, bytes_to_write));
|
| +
|
| + if (write_result >= 0) {
|
| + *bytes_written = static_cast<size_t>(write_result);
|
| + return IO_SUCCEEDED;
|
| + } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
| + PLOG(ERROR) << "write of size " << bytes_to_write;
|
| + return IO_FAILED;
|
| + }
|
| + }
|
| +
|
| + pending_write_buffer_ = buffer;
|
| + pending_bytes_to_write_ = bytes_to_write;
|
| +
|
| + // Either |schedule_for_later| is true or |write()| above would block, set up
|
| + // to wait for the FD to become writable.
|
| + // If we're not on the I/O thread, we have to post a task to do this.
|
| + if (base::MessageLoop::current() != message_loop_for_io()) {
|
| + message_loop_for_io()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannelPosix::WaitToWrite,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + return IO_PENDING;
|
| + }
|
| +
|
| + bool result = message_loop_for_io()->WatchFileDescriptor(
|
| + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
|
| + write_watcher_.get(), this);
|
| + return result ? IO_PENDING : IO_FAILED;
|
| +}
|
| +
|
| +bool RawChannelPosix::OnInit() {
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
|
|
| DCHECK(!read_watcher_.get());
|
| @@ -132,9 +182,6 @@ bool RawChannelPosix::Init() {
|
| DCHECK(!write_watcher_.get());
|
| write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
|
|
|
| - // No need to take the lock. No one should be using us yet.
|
| - DCHECK(write_message_queue_.empty());
|
| -
|
| if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
|
| base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
|
| // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
|
| @@ -148,256 +195,88 @@ bool RawChannelPosix::Init() {
|
| return true;
|
| }
|
|
|
| -void RawChannelPosix::Shutdown() {
|
| +void RawChannelPosix::OnShutdownNoLock(
|
| + scoped_ptr<IOBufferPreserver> /* buffer_preserver */) {
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
| -
|
| - base::AutoLock locker(write_lock_);
|
| - if (!write_stopped_)
|
| - CancelPendingWritesNoLock();
|
| + write_lock().AssertAcquired();
|
|
|
| read_watcher_.reset(); // This will stop watching (if necessary).
|
| write_watcher_.reset(); // This will stop watching (if necessary).
|
|
|
| + pending_read_buffer_ = NULL;
|
| + pending_bytes_to_read_ = 0;
|
| +
|
| + pending_write_buffer_ = NULL;
|
| + pending_bytes_to_write_ = 0;
|
| +
|
| DCHECK(fd_.is_valid());
|
| fd_.reset();
|
|
|
| weak_ptr_factory_.InvalidateWeakPtrs();
|
| }
|
|
|
| -// Reminder: This must be thread-safe, and takes ownership of |message|.
|
| -bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| - base::AutoLock locker(write_lock_);
|
| - if (write_stopped_)
|
| - return false;
|
| -
|
| - if (!write_message_queue_.empty()) {
|
| - write_message_queue_.push_back(message.release());
|
| - return true;
|
| - }
|
| -
|
| - write_message_queue_.push_front(message.release());
|
| - DCHECK_EQ(write_message_offset_, 0u);
|
| - bool result = WriteFrontMessageNoLock();
|
| - DCHECK(result || write_message_queue_.empty());
|
| -
|
| - if (!result) {
|
| - // Even if we're on the I/O thread, don't call |OnFatalError()| in the
|
| - // nested context.
|
| - message_loop_for_io()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannelPosix::CallOnFatalError,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - Delegate::FATAL_ERROR_FAILED_WRITE));
|
| - } else if (!write_message_queue_.empty()) {
|
| - // Set up to wait for the FD to become writable. If we're not on the I/O
|
| - // thread, we have to post a task to do this.
|
| - if (base::MessageLoop::current() == message_loop_for_io()) {
|
| - WaitToWrite();
|
| - } else {
|
| - message_loop_for_io()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannelPosix::WaitToWrite,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| - }
|
| -
|
| - return result;
|
| -}
|
| -
|
| void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
|
| DCHECK_EQ(fd, fd_.get().fd);
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
|
|
| - bool did_dispatch_message = false;
|
| - // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| - // Currently, we copy data to ensure that this is zero at the beginning.
|
| - size_t read_buffer_start = 0;
|
| - for (;;) {
|
| - if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
|
| - < kReadSize) {
|
| - // Use power-of-2 buffer sizes.
|
| - // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
|
| - // maximum message size to whatever extent necessary).
|
| - // TODO(vtl): We may often be able to peek at the header and get the real
|
| - // required extra space (which may be much bigger than |kReadSize|).
|
| - size_t new_size = std::max(read_buffer_.size(), kReadSize);
|
| - while (new_size <
|
| - read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
|
| - new_size *= 2;
|
| -
|
| - // TODO(vtl): It's suboptimal to zero out the fresh memory.
|
| - read_buffer_.resize(new_size, 0);
|
| - }
|
| -
|
| - ssize_t bytes_read = HANDLE_EINTR(
|
| - read(fd_.get().fd,
|
| - &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
|
| - kReadSize));
|
| - if (bytes_read < 0) {
|
| - if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
| - PLOG(ERROR) << "read";
|
| -
|
| - // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
|
| - // again.
|
| - read_watcher_.reset();
|
| -
|
| - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
|
| - return;
|
| - }
|
| -
|
| - break;
|
| - }
|
| -
|
| - read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
|
| -
|
| - // Dispatch all the messages that we can.
|
| - size_t message_size;
|
| - // Note that we rely on short-circuit evaluation here:
|
| - // - |read_buffer_start| may be an invalid index into |read_buffer_| if
|
| - // |read_buffer_num_valid_bytes_| is zero.
|
| - // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| - // next read).
|
| - while (read_buffer_num_valid_bytes_ > 0 &&
|
| - MessageInTransit::GetNextMessageSize(
|
| - &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
|
| - &message_size) &&
|
| - read_buffer_num_valid_bytes_ >= message_size) {
|
| - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
|
| - &read_buffer_[read_buffer_start]);
|
| - DCHECK_EQ(message.main_buffer_size(), message_size);
|
| -
|
| - // Dispatch the message.
|
| - delegate()->OnReadMessage(message);
|
| - if (!read_watcher_.get()) {
|
| - // |Shutdown()| was called in |OnReadMessage()|.
|
| - // TODO(vtl): Add test for this case.
|
| - return;
|
| - }
|
| - did_dispatch_message = true;
|
| -
|
| - // Update our state.
|
| - read_buffer_start += message_size;
|
| - read_buffer_num_valid_bytes_ -= message_size;
|
| - }
|
| + if (!pending_read_buffer_)
|
| + return;
|
|
|
| - // If we dispatched any messages, stop reading for now (and let the message
|
| - // loop do its thing for another round).
|
| - // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
|
| - // a single message. Risks: slower, more complex if we want to avoid lots of
|
| - // copying. ii. Keep reading until there's no more data and dispatch all the
|
| - // messages we can. Risks: starvation of other users of the message loop.)
|
| - if (did_dispatch_message)
|
| - break;
|
| + char* temp_buffer = pending_read_buffer_;
|
| + size_t temp_bytes_to_read = pending_bytes_to_read_;
|
| + size_t bytes_read = 0;
|
|
|
| - // If we didn't max out |kReadSize|, stop reading for now.
|
| - if (static_cast<size_t>(bytes_read) < kReadSize)
|
| - break;
|
| -
|
| - // Else try to read some more....
|
| - }
|
| + pending_read_buffer_ = NULL;
|
| + pending_bytes_to_read_ = 0;
|
|
|
| - // Move data back to start.
|
| - if (read_buffer_start > 0) {
|
| - if (read_buffer_num_valid_bytes_ > 0) {
|
| - memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
|
| - read_buffer_num_valid_bytes_);
|
| - }
|
| - read_buffer_start = 0;
|
| - }
|
| + IOResult result = Read(false, temp_buffer, temp_bytes_to_read,
|
| + &bytes_read);
|
| + if (result != IO_PENDING)
|
| + OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
|
| }
|
|
|
| void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
|
| DCHECK_EQ(fd, fd_.get().fd);
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
|
|
| - bool did_fail = false;
|
| + IOResult result = IO_FAILED;
|
| + size_t bytes_written = 0;
|
| {
|
| - base::AutoLock locker(write_lock_);
|
| - DCHECK_EQ(write_stopped_, write_message_queue_.empty());
|
| + base::AutoLock locker(write_lock());
|
|
|
| - if (write_stopped_) {
|
| - write_watcher_.reset();
|
| + if (!pending_write_buffer_)
|
| return;
|
| - }
|
|
|
| - bool result = WriteFrontMessageNoLock();
|
| - DCHECK(result || write_message_queue_.empty());
|
| + const char* temp_write_buffer = pending_write_buffer_;
|
| + size_t temp_bytes_to_write = pending_bytes_to_write_;
|
| + pending_write_buffer_ = NULL;
|
| + pending_bytes_to_write_ = 0;
|
|
|
| - if (!result) {
|
| - did_fail = true;
|
| - write_watcher_.reset();
|
| - } else if (!write_message_queue_.empty()) {
|
| - WaitToWrite();
|
| - }
|
| + result = WriteNoLock(false, temp_write_buffer, temp_bytes_to_write,
|
| + &bytes_written);
|
| }
|
| - if (did_fail)
|
| - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
|
| +
|
| + if (result != IO_PENDING)
|
| + OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
|
| }
|
|
|
| void RawChannelPosix::WaitToWrite() {
|
| DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
|
|
| DCHECK(write_watcher_.get());
|
| - bool result = message_loop_for_io()->WatchFileDescriptor(
|
| - fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
|
| - write_watcher_.get(), this);
|
| - DCHECK(result);
|
| -}
|
|
|
| -void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
|
| - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
|
| - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
|
| - delegate()->OnFatalError(fatal_error);
|
| -}
|
| + if (!message_loop_for_io()->WatchFileDescriptor(
|
| + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
|
| + write_watcher_.get(), this)) {
|
| + {
|
| + base::AutoLock locker(write_lock());
|
|
|
| -bool RawChannelPosix::WriteFrontMessageNoLock() {
|
| - write_lock_.AssertAcquired();
|
| -
|
| - DCHECK(!write_stopped_);
|
| - DCHECK(!write_message_queue_.empty());
|
| -
|
| - MessageInTransit* message = write_message_queue_.front();
|
| - DCHECK_LT(write_message_offset_, message->main_buffer_size());
|
| - size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
|
| - ssize_t bytes_written = HANDLE_EINTR(
|
| - write(fd_.get().fd,
|
| - static_cast<const char*>(message->main_buffer()) +
|
| - write_message_offset_,
|
| - bytes_to_write));
|
| - if (bytes_written < 0) {
|
| - if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
| - PLOG(ERROR) << "write of size " << bytes_to_write;
|
| - CancelPendingWritesNoLock();
|
| - return false;
|
| + DCHECK(pending_write_buffer_);
|
| + pending_write_buffer_ = NULL;
|
| + pending_bytes_to_write_ = 0;
|
| }
|
| -
|
| - // We simply failed to write since we'd block. The logic is the same as if
|
| - // we got a partial write.
|
| - bytes_written = 0;
|
| - }
|
| -
|
| - DCHECK_GE(bytes_written, 0);
|
| - if (static_cast<size_t>(bytes_written) < bytes_to_write) {
|
| - // Partial (or no) write.
|
| - write_message_offset_ += static_cast<size_t>(bytes_written);
|
| - } else {
|
| - // Complete write.
|
| - DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
|
| - write_message_queue_.pop_front();
|
| - delete message;
|
| - write_message_offset_ = 0;
|
| + OnWriteCompleted(false, 0);
|
| }
|
| -
|
| - return true;
|
| -}
|
| -
|
| -void RawChannelPosix::CancelPendingWritesNoLock() {
|
| - write_lock_.AssertAcquired();
|
| - DCHECK(!write_stopped_);
|
| -
|
| - write_stopped_ = true;
|
| - STLDeleteElements(&write_message_queue_);
|
| }
|
|
|
| } // namespace
|
|
|