Chromium Code Reviews

Side by Side Diff: mojo/system/raw_channel.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: support multi-segment write (created 6 years, 9 months ago)
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/system/raw_channel.h"

#include <string.h>

#include <algorithm>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/stl_util.h"
#include "mojo/system/message_in_transit.h"

namespace mojo {
namespace system {

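// Number of bytes to request from the platform on each read. The read buffer
// is grown as needed so that at least this much free space is always
// available (see |ReadBuffer::GetBuffer()| and the resize logic in
// |OnReadCompleted()|).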
const size_t kReadSize = 4096;

RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}

RawChannel::ReadBuffer::~ReadBuffer() {}

void RawChannel::ReadBuffer::GetBuffer(char** buffer, size_t* bytes_to_read) {
  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
  *buffer = &buffer_[0] + num_valid_bytes_;
  *bytes_to_read = kReadSize;
}

RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}

RawChannel::WriteBuffer::~WriteBuffer() {
  STLDeleteElements(&message_queue_);
}

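// Collects the region(s) of the front message that still need to be written.
// Depending on |offset_|, this is a slice of the main buffer, a slice of the
// secondary buffer, or both, so that the caller can issue a single
// multi-segment ("gather") write.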
void RawChannel::WriteBuffer::GetBuffers(BufferVector* buffers) const {
  buffers->clear();

  size_t bytes_to_write = GetTotalBytesToWrite();
  if (bytes_to_write == 0)
    return;

  MessageInTransit* message = message_queue_.front();
  if (!message->secondary_buffer_size()) {
    // Only write from the main buffer.
    DCHECK_LT(offset_, message->main_buffer_size());
    DCHECK_LE(bytes_to_write, message->main_buffer_size());
    buffers->push_back(std::make_pair(
        static_cast<const char*>(message->main_buffer()) + offset_,
        bytes_to_write));
    return;
  }

  if (offset_ >= message->main_buffer_size()) {
    // Only write from the secondary buffer.
    DCHECK_LT(offset_ - message->main_buffer_size(),
              message->secondary_buffer_size());
    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
    buffers->push_back(std::make_pair(
        static_cast<const char*>(message->secondary_buffer()) +
            (offset_ - message->main_buffer_size()),
        bytes_to_write));
    return;
  }

  // Write from both buffers.
  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
                                message->secondary_buffer_size());
  buffers->push_back(
      std::make_pair(
          static_cast<const char*>(message->main_buffer()) + offset_,
          message->main_buffer_size() - offset_));
  buffers->push_back(
      std::make_pair(
          static_cast<const char*>(message->secondary_buffer()),
          message->secondary_buffer_size()));
}

size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
  if (message_queue_.empty())
    return 0;

  MessageInTransit* message = message_queue_.front();
  DCHECK_LT(offset_, message->total_size());
  return message->total_size() - offset_;
}

RawChannel::RawChannel(Delegate* delegate,
                       base::MessageLoopForIO* message_loop_for_io)
    : delegate_(delegate),
      message_loop_for_io_(message_loop_for_io),
      read_stopped_(false),
      write_stopped_(false),
      weak_ptr_factory_(this) {
}

RawChannel::~RawChannel() {
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);

  // No need to take the |write_lock_| here -- if there are still weak pointers
  // outstanding, then we're hosed anyway (since we wouldn't be able to
  // invalidate them cleanly, as we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}

bool RawChannel::Init() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  // No need to take the lock. No one should be using us yet.
  DCHECK(!read_buffer_);
  read_buffer_.reset(new ReadBuffer);
  DCHECK(!write_buffer_);
  write_buffer_.reset(new WriteBuffer);

  if (!OnInit())
    return false;

  return ScheduleRead() == IO_PENDING;
}

void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  weak_ptr_factory_.InvalidateWeakPtrs();

  read_stopped_ = true;
  write_stopped_ = true;

  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}

// Reminder: This must be thread-safe.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
  base::AutoLock locker(write_lock_);
  if (write_stopped_)
    return false;

  if (!write_buffer_->message_queue_.empty()) {
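    // A write is already in flight; the queued message will be picked up by
    // the completion path (|OnWriteCompletedNoLock()|) once that write
    // finishes.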
    write_buffer_->message_queue_.push_back(message.release());
    return true;
  }

  write_buffer_->message_queue_.push_front(message.release());
  DCHECK_EQ(write_buffer_->offset_, 0u);

  size_t bytes_written = 0;
  IOResult io_result = WriteNoLock(&bytes_written);
  if (io_result == IO_PENDING)
    return true;

  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
                                       bytes_written);
  if (!result) {
    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
    // nested context.
    message_loop_for_io_->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::CallOnFatalError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::FATAL_ERROR_FAILED_WRITE));
  }

  return result;
}

RawChannel::ReadBuffer* RawChannel::read_buffer() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  return read_buffer_.get();
}

RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
  write_lock_.AssertAcquired();
  return write_buffer_.get();
}

void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  if (read_stopped_) {
    NOTREACHED();
    return;
  }

  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;

  // Keep reading data in a loop, dispatching messages as enough data is
  // received. Exit the loop if any of the following happens:
  //   - one or more messages were dispatched;
  //   - the last read failed, was a partial read, or would block;
  //   - |Shutdown()| was called.
  do {
    if (io_result != IO_SUCCEEDED) {
      read_stopped_ = true;
      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
      return;
    }

    read_buffer_->num_valid_bytes_ += bytes_read;

    // Dispatch all the messages that we can.
    bool did_dispatch_message = false;
    // Tracks the offset of the first undispatched message in |read_buffer_|.
    // Currently, we copy data to ensure that this is zero at the beginning.
    size_t read_buffer_start = 0;
    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
    size_t message_size;
    // Note that we rely on short-circuit evaluation here:
    //   - |read_buffer_start| may be an invalid index into
    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
    // next read).
    while (remaining_bytes > 0 &&
           MessageInTransit::GetNextMessageSize(
               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
               &message_size) &&
           remaining_bytes >= message_size) {
      // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
      // some sort of "view" abstraction.
      MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
                               &read_buffer_->buffer_[read_buffer_start]);
      DCHECK_EQ(message.total_size(), message_size);

      // Dispatch the message.
      delegate_->OnReadMessage(message);
      if (read_stopped_) {
        // |Shutdown()| was called in |OnReadMessage()|.
        // TODO(vtl): Add test for this case.
        return;
      }
      did_dispatch_message = true;

      // Update our state.
      read_buffer_start += message_size;
      remaining_bytes -= message_size;
    }

    if (read_buffer_start > 0) {
      // Move data back to start.
      read_buffer_->num_valid_bytes_ = remaining_bytes;
      if (read_buffer_->num_valid_bytes_ > 0) {
        memmove(&read_buffer_->buffer_[0],
                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
      }
      read_buffer_start = 0;
    }

    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
        kReadSize) {
      // Use power-of-2 buffer sizes.
      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
      // maximum message size to whatever extent necessary).
      // TODO(vtl): We may often be able to peek at the header and get the real
      // required extra space (which may be much bigger than |kReadSize|).
      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
        new_size *= 2;

      // TODO(vtl): It's suboptimal to zero out the fresh memory.
      read_buffer_->buffer_.resize(new_size, 0);
    }

    // (1) If we dispatched any messages, stop reading for now (and let the
    // message loop do its thing for another round).
    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
    // a single message. Risks: slower, more complex if we want to avoid lots
    // of copying. ii. Keep reading until there's no more data and dispatch
    // all the messages we can. Risks: starvation of other users of the
    // message loop.)
    // (2) If we didn't max out |kReadSize|, stop reading for now.
    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
    bytes_read = 0;
    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
  } while (io_result != IO_PENDING);
}

void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  bool did_fail = false;
  {
    base::AutoLock locker(write_lock_);
    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());

    if (write_stopped_) {
      NOTREACHED();
      return;
    }

    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
  }

  if (did_fail)
    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}

void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  delegate_->OnFatalError(fatal_error);
}

bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_stopped_);
  DCHECK(!write_buffer_->message_queue_.empty());

  if (result) {
    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
      // Partial (or no) write.
      write_buffer_->offset_ += bytes_written;
    } else {
      // Complete write.
      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
      delete write_buffer_->message_queue_.front();
      write_buffer_->message_queue_.pop_front();
      write_buffer_->offset_ = 0;
    }

    if (write_buffer_->message_queue_.empty())
      return true;

    // Schedule the next write.
    if (ScheduleWriteNoLock() == IO_PENDING)
      return true;
  }

  write_stopped_ = true;
  STLDeleteElements(&write_buffer_->message_queue_);
  write_buffer_->offset_ = 0;
  return false;
}

}  // namespace system
}  // namespace mojo
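
Context for the "support multi-segment write" patch set: |WriteBuffer::GetBuffers()| now hands back one or two (pointer, size) segments so a platform backend can push a whole message out in a single gather write instead of copying the secondary buffer or issuing two writes. Below is a minimal sketch of how a POSIX backend might consume it; the class name RawChannelPosix, the |fd_| member, and the exact element type of |BufferVector| are assumptions for illustration only, not part of this CL.

// Sketch only. Assumes BufferVector holds (const char*, size_t) pairs, as the
// make_pair() calls above suggest, and that the platform subclass owns a
// non-blocking file descriptor |fd_| (both are assumptions, not taken from
// this CL).
#include <sys/uio.h>

#include <vector>

RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
  // |write_buffer_no_lock()| itself asserts that |write_lock_| is held.
  WriteBuffer::BufferVector buffers;
  write_buffer_no_lock()->GetBuffers(&buffers);
  DCHECK(!buffers.empty());

  // Translate each (pointer, size) segment into an iovec and issue a single
  // gather write rather than one write() per segment.
  std::vector<struct iovec> iov(buffers.size());
  for (size_t i = 0; i < buffers.size(); ++i) {
    iov[i].iov_base = const_cast<char*>(buffers[i].first);
    iov[i].iov_len = buffers[i].second;
  }

  ssize_t result = writev(fd_, &iov[0], static_cast<int>(iov.size()));
  if (result < 0) {
    // A real implementation would retry on EINTR and treat EAGAIN/EWOULDBLOCK
    // as IO_PENDING (waiting for writability); this sketch just fails.
    return IO_FAILED;
  }

  *bytes_written = static_cast<size_t>(result);
  return IO_SUCCEEDED;
}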