| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <errno.h> | 7 #include <errno.h> |
| 8 #include <string.h> | |
| 9 #include <unistd.h> | 8 #include <unistd.h> |
| 10 | 9 |
| 11 #include <algorithm> | |
| 12 #include <deque> | |
| 13 #include <vector> | |
| 14 | |
| 15 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
| 16 #include "base/bind.h" | 11 #include "base/bind.h" |
| 17 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
| 18 #include "base/location.h" | 13 #include "base/location.h" |
| 19 #include "base/logging.h" | 14 #include "base/logging.h" |
| 20 #include "base/memory/scoped_ptr.h" | 15 #include "base/memory/scoped_ptr.h" |
| 21 #include "base/memory/weak_ptr.h" | 16 #include "base/memory/weak_ptr.h" |
| 22 #include "base/message_loop/message_loop.h" | 17 #include "base/message_loop/message_loop.h" |
| 23 #include "base/posix/eintr_wrapper.h" | 18 #include "base/posix/eintr_wrapper.h" |
| 24 #include "base/stl_util.h" | |
| 25 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
| 26 #include "mojo/system/embedder/platform_handle.h" | 20 #include "mojo/system/embedder/platform_handle.h" |
| 27 #include "mojo/system/message_in_transit.h" | |
| 28 | 21 |
| 29 namespace mojo { | 22 namespace mojo { |
| 30 namespace system { | 23 namespace system { |
| 31 | 24 |
| 32 namespace { | 25 namespace { |
| 33 | 26 |
| 34 const size_t kReadSize = 4096; | |
| 35 | |
| 36 class RawChannelPosix : public RawChannel, | 27 class RawChannelPosix : public RawChannel, |
| 37 public base::MessageLoopForIO::Watcher { | 28 public base::MessageLoopForIO::Watcher { |
| 38 public: | 29 public: |
| 39 RawChannelPosix(embedder::ScopedPlatformHandle handle, | 30 RawChannelPosix(embedder::ScopedPlatformHandle handle, |
| 40 Delegate* delegate, | 31 Delegate* delegate, |
| 41 base::MessageLoopForIO* message_loop_for_io); | 32 base::MessageLoopForIO* message_loop_for_io); |
| 42 virtual ~RawChannelPosix(); | 33 virtual ~RawChannelPosix(); |
| 43 | 34 |
| 35 private: |
| 44 // |RawChannel| implementation: | 36 // |RawChannel| implementation: |
| 45 virtual bool Init() OVERRIDE; | 37 virtual IOResult Read(bool schedule_for_later, |
| 46 virtual void Shutdown() OVERRIDE; | 38 char* buffer, |
| 47 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; | 39 size_t bytes_to_read, |
| 40 size_t* bytes_read) OVERRIDE; |
| 41 virtual IOResult WriteNoLock(bool schedule_for_later, |
| 42 const char* buffer, |
| 43 size_t bytes_to_write, |
| 44 size_t* bytes_written) OVERRIDE; |
| 45 virtual bool OnInit() OVERRIDE; |
| 46 virtual void OnShutdownNoLock( |
| 47 scoped_ptr<IOBufferPreserver> buffer_preserver) OVERRIDE; |
| 48 | 48 |
| 49 private: | |
| 50 // |base::MessageLoopForIO::Watcher| implementation: | 49 // |base::MessageLoopForIO::Watcher| implementation: |
| 51 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
| 52 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
| 53 | 52 |
| 54 // Watches for |fd_| to become writable. Must be called on the I/O thread. | 53 // Watches for |fd_| to become writable. Must be called on the I/O thread. |
| 55 void WaitToWrite(); | 54 void WaitToWrite(); |
| 56 | 55 |
| 57 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O | |
| 58 // thread WITHOUT |write_lock_| held. | |
| 59 void CallOnFatalError(Delegate::FatalError fatal_error); | |
| 60 | |
| 61 // Writes the message at the front of |write_message_queue_|, starting at | |
| 62 // |write_message_offset_|. It removes and destroys if the write completes and | |
| 63 // otherwise updates |write_message_offset_|. Returns true on success. Must be | |
| 64 // called under |write_lock_|. | |
| 65 bool WriteFrontMessageNoLock(); | |
| 66 | |
| 67 // Cancels all pending writes and destroys the contents of | |
| 68 // |write_message_queue_|. Should only be called if |write_stopped_| is false; | |
| 69 // sets |write_stopped_| to true. Must be called under |write_lock_|. | |
| 70 void CancelPendingWritesNoLock(); | |
| 71 | |
| 72 embedder::ScopedPlatformHandle fd_; | 56 embedder::ScopedPlatformHandle fd_; |
| 73 | 57 |
| 74 // Only used on the I/O thread: | 58 // The following members are only used on the I/O thread: |
| 75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 59 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
| 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 60 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
| 77 | 61 |
| 78 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| | 62 char* pending_read_buffer_; |
| 79 // is always aligned with a message boundary (we will copy memory to ensure | 63 size_t pending_bytes_to_read_; |
| 80 // this), but |read_buffer_| may be larger than the actual number of bytes we | |
| 81 // have. | |
| 82 std::vector<char> read_buffer_; | |
| 83 size_t read_buffer_num_valid_bytes_; | |
| 84 | 64 |
| 85 base::Lock write_lock_; // Protects the following members. | 65 // The following members are used on multiple threads and protected by |
| 86 bool write_stopped_; | 66 // |write_lock()|: |
| 87 // TODO(vtl): When C++11 is available, switch this to a deque of | 67 const char* pending_write_buffer_; |
| 88 // |scoped_ptr|/|unique_ptr|s. | 68 size_t pending_bytes_to_write_; |
| 89 std::deque<MessageInTransit*> write_message_queue_; | 69 |
| 90 size_t write_message_offset_; | |
| 91 // This is used for posting tasks from write threads to the I/O thread. It | 70 // This is used for posting tasks from write threads to the I/O thread. It |
| 92 // must only be accessed under |write_lock_|. The weak pointers it produces | 71 // must only be accessed under |write_lock_|. The weak pointers it produces |
| 93 // are only used/invalidated on the I/O thread. | 72 // are only used/invalidated on the I/O thread. |
| 94 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 73 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; |
| 95 | 74 |
| 96 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 75 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
| 97 }; | 76 }; |
| 98 | 77 |
| 99 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, | 78 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
| 100 Delegate* delegate, | 79 Delegate* delegate, |
| 101 base::MessageLoopForIO* message_loop_for_io) | 80 base::MessageLoopForIO* message_loop_for_io) |
| 102 : RawChannel(delegate, message_loop_for_io), | 81 : RawChannel(delegate, message_loop_for_io), |
| 103 fd_(handle.Pass()), | 82 fd_(handle.Pass()), |
| 104 read_buffer_num_valid_bytes_(0), | 83 pending_read_buffer_(NULL), |
| 105 write_stopped_(false), | 84 pending_bytes_to_read_(0), |
| 106 write_message_offset_(0), | 85 pending_write_buffer_(NULL), |
| 86 pending_bytes_to_write_(0), |
| 107 weak_ptr_factory_(this) { | 87 weak_ptr_factory_(this) { |
| 108 CHECK_EQ(RawChannel::message_loop_for_io()->type(), | |
| 109 base::MessageLoop::TYPE_IO); | |
| 110 DCHECK(fd_.is_valid()); | 88 DCHECK(fd_.is_valid()); |
| 111 } | 89 } |
| 112 | 90 |
| 113 RawChannelPosix::~RawChannelPosix() { | 91 RawChannelPosix::~RawChannelPosix() { |
| 114 DCHECK(write_stopped_); | 92 DCHECK(!pending_read_buffer_); |
| 115 DCHECK(!fd_.is_valid()); | 93 DCHECK(!pending_write_buffer_); |
| 116 | 94 |
| 117 // No need to take the |write_lock_| here -- if there are still weak pointers | 95 // No need to take the |write_lock()| here -- if there are still weak pointers |
| 118 // outstanding, then we're hosed anyway (since we wouldn't be able to | 96 // outstanding, then we're hosed anyway (since we wouldn't be able to |
| 119 // invalidate them cleanly, since we might not be on the I/O thread). | 97 // invalidate them cleanly, since we might not be on the I/O thread). |
| 120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 98 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| 121 | 99 |
| 122 // These must have been shut down/destroyed on the I/O thread. | 100 // These must have been shut down/destroyed on the I/O thread. |
| 123 DCHECK(!read_watcher_.get()); | 101 DCHECK(!read_watcher_.get()); |
| 124 DCHECK(!write_watcher_.get()); | 102 DCHECK(!write_watcher_.get()); |
| 125 } | 103 } |
| 126 | 104 |
| 127 bool RawChannelPosix::Init() { | 105 RawChannel::IOResult RawChannelPosix::Read(bool schedule_for_later, |
| 106 char* buffer, |
| 107 size_t bytes_to_read, |
| 108 size_t* bytes_read) { |
| 109 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 110 DCHECK(!pending_read_buffer_); |
| 111 |
| 112 if (!schedule_for_later) { |
| 113 ssize_t read_result = HANDLE_EINTR( |
| 114 read(fd_.get().fd, buffer, bytes_to_read)); |
| 115 |
| 116 if (read_result >= 0) { |
| 117 *bytes_read = static_cast<size_t>(read_result); |
| 118 return IO_SUCCEEDED; |
| 119 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| 120 PLOG(ERROR) << "read"; |
| 121 |
| 122 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. |
| 123 read_watcher_.reset(); |
| 124 |
| 125 return IO_FAILED; |
| 126 } |
| 127 } |
| 128 |
| 129 // Either |schedule_for_later| is true or |read()| above would block. |
| 130 pending_read_buffer_ = buffer; |
| 131 pending_bytes_to_read_ = bytes_to_read; |
| 132 |
| 133 return IO_PENDING; |
| 134 } |
| 135 |
| 136 RawChannel::IOResult RawChannelPosix::WriteNoLock(bool schedule_for_later, |
| 137 const char* buffer, |
| 138 size_t bytes_to_write, |
| 139 size_t* bytes_written) { |
| 140 write_lock().AssertAcquired(); |
| 141 |
| 142 DCHECK(!pending_write_buffer_); |
| 143 |
| 144 if (!schedule_for_later) { |
| 145 ssize_t write_result = HANDLE_EINTR( |
| 146 write(fd_.get().fd, buffer, bytes_to_write)); |
| 147 |
| 148 if (write_result >= 0) { |
| 149 *bytes_written = static_cast<size_t>(write_result); |
| 150 return IO_SUCCEEDED; |
| 151 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| 152 PLOG(ERROR) << "write of size " << bytes_to_write; |
| 153 return IO_FAILED; |
| 154 } |
| 155 } |
| 156 |
| 157 pending_write_buffer_ = buffer; |
| 158 pending_bytes_to_write_ = bytes_to_write; |
| 159 |
| 160 // Either |schedule_for_later| is true or |write()| above would block, set up |
| 161 // to wait for the FD to become writable. |
| 162 // If we're not on the I/O thread, we have to post a task to do this. |
| 163 if (base::MessageLoop::current() != message_loop_for_io()) { |
| 164 message_loop_for_io()->PostTask( |
| 165 FROM_HERE, |
| 166 base::Bind(&RawChannelPosix::WaitToWrite, |
| 167 weak_ptr_factory_.GetWeakPtr())); |
| 168 return IO_PENDING; |
| 169 } |
| 170 |
| 171 bool result = message_loop_for_io()->WatchFileDescriptor( |
| 172 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| 173 write_watcher_.get(), this); |
| 174 return result ? IO_PENDING : IO_FAILED; |
| 175 } |
| 176 |
| 177 bool RawChannelPosix::OnInit() { |
| 128 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 178 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 129 | 179 |
| 130 DCHECK(!read_watcher_.get()); | 180 DCHECK(!read_watcher_.get()); |
| 131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 181 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| 132 DCHECK(!write_watcher_.get()); | 182 DCHECK(!write_watcher_.get()); |
| 133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 183 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| 134 | 184 |
| 135 // No need to take the lock. No one should be using us yet. | |
| 136 DCHECK(write_message_queue_.empty()); | |
| 137 | |
| 138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, | 185 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
| 139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { | 186 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
| 140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly | 187 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
| 141 // (in the sense of returning the message loop's state to what it was before | 188 // (in the sense of returning the message loop's state to what it was before |
| 142 // it was called). | 189 // it was called). |
| 143 read_watcher_.reset(); | 190 read_watcher_.reset(); |
| 144 write_watcher_.reset(); | 191 write_watcher_.reset(); |
| 145 return false; | 192 return false; |
| 146 } | 193 } |
| 147 | 194 |
| 148 return true; | 195 return true; |
| 149 } | 196 } |
| 150 | 197 |
| 151 void RawChannelPosix::Shutdown() { | 198 void RawChannelPosix::OnShutdownNoLock( |
| 199 scoped_ptr<IOBufferPreserver> /* buffer_preserver */) { |
| 152 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 153 | 201 write_lock().AssertAcquired(); |
| 154 base::AutoLock locker(write_lock_); | |
| 155 if (!write_stopped_) | |
| 156 CancelPendingWritesNoLock(); | |
| 157 | 202 |
| 158 read_watcher_.reset(); // This will stop watching (if necessary). | 203 read_watcher_.reset(); // This will stop watching (if necessary). |
| 159 write_watcher_.reset(); // This will stop watching (if necessary). | 204 write_watcher_.reset(); // This will stop watching (if necessary). |
| 160 | 205 |
| 206 pending_read_buffer_ = NULL; |
| 207 pending_bytes_to_read_ = 0; |
| 208 |
| 209 pending_write_buffer_ = NULL; |
| 210 pending_bytes_to_write_ = 0; |
| 211 |
| 161 DCHECK(fd_.is_valid()); | 212 DCHECK(fd_.is_valid()); |
| 162 fd_.reset(); | 213 fd_.reset(); |
| 163 | 214 |
| 164 weak_ptr_factory_.InvalidateWeakPtrs(); | 215 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 165 } | 216 } |
| 166 | 217 |
| 167 // Reminder: This must be thread-safe, and takes ownership of |message|. | |
| 168 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
| 169 base::AutoLock locker(write_lock_); | |
| 170 if (write_stopped_) | |
| 171 return false; | |
| 172 | |
| 173 if (!write_message_queue_.empty()) { | |
| 174 write_message_queue_.push_back(message.release()); | |
| 175 return true; | |
| 176 } | |
| 177 | |
| 178 write_message_queue_.push_front(message.release()); | |
| 179 DCHECK_EQ(write_message_offset_, 0u); | |
| 180 bool result = WriteFrontMessageNoLock(); | |
| 181 DCHECK(result || write_message_queue_.empty()); | |
| 182 | |
| 183 if (!result) { | |
| 184 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | |
| 185 // nested context. | |
| 186 message_loop_for_io()->PostTask( | |
| 187 FROM_HERE, | |
| 188 base::Bind(&RawChannelPosix::CallOnFatalError, | |
| 189 weak_ptr_factory_.GetWeakPtr(), | |
| 190 Delegate::FATAL_ERROR_FAILED_WRITE)); | |
| 191 } else if (!write_message_queue_.empty()) { | |
| 192 // Set up to wait for the FD to become writable. If we're not on the I/O | |
| 193 // thread, we have to post a task to do this. | |
| 194 if (base::MessageLoop::current() == message_loop_for_io()) { | |
| 195 WaitToWrite(); | |
| 196 } else { | |
| 197 message_loop_for_io()->PostTask( | |
| 198 FROM_HERE, | |
| 199 base::Bind(&RawChannelPosix::WaitToWrite, | |
| 200 weak_ptr_factory_.GetWeakPtr())); | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 return result; | |
| 205 } | |
| 206 | |
| 207 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { | 218 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
| 208 DCHECK_EQ(fd, fd_.get().fd); | 219 DCHECK_EQ(fd, fd_.get().fd); |
| 209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 220 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 210 | 221 |
| 211 bool did_dispatch_message = false; | 222 if (!pending_read_buffer_) |
| 212 // Tracks the offset of the first undispatched message in |read_buffer_|. | 223 return; |
| 213 // Currently, we copy data to ensure that this is zero at the beginning. | |
| 214 size_t read_buffer_start = 0; | |
| 215 for (;;) { | |
| 216 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) | |
| 217 < kReadSize) { | |
| 218 // Use power-of-2 buffer sizes. | |
| 219 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
| 220 // maximum message size to whatever extent necessary). | |
| 221 // TODO(vtl): We may often be able to peek at the header and get the real | |
| 222 // required extra space (which may be much bigger than |kReadSize|). | |
| 223 size_t new_size = std::max(read_buffer_.size(), kReadSize); | |
| 224 while (new_size < | |
| 225 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) | |
| 226 new_size *= 2; | |
| 227 | 224 |
| 228 // TODO(vtl): It's suboptimal to zero out the fresh memory. | 225 char* temp_buffer = pending_read_buffer_; |
| 229 read_buffer_.resize(new_size, 0); | 226 size_t temp_bytes_to_read = pending_bytes_to_read_; |
| 230 } | 227 size_t bytes_read = 0; |
| 231 | 228 |
| 232 ssize_t bytes_read = HANDLE_EINTR( | 229 pending_read_buffer_ = NULL; |
| 233 read(fd_.get().fd, | 230 pending_bytes_to_read_ = 0; |
| 234 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], | |
| 235 kReadSize)); | |
| 236 if (bytes_read < 0) { | |
| 237 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 238 PLOG(ERROR) << "read"; | |
| 239 | 231 |
| 240 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called | 232 IOResult result = Read(false, temp_buffer, temp_bytes_to_read, |
| 241 // again. | 233 &bytes_read); |
| 242 read_watcher_.reset(); | 234 if (result != IO_PENDING) |
| 243 | 235 OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
| 244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | |
| 245 return; | |
| 246 } | |
| 247 | |
| 248 break; | |
| 249 } | |
| 250 | |
| 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | |
| 252 | |
| 253 // Dispatch all the messages that we can. | |
| 254 size_t message_size; | |
| 255 // Note that we rely on short-circuit evaluation here: | |
| 256 // - |read_buffer_start| may be an invalid index into |read_buffer_| if | |
| 257 // |read_buffer_num_valid_bytes_| is zero. | |
| 258 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 259 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 260 // next read). | |
| 261 while (read_buffer_num_valid_bytes_ > 0 && | |
| 262 MessageInTransit::GetNextMessageSize( | |
| 263 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, | |
| 264 &message_size) && | |
| 265 read_buffer_num_valid_bytes_ >= message_size) { | |
| 266 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, | |
| 267 &read_buffer_[read_buffer_start]); | |
| 268 DCHECK_EQ(message.main_buffer_size(), message_size); | |
| 269 | |
| 270 // Dispatch the message. | |
| 271 delegate()->OnReadMessage(message); | |
| 272 if (!read_watcher_.get()) { | |
| 273 // |Shutdown()| was called in |OnReadMessage()|. | |
| 274 // TODO(vtl): Add test for this case. | |
| 275 return; | |
| 276 } | |
| 277 did_dispatch_message = true; | |
| 278 | |
| 279 // Update our state. | |
| 280 read_buffer_start += message_size; | |
| 281 read_buffer_num_valid_bytes_ -= message_size; | |
| 282 } | |
| 283 | |
| 284 // If we dispatched any messages, stop reading for now (and let the message | |
| 285 // loop do its thing for another round). | |
| 286 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
| 287 // a single message. Risks: slower, more complex if we want to avoid lots of | |
| 288 // copying. ii. Keep reading until there's no more data and dispatch all the | |
| 289 // messages we can. Risks: starvation of other users of the message loop.) | |
| 290 if (did_dispatch_message) | |
| 291 break; | |
| 292 | |
| 293 // If we didn't max out |kReadSize|, stop reading for now. | |
| 294 if (static_cast<size_t>(bytes_read) < kReadSize) | |
| 295 break; | |
| 296 | |
| 297 // Else try to read some more.... | |
| 298 } | |
| 299 | |
| 300 // Move data back to start. | |
| 301 if (read_buffer_start > 0) { | |
| 302 if (read_buffer_num_valid_bytes_ > 0) { | |
| 303 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], | |
| 304 read_buffer_num_valid_bytes_); | |
| 305 } | |
| 306 read_buffer_start = 0; | |
| 307 } | |
| 308 } | 236 } |
| 309 | 237 |
| 310 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 238 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
| 311 DCHECK_EQ(fd, fd_.get().fd); | 239 DCHECK_EQ(fd, fd_.get().fd); |
| 312 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 313 | 241 |
| 314 bool did_fail = false; | 242 IOResult result = IO_FAILED; |
| 243 size_t bytes_written = 0; |
| 315 { | 244 { |
| 316 base::AutoLock locker(write_lock_); | 245 base::AutoLock locker(write_lock()); |
| 317 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); | |
| 318 | 246 |
| 319 if (write_stopped_) { | 247 if (!pending_write_buffer_) |
| 320 write_watcher_.reset(); | |
| 321 return; | 248 return; |
| 322 } | |
| 323 | 249 |
| 324 bool result = WriteFrontMessageNoLock(); | 250 const char* temp_write_buffer = pending_write_buffer_; |
| 325 DCHECK(result || write_message_queue_.empty()); | 251 size_t temp_bytes_to_write = pending_bytes_to_write_; |
| 252 pending_write_buffer_ = NULL; |
| 253 pending_bytes_to_write_ = 0; |
| 326 | 254 |
| 327 if (!result) { | 255 result = WriteNoLock(false, temp_write_buffer, temp_bytes_to_write, |
| 328 did_fail = true; | 256 &bytes_written); |
| 329 write_watcher_.reset(); | |
| 330 } else if (!write_message_queue_.empty()) { | |
| 331 WaitToWrite(); | |
| 332 } | |
| 333 } | 257 } |
| 334 if (did_fail) | 258 |
| 335 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | 259 if (result != IO_PENDING) |
| 260 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
| 336 } | 261 } |
| 337 | 262 |
| 338 void RawChannelPosix::WaitToWrite() { | 263 void RawChannelPosix::WaitToWrite() { |
| 339 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 264 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 340 | 265 |
| 341 DCHECK(write_watcher_.get()); | 266 DCHECK(write_watcher_.get()); |
| 342 bool result = message_loop_for_io()->WatchFileDescriptor( | |
| 343 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
| 344 write_watcher_.get(), this); | |
| 345 DCHECK(result); | |
| 346 } | |
| 347 | 267 |
| 348 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { | 268 if (!message_loop_for_io()->WatchFileDescriptor( |
| 349 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 269 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| 350 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 270 write_watcher_.get(), this)) { |
| 351 delegate()->OnFatalError(fatal_error); | 271 { |
| 352 } | 272 base::AutoLock locker(write_lock()); |
| 353 | 273 |
| 354 bool RawChannelPosix::WriteFrontMessageNoLock() { | 274 DCHECK(pending_write_buffer_); |
| 355 write_lock_.AssertAcquired(); | 275 pending_write_buffer_ = NULL; |
| 356 | 276 pending_bytes_to_write_ = 0; |
| 357 DCHECK(!write_stopped_); | |
| 358 DCHECK(!write_message_queue_.empty()); | |
| 359 | |
| 360 MessageInTransit* message = write_message_queue_.front(); | |
| 361 DCHECK_LT(write_message_offset_, message->main_buffer_size()); | |
| 362 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; | |
| 363 ssize_t bytes_written = HANDLE_EINTR( | |
| 364 write(fd_.get().fd, | |
| 365 static_cast<const char*>(message->main_buffer()) + | |
| 366 write_message_offset_, | |
| 367 bytes_to_write)); | |
| 368 if (bytes_written < 0) { | |
| 369 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 370 PLOG(ERROR) << "write of size " << bytes_to_write; | |
| 371 CancelPendingWritesNoLock(); | |
| 372 return false; | |
| 373 } | 277 } |
| 374 | 278 OnWriteCompleted(false, 0); |
| 375 // We simply failed to write since we'd block. The logic is the same as if | |
| 376 // we got a partial write. | |
| 377 bytes_written = 0; | |
| 378 } | 279 } |
| 379 | |
| 380 DCHECK_GE(bytes_written, 0); | |
| 381 if (static_cast<size_t>(bytes_written) < bytes_to_write) { | |
| 382 // Partial (or no) write. | |
| 383 write_message_offset_ += static_cast<size_t>(bytes_written); | |
| 384 } else { | |
| 385 // Complete write. | |
| 386 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); | |
| 387 write_message_queue_.pop_front(); | |
| 388 delete message; | |
| 389 write_message_offset_ = 0; | |
| 390 } | |
| 391 | |
| 392 return true; | |
| 393 } | |
| 394 | |
| 395 void RawChannelPosix::CancelPendingWritesNoLock() { | |
| 396 write_lock_.AssertAcquired(); | |
| 397 DCHECK(!write_stopped_); | |
| 398 | |
| 399 write_stopped_ = true; | |
| 400 STLDeleteElements(&write_message_queue_); | |
| 401 } | 280 } |
| 402 | 281 |
| 403 } // namespace | 282 } // namespace |
| 404 | 283 |
| 405 // ----------------------------------------------------------------------------- | 284 // ----------------------------------------------------------------------------- |
| 406 | 285 |
| 407 // Static factory method declared in raw_channel.h. | 286 // Static factory method declared in raw_channel.h. |
| 408 // static | 287 // static |
| 409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 288 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
| 410 Delegate* delegate, | 289 Delegate* delegate, |
| 411 base::MessageLoopForIO* message_loop_for_io) { | 290 base::MessageLoopForIO* message_loop_for_io) { |
| 412 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); | 291 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); |
| 413 } | 292 } |
| 414 | 293 |
| 415 } // namespace system | 294 } // namespace system |
| 416 } // namespace mojo | 295 } // namespace mojo |
| OLD | NEW |