Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Side by Side Diff: mojo/system/raw_channel.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17
18 namespace mojo {
19 namespace system {
20
21 namespace {
22
23 const size_t kReadSize = 4096;
24
25 } // namespace
26
27 RawChannel::IOBufferPreserver::IOBufferPreserver(
28 scoped_ptr<std::vector<char> > read_buffer,
29 scoped_ptr<MessageInTransit> write_buffer)
30 : read_buffer_(read_buffer.Pass()),
31 write_buffer_(write_buffer.Pass()) {
32 }
33
34 RawChannel::IOBufferPreserver::~IOBufferPreserver() {
35 }
36
37 RawChannel::RawChannel(Delegate* delegate,
38 base::MessageLoopForIO* message_loop_for_io)
39 : delegate_(delegate),
40 read_stopped_(false),
41 read_buffer_(kReadSize),
42 read_buffer_num_valid_bytes_(0),
43 message_loop_for_io_(message_loop_for_io),
44 write_stopped_(false),
45 write_message_offset_(0),
46 weak_ptr_factory_(this) {
47 }
48
49 RawChannel::~RawChannel() {
50 DCHECK(write_message_queue_.empty());
51
52 // No need to take the |write_lock_| here -- if there are still weak pointers
53 // outstanding, then we're hosed anyway (since we wouldn't be able to
54 // invalidate them cleanly, since we might not be on the I/O thread).
55 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
56 }
57
58 bool RawChannel::Init() {
59 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
60
61 // No need to take the lock. No one should be using us yet.
62 DCHECK(write_message_queue_.empty());
63 DCHECK_EQ(kReadSize, read_buffer_.size());
64 DCHECK_EQ(0u, read_buffer_num_valid_bytes_);
65
66 if (!OnInit())
67 return false;
68
69 IOResult result = Read(true, &read_buffer_[0], kReadSize, NULL);
70 return result == IO_PENDING;
71 }
72
73 void RawChannel::Shutdown() {
74 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
75
76 base::AutoLock locker(write_lock_);
77
78 weak_ptr_factory_.InvalidateWeakPtrs();
79
80 scoped_ptr<std::vector<char> > preserved_read_buffer;
81 if (!read_stopped_) {
82 read_stopped_ = true;
83 preserved_read_buffer.reset(new std::vector<char>());
84 preserved_read_buffer->swap(read_buffer_);
85 }
86
87 scoped_ptr<MessageInTransit> preserved_write_buffer;
88 if (!write_stopped_) {
89 write_stopped_ = true;
90 if (!write_message_queue_.empty()) {
91 preserved_write_buffer.reset(write_message_queue_.front());
92 write_message_queue_.pop_front();
93 }
94 STLDeleteElements(&write_message_queue_);
95 } else {
96 DCHECK(write_message_queue_.empty());
97 }
98
99 scoped_ptr<IOBufferPreserver> preserver(new IOBufferPreserver(
100 preserved_read_buffer.Pass(), preserved_write_buffer.Pass()));
101
102 OnShutdownNoLock(preserver.Pass());
103 }
104
105 // Reminder: This must be thread-safe, and takes ownership of |message|.
106 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
107 base::AutoLock locker(write_lock_);
108 if (write_stopped_)
109 return false;
110
111 if (!write_message_queue_.empty()) {
112 write_message_queue_.push_back(message.release());
113 return true;
114 }
115
116 write_message_queue_.push_front(message.release());
117 DCHECK_EQ(write_message_offset_, 0u);
118
119 MessageInTransit* front_message = write_message_queue_.front();
120 size_t bytes_written = 0;
121 IOResult io_result = WriteNoLock(
122 false,
123 static_cast<const char*>(front_message->main_buffer()),
124 front_message->main_buffer_size(),
125 &bytes_written);
126
127 if (io_result == IO_PENDING)
128 return true;
129
130 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
131 bytes_written);
132 if (!result) {
133 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
134 // nested context.
135 message_loop_for_io_->PostTask(
136 FROM_HERE,
137 base::Bind(&RawChannel::CallOnFatalError,
138 weak_ptr_factory_.GetWeakPtr(),
139 Delegate::FATAL_ERROR_FAILED_WRITE));
140 }
141
142 return result;
143 }
144
145 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
146 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
147
148 if (read_stopped_) {
149 NOTREACHED();
150 return;
151 }
152
153 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
154 for (;;) {
155 if (io_result != IO_SUCCEEDED) {
156 read_stopped_ = true;
157 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
158 return;
159 }
160
161 read_buffer_num_valid_bytes_ += bytes_read;
162
163 // Dispatch all the messages that we can.
164 bool did_dispatch_message = false;
165 // Tracks the offset of the first undispatched message in |read_buffer_|.
166 // Currently, we copy data to ensure that this is zero at the beginning.
167 size_t read_buffer_start = 0;
168 size_t message_size;
169 // Note that we rely on short-circuit evaluation here:
170 // - |read_buffer_start| may be an invalid index into |read_buffer_| if
171 // |read_buffer_num_valid_bytes_| is zero.
172 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
173 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
174 // next read).
175 while (read_buffer_num_valid_bytes_ > 0 &&
176 MessageInTransit::GetNextMessageSize(
177 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
178 &message_size) &&
179 read_buffer_num_valid_bytes_ >= message_size) {
180 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
181 &read_buffer_[read_buffer_start]);
182 DCHECK_EQ(message.main_buffer_size(), message_size);
183
184 // Dispatch the message.
185 delegate_->OnReadMessage(message);
186 if (read_stopped_) {
187 // |Shutdown()| was called in |OnReadMessage()|.
188 // TODO(vtl): Add test for this case.
189 return;
190 }
191 did_dispatch_message = true;
192
193 // Update our state.
194 read_buffer_start += message_size;
195 read_buffer_num_valid_bytes_ -= message_size;
196 }
197
198 // (1) If we dispatched any messages, stop reading for now (and let the
199 // message loop do its thing for another round).
200 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
201 // a single message. Risks: slower, more complex if we want to avoid lots of
202 // copying. ii. Keep reading until there's no more data and dispatch all the
203 // messages we can. Risks: starvation of other users of the message loop.)
204 // (2) If we didn't max out |kReadSize|, stop reading for now.
205 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
206
207 if (read_buffer_start > 0) {
208 // Move data back to start.
209 if (read_buffer_num_valid_bytes_ > 0) {
210 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
211 read_buffer_num_valid_bytes_);
212 }
213 read_buffer_start = 0;
214 }
215
216 if (read_buffer_.size() - read_buffer_num_valid_bytes_ < kReadSize) {
217 // Use power-of-2 buffer sizes.
218 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
219 // maximum message size to whatever extent necessary).
220 // TODO(vtl): We may often be able to peek at the header and get the real
221 // required extra space (which may be much bigger than |kReadSize|).
222 size_t new_size = std::max(read_buffer_.size(), kReadSize);
223 while (new_size < read_buffer_num_valid_bytes_ + kReadSize)
224 new_size *= 2;
225
226 // TODO(vtl): It's suboptimal to zero out the fresh memory.
227 read_buffer_.resize(new_size, 0);
228 }
229
230 io_result = Read(schedule_for_later,
231 &read_buffer_[read_buffer_num_valid_bytes_],
232 kReadSize,
233 &bytes_read);
234 if (io_result == IO_PENDING)
235 return;
236 }
237 }
238
239 void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
241
242 bool did_fail = false;
243 {
244 base::AutoLock locker(write_lock_);
245 DCHECK_EQ(write_stopped_, write_message_queue_.empty());
246
247 if (write_stopped_) {
248 NOTREACHED();
249 return;
250 }
251
252 did_fail = !OnWriteCompletedNoLock(result, bytes_written);
253 }
254
255 if (did_fail)
256 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
257 }
258
259 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
260 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
261 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
262 delegate_->OnFatalError(fatal_error);
263 }
264
265 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
266 write_lock_.AssertAcquired();
267
268 DCHECK(!write_stopped_);
269 DCHECK(!write_message_queue_.empty());
270
271 if (result) {
272 MessageInTransit* message = write_message_queue_.front();
273 DCHECK_LT(write_message_offset_, message->main_buffer_size());
274 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
275
276 if (bytes_written < bytes_to_write) {
277 // Partial (or no) write.
278 write_message_offset_ += bytes_written;
279 } else {
280 // Complete write.
281 DCHECK_EQ(bytes_written, bytes_to_write);
282 write_message_queue_.pop_front();
283 delete message;
284 write_message_offset_ = 0;
285 }
286
287 if (write_message_queue_.empty())
288 return true;
289
290 // Schedule the next write.
291 message = write_message_queue_.front();
292 bytes_to_write = message->main_buffer_size() - write_message_offset_;
293
294 if (WriteNoLock(true,
295 static_cast<const char*>(message->main_buffer()) +
296 write_message_offset_,
297 bytes_to_write,
298 NULL) == IO_PENDING) {
299 return true;
300 }
301 }
302
303 write_stopped_ = true;
304 STLDeleteElements(&write_message_queue_);
305 return false;
306 }
307
308 } // namespace system
309 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698