Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/system/raw_channel.h" | |
| 6 | |
| 7 #include <string.h> | |
| 8 | |
| 9 #include <algorithm> | |
| 10 | |
| 11 #include "base/bind.h" | |
| 12 #include "base/location.h" | |
| 13 #include "base/logging.h" | |
| 14 #include "base/message_loop/message_loop.h" | |
| 15 #include "base/stl_util.h" | |
| 16 #include "mojo/system/message_in_transit.h" | |
| 17 | |
| 18 namespace mojo { | |
| 19 namespace system { | |
| 20 | |
| 21 namespace { | |
|
viettrungluu
2014/02/26 23:03:39
Note that this anonymous namespace is redundant. (
yzshen1
2014/02/27 02:00:30
Done.
| |
| 22 | |
| 23 const size_t kReadSize = 4096; | |
| 24 | |
| 25 } // namespace | |
| 26 | |
| 27 | |
| 28 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | |
| 29 } | |
| 30 | |
| 31 RawChannel::ReadBuffer::~ReadBuffer() {} | |
| 32 | |
| 33 char* RawChannel::ReadBuffer::GetPosition() { | |
| 34 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | |
| 35 return &buffer_[0] + num_valid_bytes_; | |
| 36 } | |
| 37 | |
| 38 size_t RawChannel::ReadBuffer::GetBytesToRead() const { | |
| 39 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | |
| 40 return kReadSize; | |
| 41 } | |
| 42 | |
| 43 RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} | |
| 44 | |
| 45 RawChannel::WriteBuffer::~WriteBuffer() { | |
| 46 STLDeleteElements(&message_queue_); | |
| 47 } | |
| 48 | |
| 49 const char* RawChannel::WriteBuffer::GetPosition() const { | |
| 50 if (message_queue_.empty()) | |
| 51 return NULL; | |
| 52 | |
| 53 DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); | |
| 54 return static_cast<const char*>(message_queue_.front()->main_buffer()) + | |
| 55 offset_; | |
| 56 } | |
| 57 | |
| 58 size_t RawChannel::WriteBuffer::GetBytesToWrite() const { | |
| 59 if (message_queue_.empty()) | |
| 60 return 0; | |
| 61 | |
| 62 DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); | |
| 63 return message_queue_.front()->main_buffer_size() - offset_; | |
| 64 } | |
| 65 | |
| 66 RawChannel::RawChannel(Delegate* delegate, | |
| 67 base::MessageLoopForIO* message_loop_for_io) | |
| 68 : delegate_(delegate), | |
| 69 read_stopped_(false), | |
| 70 message_loop_for_io_(message_loop_for_io), | |
| 71 write_stopped_(false), | |
| 72 weak_ptr_factory_(this) { | |
| 73 } | |
| 74 | |
| 75 RawChannel::~RawChannel() { | |
| 76 DCHECK(!read_buffer_); | |
| 77 DCHECK(!write_buffer_); | |
| 78 | |
| 79 // No need to take the |write_lock_| here -- if there are still weak pointers | |
| 80 // outstanding, then we're hosed anyway (since we wouldn't be able to | |
| 81 // invalidate them cleanly, since we might not be on the I/O thread). | |
| 82 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | |
| 83 } | |
| 84 | |
| 85 bool RawChannel::Init() { | |
| 86 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 87 | |
| 88 // No need to take the lock. No one should be using us yet. | |
| 89 DCHECK(!read_buffer_); | |
| 90 read_buffer_.reset(new ReadBuffer); | |
| 91 DCHECK(!write_buffer_); | |
| 92 write_buffer_.reset(new WriteBuffer); | |
| 93 | |
| 94 if (!OnInit()) | |
| 95 return false; | |
| 96 | |
| 97 IOResult result = ScheduleRead(); | |
| 98 return result == IO_PENDING; | |
|
viettrungluu
2014/02/26 23:03:39
nit: I think you may as well write this as |return
yzshen1
2014/02/27 02:00:30
Done.
| |
| 99 } | |
| 100 | |
| 101 void RawChannel::Shutdown() { | |
| 102 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 103 | |
| 104 base::AutoLock locker(write_lock_); | |
| 105 | |
| 106 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 107 | |
| 108 read_stopped_ = true; | |
| 109 write_stopped_ = true; | |
| 110 | |
| 111 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | |
| 112 } | |
| 113 | |
| 114 // Reminder: This must be thread-safe, and takes ownership of |message|. | |
|
viettrungluu
2014/02/26 23:03:39
You can get rid of the ", and takes ownership ..."
yzshen1
2014/02/27 02:00:30
Done.
| |
| 115 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
| 116 base::AutoLock locker(write_lock_); | |
| 117 if (write_stopped_) | |
| 118 return false; | |
| 119 | |
| 120 if (!write_buffer_->message_queue_.empty()) { | |
| 121 write_buffer_->message_queue_.push_back(message.release()); | |
| 122 return true; | |
| 123 } | |
| 124 | |
| 125 write_buffer_->message_queue_.push_front(message.release()); | |
| 126 DCHECK_EQ(write_buffer_->offset_, 0u); | |
| 127 | |
| 128 size_t bytes_written = 0; | |
| 129 IOResult io_result = WriteNoLock(&bytes_written); | |
| 130 if (io_result == IO_PENDING) | |
| 131 return true; | |
| 132 | |
| 133 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, | |
| 134 bytes_written); | |
| 135 if (!result) { | |
| 136 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | |
| 137 // nested context. | |
| 138 message_loop_for_io_->PostTask( | |
| 139 FROM_HERE, | |
| 140 base::Bind(&RawChannel::CallOnFatalError, | |
| 141 weak_ptr_factory_.GetWeakPtr(), | |
| 142 Delegate::FATAL_ERROR_FAILED_WRITE)); | |
| 143 } | |
| 144 | |
| 145 return result; | |
| 146 } | |
| 147 | |
| 148 RawChannel::ReadBuffer* RawChannel::read_buffer() { | |
| 149 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 150 return read_buffer_.get(); | |
| 151 } | |
| 152 | |
| 153 RawChannel::WriteBuffer* RawChannel::write_buffer() { | |
| 154 write_lock_.AssertAcquired(); | |
| 155 return write_buffer_.get(); | |
| 156 } | |
| 157 | |
| 158 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { | |
|
viettrungluu
2014/02/26 23:03:39
The implementation of this function could use a co
yzshen1
2014/02/27 02:00:30
Done.
| |
| 159 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 160 | |
| 161 if (read_stopped_) { | |
| 162 NOTREACHED(); | |
| 163 return; | |
| 164 } | |
| 165 | |
| 166 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; | |
| 167 for (;;) { | |
| 168 if (io_result != IO_SUCCEEDED) { | |
| 169 read_stopped_ = true; | |
| 170 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | |
| 171 return; | |
| 172 } | |
| 173 | |
| 174 read_buffer_->num_valid_bytes_ += bytes_read; | |
| 175 | |
| 176 // Dispatch all the messages that we can. | |
| 177 bool did_dispatch_message = false; | |
| 178 // Tracks the offset of the first undispatched message in |read_buffer_|. | |
| 179 // Currently, we copy data to ensure that this is zero at the beginning. | |
| 180 size_t read_buffer_start = 0; | |
| 181 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | |
| 182 size_t message_size; | |
| 183 // Note that we rely on short-circuit evaluation here: | |
| 184 // - |read_buffer_start| may be an invalid index into | |
| 185 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
| 186 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 187 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 188 // next read). | |
| 189 while (remaining_bytes > 0 && | |
| 190 MessageInTransit::GetNextMessageSize( | |
| 191 &read_buffer_->buffer_[read_buffer_start], remaining_bytes, | |
| 192 &message_size) && | |
| 193 remaining_bytes >= message_size) { | |
| 194 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, | |
| 195 &read_buffer_->buffer_[read_buffer_start]); | |
| 196 DCHECK_EQ(message.main_buffer_size(), message_size); | |
| 197 | |
| 198 // Dispatch the message. | |
| 199 delegate_->OnReadMessage(message); | |
| 200 if (read_stopped_) { | |
| 201 // |Shutdown()| was called in |OnReadMessage()|. | |
| 202 // TODO(vtl): Add test for this case. | |
| 203 return; | |
| 204 } | |
| 205 did_dispatch_message = true; | |
| 206 | |
| 207 // Update our state. | |
| 208 read_buffer_start += message_size; | |
| 209 remaining_bytes -= message_size; | |
| 210 } | |
| 211 | |
| 212 if (read_buffer_start > 0) { | |
| 213 // Move data back to start. | |
| 214 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
| 215 if (read_buffer_->num_valid_bytes_ > 0) { | |
| 216 memmove(&read_buffer_->buffer_[0], | |
| 217 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
| 218 } | |
| 219 read_buffer_start = 0; | |
| 220 } | |
| 221 | |
| 222 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | |
| 223 kReadSize) { | |
| 224 // Use power-of-2 buffer sizes. | |
| 225 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
| 226 // maximum message size to whatever extent necessary). | |
| 227 // TODO(vtl): We may often be able to peek at the header and get the real | |
| 228 // required extra space (which may be much bigger than |kReadSize|). | |
| 229 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); | |
| 230 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) | |
| 231 new_size *= 2; | |
| 232 | |
| 233 // TODO(vtl): It's suboptimal to zero out the fresh memory. | |
| 234 read_buffer_->buffer_.resize(new_size, 0); | |
| 235 } | |
| 236 | |
| 237 // (1) If we dispatched any messages, stop reading for now (and let the | |
| 238 // message loop do its thing for another round). | |
| 239 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
| 240 // a single message. Risks: slower, more complex if we want to avoid lots of | |
| 241 // copying. ii. Keep reading until there's no more data and dispatch all the | |
| 242 // messages we can. Risks: starvation of other users of the message loop.) | |
| 243 // (2) If we didn't max out |kReadSize|, stop reading for now. | |
| 244 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | |
| 245 bytes_read = 0; | |
| 246 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | |
| 247 | |
| 248 if (io_result == IO_PENDING) | |
|
viettrungluu
2014/02/26 23:03:39
do { ... } while (io_result != IO_PENDING);
yzshen1
2014/02/27 02:00:30
Done.
| |
| 249 return; | |
| 250 } | |
| 251 } | |
| 252 | |
| 253 void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { | |
| 254 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 255 | |
| 256 bool did_fail = false; | |
| 257 { | |
| 258 base::AutoLock locker(write_lock_); | |
| 259 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); | |
| 260 | |
| 261 if (write_stopped_) { | |
| 262 NOTREACHED(); | |
| 263 return; | |
| 264 } | |
| 265 | |
| 266 did_fail = !OnWriteCompletedNoLock(result, bytes_written); | |
| 267 } | |
| 268 | |
| 269 if (did_fail) | |
| 270 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | |
| 271 } | |
| 272 | |
| 273 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { | |
| 274 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 275 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | |
| 276 delegate_->OnFatalError(fatal_error); | |
| 277 } | |
| 278 | |
| 279 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { | |
| 280 write_lock_.AssertAcquired(); | |
| 281 | |
| 282 DCHECK(!write_stopped_); | |
| 283 DCHECK(!write_buffer_->message_queue_.empty()); | |
| 284 | |
| 285 if (result) { | |
| 286 if (bytes_written < write_buffer_->GetBytesToWrite()) { | |
| 287 // Partial (or no) write. | |
| 288 write_buffer_->offset_ += bytes_written; | |
| 289 } else { | |
| 290 // Complete write. | |
| 291 DCHECK_EQ(bytes_written, write_buffer_->GetBytesToWrite()); | |
| 292 delete write_buffer_->message_queue_.front(); | |
| 293 write_buffer_->message_queue_.pop_front(); | |
| 294 write_buffer_->offset_ = 0; | |
| 295 } | |
| 296 | |
| 297 if (write_buffer_->message_queue_.empty()) | |
| 298 return true; | |
| 299 | |
| 300 // Schedule the next write. | |
| 301 if (ScheduleWriteNoLock() == IO_PENDING) | |
| 302 return true; | |
| 303 } | |
| 304 | |
| 305 write_stopped_ = true; | |
| 306 STLDeleteElements(&write_buffer_->message_queue_); | |
|
viettrungluu
2014/02/26 23:03:39
Can't you just do |write_buffer_.reset()| here?
yzshen1
2014/02/27 02:00:30
I was thinking maybe it is good to always give the
| |
| 307 write_buffer_->offset_ = 0; | |
| 308 return false; | |
|
viettrungluu
2014/02/26 23:03:39
More importantly, if it's the subclass that calls
yzshen1
2014/02/27 02:00:30
OnWriteCompletedNoLock() is a private helper metho
| |
| 309 } | |
| 310 | |
| 311 } // namespace system | |
| 312 } // namespace mojo | |
| OLD | NEW |