Index: mojo/system/raw_channel_posix.cc |
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc |
index 433ea4f8066dc03595c61db341e041727417f16c..5c53ec58c16c46221dc2935340a59be3a7b8a0a7 100644 |
--- a/mojo/system/raw_channel_posix.cc |
+++ b/mojo/system/raw_channel_posix.cc |
@@ -5,13 +5,8 @@ |
#include "mojo/system/raw_channel.h" |
#include <errno.h> |
-#include <string.h> |
#include <unistd.h> |
-#include <algorithm> |
-#include <deque> |
-#include <vector> |
- |
#include "base/basictypes.h" |
#include "base/bind.h" |
#include "base/compiler_specific.h" |
@@ -21,18 +16,14 @@ |
#include "base/memory/weak_ptr.h" |
#include "base/message_loop/message_loop.h" |
#include "base/posix/eintr_wrapper.h" |
-#include "base/stl_util.h" |
#include "base/synchronization/lock.h" |
#include "mojo/system/embedder/platform_handle.h" |
-#include "mojo/system/message_in_transit.h" |
namespace mojo { |
namespace system { |
namespace { |
-const size_t kReadSize = 4096; |
- |
class RawChannelPosix : public RawChannel, |
public base::MessageLoopForIO::Watcher { |
public: |
@@ -41,12 +32,20 @@ class RawChannelPosix : public RawChannel, |
base::MessageLoopForIO* message_loop_for_io); |
virtual ~RawChannelPosix(); |
+ private: |
// |RawChannel| implementation: |
- virtual bool Init() OVERRIDE; |
- virtual void Shutdown() OVERRIDE; |
- virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; |
+ virtual IOResult Read(bool schedule_for_later, |
+ char* buffer, |
+ size_t bytes_to_read, |
+ size_t* bytes_read) OVERRIDE; |
+ virtual IOResult WriteNoLock(bool schedule_for_later, |
+ const char* buffer, |
+ size_t bytes_to_write, |
+ size_t* bytes_written) OVERRIDE; |
+ virtual bool OnInit() OVERRIDE; |
+ virtual void OnShutdownNoLock( |
+ scoped_ptr<IOBufferPreserver> buffer_preserver) OVERRIDE; |
- private: |
// |base::MessageLoopForIO::Watcher| implementation: |
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
@@ -54,40 +53,20 @@ class RawChannelPosix : public RawChannel, |
// Watches for |fd_| to become writable. Must be called on the I/O thread. |
void WaitToWrite(); |
- // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O |
- // thread WITHOUT |write_lock_| held. |
- void CallOnFatalError(Delegate::FatalError fatal_error); |
- |
- // Writes the message at the front of |write_message_queue_|, starting at |
- // |write_message_offset_|. It removes and destroys if the write completes and |
- // otherwise updates |write_message_offset_|. Returns true on success. Must be |
- // called under |write_lock_|. |
- bool WriteFrontMessageNoLock(); |
- |
- // Cancels all pending writes and destroys the contents of |
- // |write_message_queue_|. Should only be called if |write_stopped_| is false; |
- // sets |write_stopped_| to true. Must be called under |write_lock_|. |
- void CancelPendingWritesNoLock(); |
- |
embedder::ScopedPlatformHandle fd_; |
- // Only used on the I/O thread: |
+ // The following members are only used on the I/O thread: |
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
- // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| |
- // is always aligned with a message boundary (we will copy memory to ensure |
- // this), but |read_buffer_| may be larger than the actual number of bytes we |
- // have. |
- std::vector<char> read_buffer_; |
- size_t read_buffer_num_valid_bytes_; |
- |
- base::Lock write_lock_; // Protects the following members. |
- bool write_stopped_; |
- // TODO(vtl): When C++11 is available, switch this to a deque of |
- // |scoped_ptr|/|unique_ptr|s. |
- std::deque<MessageInTransit*> write_message_queue_; |
- size_t write_message_offset_; |
+ char* pending_read_buffer_; |
+ size_t pending_bytes_to_read_; |
+ |
+ // The following members are used on multiple threads and protected by |
+ // |write_lock()|: |
+ const char* pending_write_buffer_; |
+ size_t pending_bytes_to_write_; |
+ |
// This is used for posting tasks from write threads to the I/O thread. It |
// must only be accessed under |write_lock_|. The weak pointers it produces |
// are only used/invalidated on the I/O thread. |
@@ -101,20 +80,19 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
base::MessageLoopForIO* message_loop_for_io) |
: RawChannel(delegate, message_loop_for_io), |
fd_(handle.Pass()), |
- read_buffer_num_valid_bytes_(0), |
- write_stopped_(false), |
- write_message_offset_(0), |
+ pending_read_buffer_(NULL), |
+ pending_bytes_to_read_(0), |
+ pending_write_buffer_(NULL), |
+ pending_bytes_to_write_(0), |
weak_ptr_factory_(this) { |
- CHECK_EQ(RawChannel::message_loop_for_io()->type(), |
- base::MessageLoop::TYPE_IO); |
DCHECK(fd_.is_valid()); |
} |
RawChannelPosix::~RawChannelPosix() { |
- DCHECK(write_stopped_); |
- DCHECK(!fd_.is_valid()); |
+ DCHECK(!pending_read_buffer_); |
+ DCHECK(!pending_write_buffer_); |
- // No need to take the |write_lock_| here -- if there are still weak pointers |
+ // No need to take the |write_lock()| here -- if there are still weak pointers |
// outstanding, then we're hosed anyway (since we wouldn't be able to |
// invalidate them cleanly, since we might not be on the I/O thread). |
DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
@@ -124,7 +102,79 @@ RawChannelPosix::~RawChannelPosix() { |
DCHECK(!write_watcher_.get()); |
} |
-bool RawChannelPosix::Init() { |
+RawChannel::IOResult RawChannelPosix::Read(bool schedule_for_later, |
+ char* buffer, |
+ size_t bytes_to_read, |
+ size_t* bytes_read) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
+ DCHECK(!pending_read_buffer_); |
+ |
+ if (!schedule_for_later) { |
+ ssize_t read_result = HANDLE_EINTR( |
+ read(fd_.get().fd, buffer, bytes_to_read)); |
+ |
+ if (read_result >= 0) { |
+ *bytes_read = static_cast<size_t>(read_result); |
+ return IO_SUCCEEDED; |
+ } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
+ PLOG(ERROR) << "read"; |
+ |
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. |
+ read_watcher_.reset(); |
+ |
+ return IO_FAILED; |
+ } |
+ } |
+ |
+ // Either |schedule_for_later| is true or |read()| above would block. |
+ pending_read_buffer_ = buffer; |
+ pending_bytes_to_read_ = bytes_to_read; |
+ |
+ return IO_PENDING; |
+} |
+ |
+RawChannel::IOResult RawChannelPosix::WriteNoLock(bool schedule_for_later, |
+ const char* buffer, |
+ size_t bytes_to_write, |
+ size_t* bytes_written) { |
+ write_lock().AssertAcquired(); |
+ |
+ DCHECK(!pending_write_buffer_); |
+ |
+ if (!schedule_for_later) { |
+ ssize_t write_result = HANDLE_EINTR( |
+ write(fd_.get().fd, buffer, bytes_to_write)); |
+ |
+ if (write_result >= 0) { |
+ *bytes_written = static_cast<size_t>(write_result); |
+ return IO_SUCCEEDED; |
+ } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
+ PLOG(ERROR) << "write of size " << bytes_to_write; |
+ return IO_FAILED; |
+ } |
+ } |
+ |
+ pending_write_buffer_ = buffer; |
+ pending_bytes_to_write_ = bytes_to_write; |
+ |
+ // Either |schedule_for_later| is true or |write()| above would block, set up |
+ // to wait for the FD to become writable. |
+ // If we're not on the I/O thread, we have to post a task to do this. |
+ if (base::MessageLoop::current() != message_loop_for_io()) { |
+ message_loop_for_io()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannelPosix::WaitToWrite, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ return IO_PENDING; |
+ } |
+ |
+ bool result = message_loop_for_io()->WatchFileDescriptor( |
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
+ write_watcher_.get(), this); |
+ return result ? IO_PENDING : IO_FAILED; |
+} |
+ |
+bool RawChannelPosix::OnInit() { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
DCHECK(!read_watcher_.get()); |
@@ -132,9 +182,6 @@ bool RawChannelPosix::Init() { |
DCHECK(!write_watcher_.get()); |
write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
- // No need to take the lock. No one should be using us yet. |
- DCHECK(write_message_queue_.empty()); |
- |
if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
// TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
@@ -148,256 +195,88 @@ bool RawChannelPosix::Init() { |
return true; |
} |
-void RawChannelPosix::Shutdown() { |
+void RawChannelPosix::OnShutdownNoLock( |
+ scoped_ptr<IOBufferPreserver> /* buffer_preserver */) { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- |
- base::AutoLock locker(write_lock_); |
- if (!write_stopped_) |
- CancelPendingWritesNoLock(); |
+ write_lock().AssertAcquired(); |
read_watcher_.reset(); // This will stop watching (if necessary). |
write_watcher_.reset(); // This will stop watching (if necessary). |
+ pending_read_buffer_ = NULL; |
+ pending_bytes_to_read_ = 0; |
+ |
+ pending_write_buffer_ = NULL; |
+ pending_bytes_to_write_ = 0; |
+ |
DCHECK(fd_.is_valid()); |
fd_.reset(); |
weak_ptr_factory_.InvalidateWeakPtrs(); |
} |
-// Reminder: This must be thread-safe, and takes ownership of |message|. |
-bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { |
- base::AutoLock locker(write_lock_); |
- if (write_stopped_) |
- return false; |
- |
- if (!write_message_queue_.empty()) { |
- write_message_queue_.push_back(message.release()); |
- return true; |
- } |
- |
- write_message_queue_.push_front(message.release()); |
- DCHECK_EQ(write_message_offset_, 0u); |
- bool result = WriteFrontMessageNoLock(); |
- DCHECK(result || write_message_queue_.empty()); |
- |
- if (!result) { |
- // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
- // nested context. |
- message_loop_for_io()->PostTask( |
- FROM_HERE, |
- base::Bind(&RawChannelPosix::CallOnFatalError, |
- weak_ptr_factory_.GetWeakPtr(), |
- Delegate::FATAL_ERROR_FAILED_WRITE)); |
- } else if (!write_message_queue_.empty()) { |
- // Set up to wait for the FD to become writable. If we're not on the I/O |
- // thread, we have to post a task to do this. |
- if (base::MessageLoop::current() == message_loop_for_io()) { |
- WaitToWrite(); |
- } else { |
- message_loop_for_io()->PostTask( |
- FROM_HERE, |
- base::Bind(&RawChannelPosix::WaitToWrite, |
- weak_ptr_factory_.GetWeakPtr())); |
- } |
- } |
- |
- return result; |
-} |
- |
void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
DCHECK_EQ(fd, fd_.get().fd); |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- bool did_dispatch_message = false; |
- // Tracks the offset of the first undispatched message in |read_buffer_|. |
- // Currently, we copy data to ensure that this is zero at the beginning. |
- size_t read_buffer_start = 0; |
- for (;;) { |
- if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) |
- < kReadSize) { |
- // Use power-of-2 buffer sizes. |
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
- // maximum message size to whatever extent necessary). |
- // TODO(vtl): We may often be able to peek at the header and get the real |
- // required extra space (which may be much bigger than |kReadSize|). |
- size_t new_size = std::max(read_buffer_.size(), kReadSize); |
- while (new_size < |
- read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) |
- new_size *= 2; |
- |
- // TODO(vtl): It's suboptimal to zero out the fresh memory. |
- read_buffer_.resize(new_size, 0); |
- } |
- |
- ssize_t bytes_read = HANDLE_EINTR( |
- read(fd_.get().fd, |
- &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], |
- kReadSize)); |
- if (bytes_read < 0) { |
- if (errno != EAGAIN && errno != EWOULDBLOCK) { |
- PLOG(ERROR) << "read"; |
- |
- // Make sure that |OnFileCanReadWithoutBlocking()| won't be called |
- // again. |
- read_watcher_.reset(); |
- |
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
- return; |
- } |
- |
- break; |
- } |
- |
- read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); |
- |
- // Dispatch all the messages that we can. |
- size_t message_size; |
- // Note that we rely on short-circuit evaluation here: |
- // - |read_buffer_start| may be an invalid index into |read_buffer_| if |
- // |read_buffer_num_valid_bytes_| is zero. |
- // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
- // next read). |
- while (read_buffer_num_valid_bytes_ > 0 && |
- MessageInTransit::GetNextMessageSize( |
- &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, |
- &message_size) && |
- read_buffer_num_valid_bytes_ >= message_size) { |
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
- &read_buffer_[read_buffer_start]); |
- DCHECK_EQ(message.main_buffer_size(), message_size); |
- |
- // Dispatch the message. |
- delegate()->OnReadMessage(message); |
- if (!read_watcher_.get()) { |
- // |Shutdown()| was called in |OnReadMessage()|. |
- // TODO(vtl): Add test for this case. |
- return; |
- } |
- did_dispatch_message = true; |
- |
- // Update our state. |
- read_buffer_start += message_size; |
- read_buffer_num_valid_bytes_ -= message_size; |
- } |
+ if (!pending_read_buffer_) |
+ return; |
- // If we dispatched any messages, stop reading for now (and let the message |
- // loop do its thing for another round). |
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
- // a single message. Risks: slower, more complex if we want to avoid lots of |
- // copying. ii. Keep reading until there's no more data and dispatch all the |
- // messages we can. Risks: starvation of other users of the message loop.) |
- if (did_dispatch_message) |
- break; |
+ char* temp_buffer = pending_read_buffer_; |
+ size_t temp_bytes_to_read = pending_bytes_to_read_; |
+ size_t bytes_read = 0; |
- // If we didn't max out |kReadSize|, stop reading for now. |
- if (static_cast<size_t>(bytes_read) < kReadSize) |
- break; |
- |
- // Else try to read some more.... |
- } |
+ pending_read_buffer_ = NULL; |
+ pending_bytes_to_read_ = 0; |
- // Move data back to start. |
- if (read_buffer_start > 0) { |
- if (read_buffer_num_valid_bytes_ > 0) { |
- memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
- read_buffer_num_valid_bytes_); |
- } |
- read_buffer_start = 0; |
- } |
+ IOResult result = Read(false, temp_buffer, temp_bytes_to_read, |
+ &bytes_read); |
+ if (result != IO_PENDING) |
+ OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
} |
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
DCHECK_EQ(fd, fd_.get().fd); |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- bool did_fail = false; |
+ IOResult result = IO_FAILED; |
+ size_t bytes_written = 0; |
{ |
- base::AutoLock locker(write_lock_); |
- DCHECK_EQ(write_stopped_, write_message_queue_.empty()); |
+ base::AutoLock locker(write_lock()); |
- if (write_stopped_) { |
- write_watcher_.reset(); |
+ if (!pending_write_buffer_) |
return; |
- } |
- bool result = WriteFrontMessageNoLock(); |
- DCHECK(result || write_message_queue_.empty()); |
+ const char* temp_write_buffer = pending_write_buffer_; |
+ size_t temp_bytes_to_write = pending_bytes_to_write_; |
+ pending_write_buffer_ = NULL; |
+ pending_bytes_to_write_ = 0; |
- if (!result) { |
- did_fail = true; |
- write_watcher_.reset(); |
- } else if (!write_message_queue_.empty()) { |
- WaitToWrite(); |
- } |
+ result = WriteNoLock(false, temp_write_buffer, temp_bytes_to_write, |
+ &bytes_written); |
} |
- if (did_fail) |
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
+ |
+ if (result != IO_PENDING) |
+ OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
} |
void RawChannelPosix::WaitToWrite() { |
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
DCHECK(write_watcher_.get()); |
- bool result = message_loop_for_io()->WatchFileDescriptor( |
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
- write_watcher_.get(), this); |
- DCHECK(result); |
-} |
-void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { |
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
- delegate()->OnFatalError(fatal_error); |
-} |
+ if (!message_loop_for_io()->WatchFileDescriptor( |
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
+ write_watcher_.get(), this)) { |
+ { |
+ base::AutoLock locker(write_lock()); |
-bool RawChannelPosix::WriteFrontMessageNoLock() { |
- write_lock_.AssertAcquired(); |
- |
- DCHECK(!write_stopped_); |
- DCHECK(!write_message_queue_.empty()); |
- |
- MessageInTransit* message = write_message_queue_.front(); |
- DCHECK_LT(write_message_offset_, message->main_buffer_size()); |
- size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; |
- ssize_t bytes_written = HANDLE_EINTR( |
- write(fd_.get().fd, |
- static_cast<const char*>(message->main_buffer()) + |
- write_message_offset_, |
- bytes_to_write)); |
- if (bytes_written < 0) { |
- if (errno != EAGAIN && errno != EWOULDBLOCK) { |
- PLOG(ERROR) << "write of size " << bytes_to_write; |
- CancelPendingWritesNoLock(); |
- return false; |
+ DCHECK(pending_write_buffer_); |
+ pending_write_buffer_ = NULL; |
+ pending_bytes_to_write_ = 0; |
} |
- |
- // We simply failed to write since we'd block. The logic is the same as if |
- // we got a partial write. |
- bytes_written = 0; |
- } |
- |
- DCHECK_GE(bytes_written, 0); |
- if (static_cast<size_t>(bytes_written) < bytes_to_write) { |
- // Partial (or no) write. |
- write_message_offset_ += static_cast<size_t>(bytes_written); |
- } else { |
- // Complete write. |
- DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); |
- write_message_queue_.pop_front(); |
- delete message; |
- write_message_offset_ = 0; |
+ OnWriteCompleted(false, 0); |
} |
- |
- return true; |
-} |
- |
-void RawChannelPosix::CancelPendingWritesNoLock() { |
- write_lock_.AssertAcquired(); |
- DCHECK(!write_stopped_); |
- |
- write_stopped_ = true; |
- STLDeleteElements(&write_message_queue_); |
} |
} // namespace |