Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(968)

Side by Side Diff: mojo/system/raw_channel.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: introduce ReadBuffer / WriteBuffer Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17
18 namespace mojo {
19 namespace system {
20
21 namespace {
viettrungluu 2014/02/26 23:03:39 Note that this anonymous namespace is redundant. (
yzshen1 2014/02/27 02:00:30 Done.
22
23 const size_t kReadSize = 4096;
24
25 } // namespace
26
27
28 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
29 }
30
31 RawChannel::ReadBuffer::~ReadBuffer() {}
32
33 char* RawChannel::ReadBuffer::GetPosition() {
34 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
35 return &buffer_[0] + num_valid_bytes_;
36 }
37
38 size_t RawChannel::ReadBuffer::GetBytesToRead() const {
39 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
40 return kReadSize;
41 }
42
43 RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
44
45 RawChannel::WriteBuffer::~WriteBuffer() {
46 STLDeleteElements(&message_queue_);
47 }
48
49 const char* RawChannel::WriteBuffer::GetPosition() const {
50 if (message_queue_.empty())
51 return NULL;
52
53 DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_);
54 return static_cast<const char*>(message_queue_.front()->main_buffer()) +
55 offset_;
56 }
57
58 size_t RawChannel::WriteBuffer::GetBytesToWrite() const {
59 if (message_queue_.empty())
60 return 0;
61
62 DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_);
63 return message_queue_.front()->main_buffer_size() - offset_;
64 }
65
66 RawChannel::RawChannel(Delegate* delegate,
67 base::MessageLoopForIO* message_loop_for_io)
68 : delegate_(delegate),
69 read_stopped_(false),
70 message_loop_for_io_(message_loop_for_io),
71 write_stopped_(false),
72 weak_ptr_factory_(this) {
73 }
74
75 RawChannel::~RawChannel() {
76 DCHECK(!read_buffer_);
77 DCHECK(!write_buffer_);
78
79 // No need to take the |write_lock_| here -- if there are still weak pointers
80 // outstanding, then we're hosed anyway (since we wouldn't be able to
81 // invalidate them cleanly, since we might not be on the I/O thread).
82 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
83 }
84
85 bool RawChannel::Init() {
86 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
87
88 // No need to take the lock. No one should be using us yet.
89 DCHECK(!read_buffer_);
90 read_buffer_.reset(new ReadBuffer);
91 DCHECK(!write_buffer_);
92 write_buffer_.reset(new WriteBuffer);
93
94 if (!OnInit())
95 return false;
96
97 IOResult result = ScheduleRead();
98 return result == IO_PENDING;
viettrungluu 2014/02/26 23:03:39 nit: I think you may as well write this as |return
yzshen1 2014/02/27 02:00:30 Done.
99 }
100
101 void RawChannel::Shutdown() {
102 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
103
104 base::AutoLock locker(write_lock_);
105
106 weak_ptr_factory_.InvalidateWeakPtrs();
107
108 read_stopped_ = true;
109 write_stopped_ = true;
110
111 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
112 }
113
114 // Reminder: This must be thread-safe, and takes ownership of |message|.
viettrungluu 2014/02/26 23:03:39 You can get rid of the ", and takes ownership ..."
yzshen1 2014/02/27 02:00:30 Done.
115 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
116 base::AutoLock locker(write_lock_);
117 if (write_stopped_)
118 return false;
119
120 if (!write_buffer_->message_queue_.empty()) {
121 write_buffer_->message_queue_.push_back(message.release());
122 return true;
123 }
124
125 write_buffer_->message_queue_.push_front(message.release());
126 DCHECK_EQ(write_buffer_->offset_, 0u);
127
128 size_t bytes_written = 0;
129 IOResult io_result = WriteNoLock(&bytes_written);
130 if (io_result == IO_PENDING)
131 return true;
132
133 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
134 bytes_written);
135 if (!result) {
136 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
137 // nested context.
138 message_loop_for_io_->PostTask(
139 FROM_HERE,
140 base::Bind(&RawChannel::CallOnFatalError,
141 weak_ptr_factory_.GetWeakPtr(),
142 Delegate::FATAL_ERROR_FAILED_WRITE));
143 }
144
145 return result;
146 }
147
148 RawChannel::ReadBuffer* RawChannel::read_buffer() {
149 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
150 return read_buffer_.get();
151 }
152
153 RawChannel::WriteBuffer* RawChannel::write_buffer() {
154 write_lock_.AssertAcquired();
155 return write_buffer_.get();
156 }
157
158 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
viettrungluu 2014/02/26 23:03:39 The implementation of this function could use a co
yzshen1 2014/02/27 02:00:30 Done.
159 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
160
161 if (read_stopped_) {
162 NOTREACHED();
163 return;
164 }
165
166 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
167 for (;;) {
168 if (io_result != IO_SUCCEEDED) {
169 read_stopped_ = true;
170 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
171 return;
172 }
173
174 read_buffer_->num_valid_bytes_ += bytes_read;
175
176 // Dispatch all the messages that we can.
177 bool did_dispatch_message = false;
178 // Tracks the offset of the first undispatched message in |read_buffer_|.
179 // Currently, we copy data to ensure that this is zero at the beginning.
180 size_t read_buffer_start = 0;
181 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
182 size_t message_size;
183 // Note that we rely on short-circuit evaluation here:
184 // - |read_buffer_start| may be an invalid index into
185 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
186 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
187 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
188 // next read).
189 while (remaining_bytes > 0 &&
190 MessageInTransit::GetNextMessageSize(
191 &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
192 &message_size) &&
193 remaining_bytes >= message_size) {
194 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
195 &read_buffer_->buffer_[read_buffer_start]);
196 DCHECK_EQ(message.main_buffer_size(), message_size);
197
198 // Dispatch the message.
199 delegate_->OnReadMessage(message);
200 if (read_stopped_) {
201 // |Shutdown()| was called in |OnReadMessage()|.
202 // TODO(vtl): Add test for this case.
203 return;
204 }
205 did_dispatch_message = true;
206
207 // Update our state.
208 read_buffer_start += message_size;
209 remaining_bytes -= message_size;
210 }
211
212 if (read_buffer_start > 0) {
213 // Move data back to start.
214 read_buffer_->num_valid_bytes_ = remaining_bytes;
215 if (read_buffer_->num_valid_bytes_ > 0) {
216 memmove(&read_buffer_->buffer_[0],
217 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
218 }
219 read_buffer_start = 0;
220 }
221
222 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
223 kReadSize) {
224 // Use power-of-2 buffer sizes.
225 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
226 // maximum message size to whatever extent necessary).
227 // TODO(vtl): We may often be able to peek at the header and get the real
228 // required extra space (which may be much bigger than |kReadSize|).
229 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
230 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
231 new_size *= 2;
232
233 // TODO(vtl): It's suboptimal to zero out the fresh memory.
234 read_buffer_->buffer_.resize(new_size, 0);
235 }
236
237 // (1) If we dispatched any messages, stop reading for now (and let the
238 // message loop do its thing for another round).
239 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
240 // a single message. Risks: slower, more complex if we want to avoid lots of
241 // copying. ii. Keep reading until there's no more data and dispatch all the
242 // messages we can. Risks: starvation of other users of the message loop.)
243 // (2) If we didn't max out |kReadSize|, stop reading for now.
244 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
245 bytes_read = 0;
246 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
247
248 if (io_result == IO_PENDING)
viettrungluu 2014/02/26 23:03:39 do { ... } while (io_result != IO_PENDING);
yzshen1 2014/02/27 02:00:30 Done.
249 return;
250 }
251 }
252
253 void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
254 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
255
256 bool did_fail = false;
257 {
258 base::AutoLock locker(write_lock_);
259 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
260
261 if (write_stopped_) {
262 NOTREACHED();
263 return;
264 }
265
266 did_fail = !OnWriteCompletedNoLock(result, bytes_written);
267 }
268
269 if (did_fail)
270 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
271 }
272
273 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
274 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
275 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
276 delegate_->OnFatalError(fatal_error);
277 }
278
279 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
280 write_lock_.AssertAcquired();
281
282 DCHECK(!write_stopped_);
283 DCHECK(!write_buffer_->message_queue_.empty());
284
285 if (result) {
286 if (bytes_written < write_buffer_->GetBytesToWrite()) {
287 // Partial (or no) write.
288 write_buffer_->offset_ += bytes_written;
289 } else {
290 // Complete write.
291 DCHECK_EQ(bytes_written, write_buffer_->GetBytesToWrite());
292 delete write_buffer_->message_queue_.front();
293 write_buffer_->message_queue_.pop_front();
294 write_buffer_->offset_ = 0;
295 }
296
297 if (write_buffer_->message_queue_.empty())
298 return true;
299
300 // Schedule the next write.
301 if (ScheduleWriteNoLock() == IO_PENDING)
302 return true;
303 }
304
305 write_stopped_ = true;
306 STLDeleteElements(&write_buffer_->message_queue_);
viettrungluu 2014/02/26 23:03:39 Can't you just do |write_buffer_.reset()| here?
yzshen1 2014/02/27 02:00:30 I was thinking maybe it is good to always give the
307 write_buffer_->offset_ = 0;
308 return false;
viettrungluu 2014/02/26 23:03:39 More importantly, if it's the subclass that calls
yzshen1 2014/02/27 02:00:30 OnWriteCompletedNoLock() is a private helper metho
309 }
310
311 } // namespace system
312 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698