Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <errno.h> | 7 #include <errno.h> |
| 8 #include <string.h> | |
| 9 #include <unistd.h> | 8 #include <unistd.h> |
| 10 | 9 |
| 11 #include <algorithm> | |
| 12 #include <deque> | |
| 13 #include <vector> | |
| 14 | |
| 15 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
| 16 #include "base/bind.h" | 11 #include "base/bind.h" |
| 17 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
| 18 #include "base/location.h" | 13 #include "base/location.h" |
| 19 #include "base/logging.h" | 14 #include "base/logging.h" |
| 20 #include "base/memory/scoped_ptr.h" | 15 #include "base/memory/scoped_ptr.h" |
| 21 #include "base/memory/weak_ptr.h" | 16 #include "base/memory/weak_ptr.h" |
| 22 #include "base/message_loop/message_loop.h" | 17 #include "base/message_loop/message_loop.h" |
| 23 #include "base/posix/eintr_wrapper.h" | 18 #include "base/posix/eintr_wrapper.h" |
| 24 #include "base/stl_util.h" | |
| 25 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
| 26 #include "mojo/system/embedder/platform_handle.h" | 20 #include "mojo/system/embedder/platform_handle.h" |
| 27 #include "mojo/system/message_in_transit.h" | |
| 28 | 21 |
| 29 namespace mojo { | 22 namespace mojo { |
| 30 namespace system { | 23 namespace system { |
| 31 | 24 |
| 32 namespace { | 25 namespace { |
| 33 | 26 |
| 34 const size_t kReadSize = 4096; | |
| 35 | |
| 36 class RawChannelPosix : public RawChannel, | 27 class RawChannelPosix : public RawChannel, |
| 37 public base::MessageLoopForIO::Watcher { | 28 public base::MessageLoopForIO::Watcher { |
| 38 public: | 29 public: |
| 39 RawChannelPosix(embedder::ScopedPlatformHandle handle, | 30 RawChannelPosix(embedder::ScopedPlatformHandle handle, |
| 40 Delegate* delegate, | 31 Delegate* delegate, |
| 41 base::MessageLoopForIO* message_loop_for_io); | 32 base::MessageLoopForIO* message_loop_for_io); |
| 42 virtual ~RawChannelPosix(); | 33 virtual ~RawChannelPosix(); |
| 43 | 34 |
| 35 private: | |
| 44 // |RawChannel| implementation: | 36 // |RawChannel| implementation: |
| 45 virtual bool Init() OVERRIDE; | 37 virtual IOResult Read(size_t* bytes_read) OVERRIDE; |
| 46 virtual void Shutdown() OVERRIDE; | 38 virtual IOResult ScheduleRead() OVERRIDE; |
| 47 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; | 39 virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; |
| 40 virtual IOResult ScheduleWriteNoLock() OVERRIDE; | |
| 41 virtual bool OnInit() OVERRIDE; | |
| 42 virtual void OnShutdownNoLock( | |
| 43 scoped_ptr<ReadBuffer> read_buffer, | |
| 44 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; | |
| 48 | 45 |
| 49 private: | |
| 50 // |base::MessageLoopForIO::Watcher| implementation: | 46 // |base::MessageLoopForIO::Watcher| implementation: |
| 51 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | 47 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
| 52 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | 48 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
| 53 | 49 |
| 54 // Watches for |fd_| to become writable. Must be called on the I/O thread. | 50 // Watches for |fd_| to become writable. Must be called on the I/O thread. |
| 55 void WaitToWrite(); | 51 void WaitToWrite(); |
| 56 | 52 |
| 57 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O | |
| 58 // thread WITHOUT |write_lock_| held. | |
| 59 void CallOnFatalError(Delegate::FatalError fatal_error); | |
| 60 | |
| 61 // Writes the message at the front of |write_message_queue_|, starting at | |
| 62 // |write_message_offset_|. It removes and destroys if the write completes and | |
| 63 // otherwise updates |write_message_offset_|. Returns true on success. Must be | |
| 64 // called under |write_lock_|. | |
| 65 bool WriteFrontMessageNoLock(); | |
| 66 | |
| 67 // Cancels all pending writes and destroys the contents of | |
| 68 // |write_message_queue_|. Should only be called if |write_stopped_| is false; | |
| 69 // sets |write_stopped_| to true. Must be called under |write_lock_|. | |
| 70 void CancelPendingWritesNoLock(); | |
| 71 | |
| 72 embedder::ScopedPlatformHandle fd_; | 53 embedder::ScopedPlatformHandle fd_; |
| 73 | 54 |
| 74 // Only used on the I/O thread: | 55 // The following members are only used on the I/O thread: |
| 75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 56 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
| 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 57 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
| 77 | 58 |
| 78 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| | 59 bool pending_read_; |
| 79 // is always aligned with a message boundary (we will copy memory to ensure | |
| 80 // this), but |read_buffer_| may be larger than the actual number of bytes we | |
| 81 // have. | |
| 82 std::vector<char> read_buffer_; | |
| 83 size_t read_buffer_num_valid_bytes_; | |
| 84 | 60 |
| 85 base::Lock write_lock_; // Protects the following members. | 61 // The following members are used on multiple threads and protected by |
| 86 bool write_stopped_; | 62 // |write_lock()|: |
| 87 // TODO(vtl): When C++11 is available, switch this to a deque of | 63 bool pending_write_; |
| 88 // |scoped_ptr|/|unique_ptr|s. | 64 |
| 89 std::deque<MessageInTransit*> write_message_queue_; | |
| 90 size_t write_message_offset_; | |
| 91 // This is used for posting tasks from write threads to the I/O thread. It | 65 // This is used for posting tasks from write threads to the I/O thread. It |
| 92 // must only be accessed under |write_lock_|. The weak pointers it produces | 66 // must only be accessed under |write_lock_|. The weak pointers it produces |
| 93 // are only used/invalidated on the I/O thread. | 67 // are only used/invalidated on the I/O thread. |
| 94 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 68 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; |
| 95 | 69 |
| 96 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 70 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
| 97 }; | 71 }; |
| 98 | 72 |
| 99 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, | 73 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
| 100 Delegate* delegate, | 74 Delegate* delegate, |
| 101 base::MessageLoopForIO* message_loop_for_io) | 75 base::MessageLoopForIO* message_loop_for_io) |
| 102 : RawChannel(delegate, message_loop_for_io), | 76 : RawChannel(delegate, message_loop_for_io), |
| 103 fd_(handle.Pass()), | 77 fd_(handle.Pass()), |
| 104 read_buffer_num_valid_bytes_(0), | 78 pending_read_(false), |
| 105 write_stopped_(false), | 79 pending_write_(false), |
| 106 write_message_offset_(0), | |
| 107 weak_ptr_factory_(this) { | 80 weak_ptr_factory_(this) { |
| 108 CHECK_EQ(RawChannel::message_loop_for_io()->type(), | |
| 109 base::MessageLoop::TYPE_IO); | |
| 110 DCHECK(fd_.is_valid()); | 81 DCHECK(fd_.is_valid()); |
| 111 } | 82 } |
| 112 | 83 |
| 113 RawChannelPosix::~RawChannelPosix() { | 84 RawChannelPosix::~RawChannelPosix() { |
| 114 DCHECK(write_stopped_); | 85 DCHECK(!pending_read_); |
| 115 DCHECK(!fd_.is_valid()); | 86 DCHECK(!pending_write_); |
| 116 | 87 |
| 117 // No need to take the |write_lock_| here -- if there are still weak pointers | 88 // No need to take the |write_lock()| here -- if there are still weak pointers |
| 118 // outstanding, then we're hosed anyway (since we wouldn't be able to | 89 // outstanding, then we're hosed anyway (since we wouldn't be able to |
| 119 // invalidate them cleanly, since we might not be on the I/O thread). | 90 // invalidate them cleanly, since we might not be on the I/O thread). |
| 120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 91 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| 121 | 92 |
| 122 // These must have been shut down/destroyed on the I/O thread. | 93 // These must have been shut down/destroyed on the I/O thread. |
| 123 DCHECK(!read_watcher_.get()); | 94 DCHECK(!read_watcher_.get()); |
| 124 DCHECK(!write_watcher_.get()); | 95 DCHECK(!write_watcher_.get()); |
| 125 } | 96 } |
| 126 | 97 |
| 127 bool RawChannelPosix::Init() { | 98 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { |
| 99 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
| 100 DCHECK(!pending_read_); | |
| 101 | |
| 102 ssize_t read_result = HANDLE_EINTR( | |
| 103 read(fd_.get().fd, read_buffer()->GetPosition(), | |
| 104 read_buffer()->GetBytesToRead())); | |
| 105 | |
| 106 if (read_result >= 0) { | |
| 107 *bytes_read = static_cast<size_t>(read_result); | |
| 108 return IO_SUCCEEDED; | |
| 109 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 110 PLOG(ERROR) << "read"; | |
| 111 | |
| 112 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. | |
| 113 read_watcher_.reset(); | |
| 114 | |
| 115 return IO_FAILED; | |
| 116 } | |
| 117 | |
| 118 return ScheduleRead(); | |
| 119 } | |
| 120 | |
| 121 RawChannel::IOResult RawChannelPosix::ScheduleRead() { | |
| 122 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
| 123 DCHECK(!pending_read_); | |
| 124 | |
| 125 pending_read_ = true; | |
| 126 | |
| 127 return IO_PENDING; | |
| 128 } | |
| 129 | |
| 130 RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { | |
| 131 write_lock().AssertAcquired(); | |
| 132 | |
| 133 DCHECK(!pending_write_); | |
| 134 | |
| 135 ssize_t write_result = HANDLE_EINTR( | |
| 136 write(fd_.get().fd, write_buffer()->GetPosition(), | |
| 137 write_buffer()->GetBytesToWrite())); | |
| 138 | |
| 139 if (write_result >= 0) { | |
| 140 *bytes_written = static_cast<size_t>(write_result); | |
| 141 return IO_SUCCEEDED; | |
| 142 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 143 PLOG(ERROR) << "write of size " << write_buffer()->GetBytesToWrite(); | |
| 144 return IO_FAILED; | |
| 145 } | |
| 146 | |
| 147 return ScheduleWriteNoLock(); | |
| 148 } | |
| 149 | |
| 150 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { | |
| 151 write_lock().AssertAcquired(); | |
| 152 | |
| 153 DCHECK(!pending_write_); | |
| 154 | |
| 155 // Set up to wait for the FD to become writable. | |
| 156 // If we're not on the I/O thread, we have to post a task to do this. | |
| 157 if (base::MessageLoop::current() != message_loop_for_io()) { | |
| 158 message_loop_for_io()->PostTask( | |
| 159 FROM_HERE, | |
| 160 base::Bind(&RawChannelPosix::WaitToWrite, | |
| 161 weak_ptr_factory_.GetWeakPtr())); | |
| 162 pending_write_ = true; | |
| 163 return IO_PENDING; | |
| 164 } | |
| 165 | |
| 166 if (message_loop_for_io()->WatchFileDescriptor( | |
| 167 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
| 168 write_watcher_.get(), this)) { | |
| 169 pending_write_ = true; | |
| 170 return IO_PENDING; | |
| 171 } | |
| 172 | |
| 173 return IO_FAILED; | |
| 174 } | |
| 175 | |
| 176 bool RawChannelPosix::OnInit() { | |
| 128 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 177 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 129 | 178 |
| 130 DCHECK(!read_watcher_.get()); | 179 DCHECK(!read_watcher_.get()); |
| 131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 180 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| 132 DCHECK(!write_watcher_.get()); | 181 DCHECK(!write_watcher_.get()); |
| 133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 182 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
| 134 | 183 |
| 135 // No need to take the lock. No one should be using us yet. | |
| 136 DCHECK(write_message_queue_.empty()); | |
| 137 | |
| 138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, | 184 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
| 139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { | 185 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
| 140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly | 186 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
| 141 // (in the sense of returning the message loop's state to what it was before | 187 // (in the sense of returning the message loop's state to what it was before |
| 142 // it was called). | 188 // it was called). |
| 143 read_watcher_.reset(); | 189 read_watcher_.reset(); |
| 144 write_watcher_.reset(); | 190 write_watcher_.reset(); |
| 145 return false; | 191 return false; |
| 146 } | 192 } |
| 147 | 193 |
| 148 return true; | 194 return true; |
| 149 } | 195 } |
| 150 | 196 |
| 151 void RawChannelPosix::Shutdown() { | 197 void RawChannelPosix::OnShutdownNoLock( |
| 198 scoped_ptr<ReadBuffer> /* read_buffer */, | |
|
viettrungluu
2014/02/26 23:03:39
nit: In this directory, I've mostly left out space
yzshen1
2014/02/27 02:00:30
Done.
| |
| 199 scoped_ptr<WriteBuffer> /* write_buffer */) { | |
| 152 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 153 | 201 write_lock().AssertAcquired(); |
| 154 base::AutoLock locker(write_lock_); | |
| 155 if (!write_stopped_) | |
| 156 CancelPendingWritesNoLock(); | |
| 157 | 202 |
| 158 read_watcher_.reset(); // This will stop watching (if necessary). | 203 read_watcher_.reset(); // This will stop watching (if necessary). |
| 159 write_watcher_.reset(); // This will stop watching (if necessary). | 204 write_watcher_.reset(); // This will stop watching (if necessary). |
| 160 | 205 |
| 206 pending_read_ = false; | |
| 207 pending_write_ = false; | |
| 208 | |
| 161 DCHECK(fd_.is_valid()); | 209 DCHECK(fd_.is_valid()); |
| 162 fd_.reset(); | 210 fd_.reset(); |
| 163 | 211 |
| 164 weak_ptr_factory_.InvalidateWeakPtrs(); | 212 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 165 } | 213 } |
| 166 | 214 |
| 167 // Reminder: This must be thread-safe, and takes ownership of |message|. | |
| 168 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
| 169 base::AutoLock locker(write_lock_); | |
| 170 if (write_stopped_) | |
| 171 return false; | |
| 172 | |
| 173 if (!write_message_queue_.empty()) { | |
| 174 write_message_queue_.push_back(message.release()); | |
| 175 return true; | |
| 176 } | |
| 177 | |
| 178 write_message_queue_.push_front(message.release()); | |
| 179 DCHECK_EQ(write_message_offset_, 0u); | |
| 180 bool result = WriteFrontMessageNoLock(); | |
| 181 DCHECK(result || write_message_queue_.empty()); | |
| 182 | |
| 183 if (!result) { | |
| 184 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | |
| 185 // nested context. | |
| 186 message_loop_for_io()->PostTask( | |
| 187 FROM_HERE, | |
| 188 base::Bind(&RawChannelPosix::CallOnFatalError, | |
| 189 weak_ptr_factory_.GetWeakPtr(), | |
| 190 Delegate::FATAL_ERROR_FAILED_WRITE)); | |
| 191 } else if (!write_message_queue_.empty()) { | |
| 192 // Set up to wait for the FD to become writable. If we're not on the I/O | |
| 193 // thread, we have to post a task to do this. | |
| 194 if (base::MessageLoop::current() == message_loop_for_io()) { | |
| 195 WaitToWrite(); | |
| 196 } else { | |
| 197 message_loop_for_io()->PostTask( | |
| 198 FROM_HERE, | |
| 199 base::Bind(&RawChannelPosix::WaitToWrite, | |
| 200 weak_ptr_factory_.GetWeakPtr())); | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 return result; | |
| 205 } | |
| 206 | |
| 207 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { | 215 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
| 208 DCHECK_EQ(fd, fd_.get().fd); | 216 DCHECK_EQ(fd, fd_.get().fd); |
| 209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 217 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 210 | 218 |
| 211 bool did_dispatch_message = false; | 219 if (!pending_read_) |
| 212 // Tracks the offset of the first undispatched message in |read_buffer_|. | 220 return; |
|
viettrungluu
2014/02/26 23:03:39
You should probably DCHECK that read_waiter_ is nu
yzshen1
2014/02/27 02:00:30
Thanks! :)
| |
| 213 // Currently, we copy data to ensure that this is zero at the beginning. | |
| 214 size_t read_buffer_start = 0; | |
| 215 for (;;) { | |
| 216 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) | |
| 217 < kReadSize) { | |
| 218 // Use power-of-2 buffer sizes. | |
| 219 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
| 220 // maximum message size to whatever extent necessary). | |
| 221 // TODO(vtl): We may often be able to peek at the header and get the real | |
| 222 // required extra space (which may be much bigger than |kReadSize|). | |
| 223 size_t new_size = std::max(read_buffer_.size(), kReadSize); | |
| 224 while (new_size < | |
| 225 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) | |
| 226 new_size *= 2; | |
| 227 | 221 |
| 228 // TODO(vtl): It's suboptimal to zero out the fresh memory. | 222 pending_read_ = false; |
| 229 read_buffer_.resize(new_size, 0); | 223 size_t bytes_read = 0; |
| 230 } | 224 IOResult result = Read(&bytes_read); |
| 231 | 225 if (result != IO_PENDING) |
| 232 ssize_t bytes_read = HANDLE_EINTR( | 226 OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
| 233 read(fd_.get().fd, | |
| 234 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], | |
| 235 kReadSize)); | |
| 236 if (bytes_read < 0) { | |
| 237 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 238 PLOG(ERROR) << "read"; | |
| 239 | |
| 240 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called | |
| 241 // again. | |
| 242 read_watcher_.reset(); | |
| 243 | |
| 244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | |
| 245 return; | |
| 246 } | |
| 247 | |
| 248 break; | |
| 249 } | |
| 250 | |
| 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | |
| 252 | |
| 253 // Dispatch all the messages that we can. | |
| 254 size_t message_size; | |
| 255 // Note that we rely on short-circuit evaluation here: | |
| 256 // - |read_buffer_start| may be an invalid index into |read_buffer_| if | |
| 257 // |read_buffer_num_valid_bytes_| is zero. | |
| 258 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 259 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 260 // next read). | |
| 261 while (read_buffer_num_valid_bytes_ > 0 && | |
| 262 MessageInTransit::GetNextMessageSize( | |
| 263 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, | |
| 264 &message_size) && | |
| 265 read_buffer_num_valid_bytes_ >= message_size) { | |
| 266 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, | |
| 267 &read_buffer_[read_buffer_start]); | |
| 268 DCHECK_EQ(message.main_buffer_size(), message_size); | |
| 269 | |
| 270 // Dispatch the message. | |
| 271 delegate()->OnReadMessage(message); | |
| 272 if (!read_watcher_.get()) { | |
| 273 // |Shutdown()| was called in |OnReadMessage()|. | |
| 274 // TODO(vtl): Add test for this case. | |
| 275 return; | |
| 276 } | |
| 277 did_dispatch_message = true; | |
| 278 | |
| 279 // Update our state. | |
| 280 read_buffer_start += message_size; | |
| 281 read_buffer_num_valid_bytes_ -= message_size; | |
| 282 } | |
| 283 | |
| 284 // If we dispatched any messages, stop reading for now (and let the message | |
| 285 // loop do its thing for another round). | |
| 286 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
| 287 // a single message. Risks: slower, more complex if we want to avoid lots of | |
| 288 // copying. ii. Keep reading until there's no more data and dispatch all the | |
| 289 // messages we can. Risks: starvation of other users of the message loop.) | |
| 290 if (did_dispatch_message) | |
| 291 break; | |
| 292 | |
| 293 // If we didn't max out |kReadSize|, stop reading for now. | |
| 294 if (static_cast<size_t>(bytes_read) < kReadSize) | |
| 295 break; | |
| 296 | |
| 297 // Else try to read some more.... | |
| 298 } | |
| 299 | |
| 300 // Move data back to start. | |
| 301 if (read_buffer_start > 0) { | |
| 302 if (read_buffer_num_valid_bytes_ > 0) { | |
| 303 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], | |
| 304 read_buffer_num_valid_bytes_); | |
| 305 } | |
| 306 read_buffer_start = 0; | |
| 307 } | |
| 308 } | 227 } |
| 309 | 228 |
| 310 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 229 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
| 311 DCHECK_EQ(fd, fd_.get().fd); | 230 DCHECK_EQ(fd, fd_.get().fd); |
| 312 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 231 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 313 | 232 |
| 314 bool did_fail = false; | 233 IOResult result = IO_FAILED; |
| 234 size_t bytes_written = 0; | |
| 315 { | 235 { |
| 316 base::AutoLock locker(write_lock_); | 236 base::AutoLock locker(write_lock()); |
| 317 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); | |
| 318 | 237 |
| 319 if (write_stopped_) { | 238 if (!pending_write_) |
| 320 write_watcher_.reset(); | |
| 321 return; | 239 return; |
|
viettrungluu
2014/02/26 23:03:39
"
(though the situation for pending_write_ isn't
yzshen1
2014/02/27 02:00:30
Right. I made it a DCHECK().
| |
| 322 } | |
| 323 | 240 |
| 324 bool result = WriteFrontMessageNoLock(); | 241 pending_write_ = false; |
| 325 DCHECK(result || write_message_queue_.empty()); | 242 result = WriteNoLock(&bytes_written); |
| 243 } | |
| 326 | 244 |
| 327 if (!result) { | 245 if (result != IO_PENDING) |
| 328 did_fail = true; | 246 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
| 329 write_watcher_.reset(); | |
| 330 } else if (!write_message_queue_.empty()) { | |
| 331 WaitToWrite(); | |
| 332 } | |
| 333 } | |
| 334 if (did_fail) | |
| 335 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | |
| 336 } | 247 } |
| 337 | 248 |
| 338 void RawChannelPosix::WaitToWrite() { | 249 void RawChannelPosix::WaitToWrite() { |
| 339 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 250 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 340 | 251 |
| 341 DCHECK(write_watcher_.get()); | 252 DCHECK(write_watcher_.get()); |
| 342 bool result = message_loop_for_io()->WatchFileDescriptor( | |
| 343 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
| 344 write_watcher_.get(), this); | |
| 345 DCHECK(result); | |
| 346 } | |
| 347 | 253 |
| 348 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { | 254 if (!message_loop_for_io()->WatchFileDescriptor( |
| 349 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 255 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| 350 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 256 write_watcher_.get(), this)) { |
| 351 delegate()->OnFatalError(fatal_error); | 257 { |
| 352 } | 258 base::AutoLock locker(write_lock()); |
| 353 | 259 |
| 354 bool RawChannelPosix::WriteFrontMessageNoLock() { | 260 DCHECK(pending_write_); |
| 355 write_lock_.AssertAcquired(); | 261 pending_write_ = false; |
| 356 | |
| 357 DCHECK(!write_stopped_); | |
| 358 DCHECK(!write_message_queue_.empty()); | |
| 359 | |
| 360 MessageInTransit* message = write_message_queue_.front(); | |
| 361 DCHECK_LT(write_message_offset_, message->main_buffer_size()); | |
| 362 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; | |
| 363 ssize_t bytes_written = HANDLE_EINTR( | |
| 364 write(fd_.get().fd, | |
| 365 static_cast<const char*>(message->main_buffer()) + | |
| 366 write_message_offset_, | |
| 367 bytes_to_write)); | |
| 368 if (bytes_written < 0) { | |
| 369 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
| 370 PLOG(ERROR) << "write of size " << bytes_to_write; | |
| 371 CancelPendingWritesNoLock(); | |
| 372 return false; | |
| 373 } | 262 } |
| 374 | 263 OnWriteCompleted(false, 0); |
| 375 // We simply failed to write since we'd block. The logic is the same as if | |
| 376 // we got a partial write. | |
| 377 bytes_written = 0; | |
| 378 } | 264 } |
| 379 | |
| 380 DCHECK_GE(bytes_written, 0); | |
| 381 if (static_cast<size_t>(bytes_written) < bytes_to_write) { | |
| 382 // Partial (or no) write. | |
| 383 write_message_offset_ += static_cast<size_t>(bytes_written); | |
| 384 } else { | |
| 385 // Complete write. | |
| 386 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); | |
| 387 write_message_queue_.pop_front(); | |
| 388 delete message; | |
| 389 write_message_offset_ = 0; | |
| 390 } | |
| 391 | |
| 392 return true; | |
| 393 } | |
| 394 | |
| 395 void RawChannelPosix::CancelPendingWritesNoLock() { | |
| 396 write_lock_.AssertAcquired(); | |
| 397 DCHECK(!write_stopped_); | |
| 398 | |
| 399 write_stopped_ = true; | |
| 400 STLDeleteElements(&write_message_queue_); | |
| 401 } | 265 } |
| 402 | 266 |
| 403 } // namespace | 267 } // namespace |
| 404 | 268 |
| 405 // ----------------------------------------------------------------------------- | 269 // ----------------------------------------------------------------------------- |
| 406 | 270 |
| 407 // Static factory method declared in raw_channel.h. | 271 // Static factory method declared in raw_channel.h. |
| 408 // static | 272 // static |
| 409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 273 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
| 410 Delegate* delegate, | 274 Delegate* delegate, |
| 411 base::MessageLoopForIO* message_loop_for_io) { | 275 base::MessageLoopForIO* message_loop_for_io) { |
| 412 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); | 276 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); |
| 413 } | 277 } |
| 414 | 278 |
| 415 } // namespace system | 279 } // namespace system |
| 416 } // namespace mojo | 280 } // namespace mojo |
| OLD | NEW |