OLD | NEW |
(Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/system/raw_channel.h" |
| 6 |
| 7 #include <string.h> |
| 8 |
| 9 #include <algorithm> |
| 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/location.h" |
| 13 #include "base/logging.h" |
| 14 #include "base/message_loop/message_loop.h" |
| 15 #include "base/stl_util.h" |
| 16 #include "mojo/system/message_in_transit.h" |
| 17 |
| 18 namespace mojo { |
| 19 namespace system { |
| 20 |
namespace {

// Size of each chunk requested from the platform per read; also the initial
// size of |read_buffer_| (see |RawChannel::RawChannel()|).
const size_t kReadSize = 4096;

}  // namespace
| 26 |
| 27 RawChannel::IOBufferPreserver::IOBufferPreserver( |
| 28 scoped_ptr<std::vector<char> > read_buffer, |
| 29 scoped_ptr<MessageInTransit> write_buffer) |
| 30 : read_buffer_(read_buffer.Pass()), |
| 31 write_buffer_(write_buffer.Pass()) { |
| 32 } |
| 33 |
// Frees the preserved read buffer and in-flight write message (if any).
RawChannel::IOBufferPreserver::~IOBufferPreserver() {
}
| 36 |
// |delegate| and |message_loop_for_io| are not owned and must outlive this
// object. No I/O is started until |Init()| is called.
RawChannel::RawChannel(Delegate* delegate,
                       base::MessageLoopForIO* message_loop_for_io)
    : delegate_(delegate),
      read_stopped_(false),
      read_buffer_(kReadSize),  // Room for exactly one read to start with.
      read_buffer_num_valid_bytes_(0),
      message_loop_for_io_(message_loop_for_io),
      write_stopped_(false),
      write_message_offset_(0),
      weak_ptr_factory_(this) {
}
| 48 |
RawChannel::~RawChannel() {
  // |Shutdown()| must already have run: it is what empties the write queue.
  DCHECK(write_message_queue_.empty());

  // No need to take the |write_lock_| here -- if there are still weak pointers
  // outstanding, then we're hosed anyway (since we wouldn't be able to
  // invalidate them cleanly, since we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}
| 57 |
| 58 bool RawChannel::Init() { |
| 59 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 60 |
| 61 // No need to take the lock. No one should be using us yet. |
| 62 DCHECK(write_message_queue_.empty()); |
| 63 DCHECK_EQ(kReadSize, read_buffer_.size()); |
| 64 DCHECK_EQ(0u, read_buffer_num_valid_bytes_); |
| 65 |
| 66 if (!OnInit()) |
| 67 return false; |
| 68 |
| 69 IOResult result = Read(true, &read_buffer_[0], kReadSize, NULL); |
| 70 return result == IO_PENDING; |
| 71 } |
| 72 |
// Stops channel I/O. Must be called on the I/O thread. Buffers that a still-
// pending platform operation might reference are moved into an
// |IOBufferPreserver| and handed to |OnShutdownNoLock()|, which is expected
// to keep them alive until that operation completes.
void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  // Cancel anything bound to us via weak pointer (e.g., the posted
  // |CallOnFatalError()| task in |WriteMessage()|).
  weak_ptr_factory_.InvalidateWeakPtrs();

  // Detach the read buffer so a pending read can't touch freed memory.
  scoped_ptr<std::vector<char> > preserved_read_buffer;
  if (!read_stopped_) {
    read_stopped_ = true;
    preserved_read_buffer.reset(new std::vector<char>());
    preserved_read_buffer->swap(read_buffer_);
  }

  // The front message may be mid-write, so preserve it; the rest of the queue
  // is not referenced by any pending I/O and can simply be deleted.
  scoped_ptr<MessageInTransit> preserved_write_buffer;
  if (!write_stopped_) {
    write_stopped_ = true;
    if (!write_message_queue_.empty()) {
      preserved_write_buffer.reset(write_message_queue_.front());
      write_message_queue_.pop_front();
    }
    STLDeleteElements(&write_message_queue_);
  } else {
    DCHECK(write_message_queue_.empty());
  }

  scoped_ptr<IOBufferPreserver> preserver(new IOBufferPreserver(
      preserved_read_buffer.Pass(), preserved_write_buffer.Pass()));

  OnShutdownNoLock(preserver.Pass());
}
| 104 |
// Reminder: This must be thread-safe, and takes ownership of |message|.
// Returns true if the message was written or queued, false if writing has
// stopped or the immediate write attempt failed.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
  base::AutoLock locker(write_lock_);
  if (write_stopped_)
    return false;

  // A nonempty queue means a write is already in flight; just enqueue and let
  // the completion path (|OnWriteCompletedNoLock()|) pick this message up.
  if (!write_message_queue_.empty()) {
    write_message_queue_.push_back(message.release());
    return true;
  }

  // Queue was empty: this message becomes the one being written.
  write_message_queue_.push_front(message.release());
  DCHECK_EQ(write_message_offset_, 0u);

  MessageInTransit* front_message = write_message_queue_.front();
  size_t bytes_written = 0;
  IOResult io_result = WriteNoLock(
      false,
      static_cast<const char*>(front_message->main_buffer()),
      front_message->main_buffer_size(),
      &bytes_written);

  if (io_result == IO_PENDING)
    return true;

  // The write completed (or failed) synchronously; advance the queue now.
  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
                                       bytes_written);
  if (!result) {
    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
    // nested context.
    message_loop_for_io_->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::CallOnFatalError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::FATAL_ERROR_FAILED_WRITE));
  }

  return result;
}
| 144 |
| 145 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
| 146 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 147 |
| 148 if (read_stopped_) { |
| 149 NOTREACHED(); |
| 150 return; |
| 151 } |
| 152 |
| 153 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
| 154 for (;;) { |
| 155 if (io_result != IO_SUCCEEDED) { |
| 156 read_stopped_ = true; |
| 157 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
| 158 return; |
| 159 } |
| 160 |
| 161 read_buffer_num_valid_bytes_ += bytes_read; |
| 162 |
| 163 // Dispatch all the messages that we can. |
| 164 bool did_dispatch_message = false; |
| 165 // Tracks the offset of the first undispatched message in |read_buffer_|. |
| 166 // Currently, we copy data to ensure that this is zero at the beginning. |
| 167 size_t read_buffer_start = 0; |
| 168 size_t message_size; |
| 169 // Note that we rely on short-circuit evaluation here: |
| 170 // - |read_buffer_start| may be an invalid index into |read_buffer_| if |
| 171 // |read_buffer_num_valid_bytes_| is zero. |
| 172 // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
| 173 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| 174 // next read). |
| 175 while (read_buffer_num_valid_bytes_ > 0 && |
| 176 MessageInTransit::GetNextMessageSize( |
| 177 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, |
| 178 &message_size) && |
| 179 read_buffer_num_valid_bytes_ >= message_size) { |
| 180 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
| 181 &read_buffer_[read_buffer_start]); |
| 182 DCHECK_EQ(message.main_buffer_size(), message_size); |
| 183 |
| 184 // Dispatch the message. |
| 185 delegate_->OnReadMessage(message); |
| 186 if (read_stopped_) { |
| 187 // |Shutdown()| was called in |OnReadMessage()|. |
| 188 // TODO(vtl): Add test for this case. |
| 189 return; |
| 190 } |
| 191 did_dispatch_message = true; |
| 192 |
| 193 // Update our state. |
| 194 read_buffer_start += message_size; |
| 195 read_buffer_num_valid_bytes_ -= message_size; |
| 196 } |
| 197 |
| 198 // (1) If we dispatched any messages, stop reading for now (and let the |
| 199 // message loop do its thing for another round). |
| 200 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
| 201 // a single message. Risks: slower, more complex if we want to avoid lots of |
| 202 // copying. ii. Keep reading until there's no more data and dispatch all the |
| 203 // messages we can. Risks: starvation of other users of the message loop.) |
| 204 // (2) If we didn't max out |kReadSize|, stop reading for now. |
| 205 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
| 206 |
| 207 if (read_buffer_start > 0) { |
| 208 // Move data back to start. |
| 209 if (read_buffer_num_valid_bytes_ > 0) { |
| 210 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
| 211 read_buffer_num_valid_bytes_); |
| 212 } |
| 213 read_buffer_start = 0; |
| 214 } |
| 215 |
| 216 if (read_buffer_.size() - read_buffer_num_valid_bytes_ < kReadSize) { |
| 217 // Use power-of-2 buffer sizes. |
| 218 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
| 219 // maximum message size to whatever extent necessary). |
| 220 // TODO(vtl): We may often be able to peek at the header and get the real |
| 221 // required extra space (which may be much bigger than |kReadSize|). |
| 222 size_t new_size = std::max(read_buffer_.size(), kReadSize); |
| 223 while (new_size < read_buffer_num_valid_bytes_ + kReadSize) |
| 224 new_size *= 2; |
| 225 |
| 226 // TODO(vtl): It's suboptimal to zero out the fresh memory. |
| 227 read_buffer_.resize(new_size, 0); |
| 228 } |
| 229 |
| 230 io_result = Read(schedule_for_later, |
| 231 &read_buffer_[read_buffer_num_valid_bytes_], |
| 232 kReadSize, |
| 233 &bytes_read); |
| 234 if (io_result == IO_PENDING) |
| 235 return; |
| 236 } |
| 237 } |
| 238 |
| 239 void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { |
| 240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 241 |
| 242 bool did_fail = false; |
| 243 { |
| 244 base::AutoLock locker(write_lock_); |
| 245 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); |
| 246 |
| 247 if (write_stopped_) { |
| 248 NOTREACHED(); |
| 249 return; |
| 250 } |
| 251 |
| 252 did_fail = !OnWriteCompletedNoLock(result, bytes_written); |
| 253 } |
| 254 |
| 255 if (did_fail) |
| 256 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
| 257 } |
| 258 |
// Notifies the delegate of a fatal error. Must be called on the I/O thread
// and (per the TODO below) without |write_lock_| held, since the delegate may
// call back into us.
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  delegate_->OnFatalError(fatal_error);
}
| 264 |
// Advances the write queue after a write of |bytes_written| bytes completed
// with success |result|. Returns true if all is well (queue drained, or the
// next write is pending); returns false on failure, in which case writing is
// permanently stopped and the queue is cleared. |write_lock_| must be held.
bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_stopped_);
  DCHECK(!write_message_queue_.empty());

  if (result) {
    MessageInTransit* message = write_message_queue_.front();
    DCHECK_LT(write_message_offset_, message->main_buffer_size());
    size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;

    if (bytes_written < bytes_to_write) {
      // Partial (or no) write.
      write_message_offset_ += bytes_written;
    } else {
      // Complete write.
      DCHECK_EQ(bytes_written, bytes_to_write);
      write_message_queue_.pop_front();
      delete message;
      write_message_offset_ = 0;
    }

    if (write_message_queue_.empty())
      return true;

    // Schedule the next write.
    message = write_message_queue_.front();
    bytes_to_write = message->main_buffer_size() - write_message_offset_;

    if (WriteNoLock(true,
                    static_cast<const char*>(message->main_buffer()) +
                        write_message_offset_,
                    bytes_to_write,
                    NULL) == IO_PENDING) {
      return true;
    }
  }

  // Either the completed write failed or scheduling the next one did: stop
  // writing and free all queued messages.
  write_stopped_ = true;
  STLDeleteElements(&write_message_queue_);
  return false;
}
| 307 |
| 308 } // namespace system |
| 309 } // namespace mojo |
OLD | NEW |