Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(412)

Side by Side Diff: mojo/system/raw_channel.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/raw_channel.h ('k') | mojo/system/raw_channel_posix.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17
18 namespace mojo {
19 namespace system {
20
21 const size_t kReadSize = 4096;
22
23 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
24 }
25
26 RawChannel::ReadBuffer::~ReadBuffer() {}
27
28 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
29 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
30 *addr = &buffer_[0] + num_valid_bytes_;
31 *size = kReadSize;
32 }
33
34 RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
35
36 RawChannel::WriteBuffer::~WriteBuffer() {
37 STLDeleteElements(&message_queue_);
38 }
39
40 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
41 buffers->clear();
42
43 size_t bytes_to_write = GetTotalBytesToWrite();
44 if (bytes_to_write == 0)
45 return;
46
47 MessageInTransit* message = message_queue_.front();
48 if (!message->secondary_buffer_size()) {
49 // Only write from the main buffer.
50 DCHECK_LT(offset_, message->main_buffer_size());
51 DCHECK_LE(bytes_to_write, message->main_buffer_size());
52 Buffer buffer = {
53 static_cast<const char*>(message->main_buffer()) + offset_,
54 bytes_to_write};
55 buffers->push_back(buffer);
56 return;
57 }
58
59 if (offset_ >= message->main_buffer_size()) {
60 // Only write from the secondary buffer.
61 DCHECK_LT(offset_ - message->main_buffer_size(),
62 message->secondary_buffer_size());
63 DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
64 Buffer buffer = {
65 static_cast<const char*>(message->secondary_buffer()) +
66 (offset_ - message->main_buffer_size()),
67 bytes_to_write};
68 buffers->push_back(buffer);
69 return;
70 }
71
72 // Write from both buffers.
73 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
74 message->secondary_buffer_size());
75 Buffer buffer1 = {
76 static_cast<const char*>(message->main_buffer()) + offset_,
77 message->main_buffer_size() - offset_};
78 buffers->push_back(buffer1);
79 Buffer buffer2 = {
80 static_cast<const char*>(message->secondary_buffer()),
81 message->secondary_buffer_size()};
82 buffers->push_back(buffer2);
83 }
84
85 size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
86 if (message_queue_.empty())
87 return 0;
88
89 MessageInTransit* message = message_queue_.front();
90 DCHECK_LT(offset_, message->total_size());
91 return message->total_size() - offset_;
92 }
93
94 RawChannel::RawChannel(Delegate* delegate,
95 base::MessageLoopForIO* message_loop_for_io)
96 : delegate_(delegate),
97 message_loop_for_io_(message_loop_for_io),
98 read_stopped_(false),
99 write_stopped_(false),
100 weak_ptr_factory_(this) {
101 }
102
103 RawChannel::~RawChannel() {
104 DCHECK(!read_buffer_);
105 DCHECK(!write_buffer_);
106
107 // No need to take the |write_lock_| here -- if there are still weak pointers
108 // outstanding, then we're hosed anyway (since we wouldn't be able to
109 // invalidate them cleanly, since we might not be on the I/O thread).
110 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
111 }
112
113 bool RawChannel::Init() {
114 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
115
116 // No need to take the lock. No one should be using us yet.
117 DCHECK(!read_buffer_);
118 read_buffer_.reset(new ReadBuffer);
119 DCHECK(!write_buffer_);
120 write_buffer_.reset(new WriteBuffer);
121
122 if (!OnInit())
123 return false;
124
125 return ScheduleRead() == IO_PENDING;
126 }
127
128 void RawChannel::Shutdown() {
129 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
130
131 base::AutoLock locker(write_lock_);
132
133 weak_ptr_factory_.InvalidateWeakPtrs();
134
135 read_stopped_ = true;
136 write_stopped_ = true;
137
138 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
139 }
140
141 // Reminder: This must be thread-safe.
142 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
143 base::AutoLock locker(write_lock_);
144 if (write_stopped_)
145 return false;
146
147 if (!write_buffer_->message_queue_.empty()) {
148 write_buffer_->message_queue_.push_back(message.release());
149 return true;
150 }
151
152 write_buffer_->message_queue_.push_front(message.release());
153 DCHECK_EQ(write_buffer_->offset_, 0u);
154
155 size_t bytes_written = 0;
156 IOResult io_result = WriteNoLock(&bytes_written);
157 if (io_result == IO_PENDING)
158 return true;
159
160 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
161 bytes_written);
162 if (!result) {
163 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
164 // nested context.
165 message_loop_for_io_->PostTask(
166 FROM_HERE,
167 base::Bind(&RawChannel::CallOnFatalError,
168 weak_ptr_factory_.GetWeakPtr(),
169 Delegate::FATAL_ERROR_FAILED_WRITE));
170 }
171
172 return result;
173 }
174
175 RawChannel::ReadBuffer* RawChannel::read_buffer() {
176 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
177 return read_buffer_.get();
178 }
179
180 RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
181 write_lock_.AssertAcquired();
182 return write_buffer_.get();
183 }
184
185 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
186 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
187
188 if (read_stopped_) {
189 NOTREACHED();
190 return;
191 }
192
193 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
194
195 // Keep reading data in a loop, and dispatches messages if enough data is
196 // received. Exit the loop if any of the following happens:
197 // - one or more messages were dispatched;
198 // - the last read failed, was a partial read or would block;
199 // - |Shutdown()| was called.
200 do {
201 if (io_result != IO_SUCCEEDED) {
202 read_stopped_ = true;
203 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
204 return;
205 }
206
207 read_buffer_->num_valid_bytes_ += bytes_read;
208
209 // Dispatch all the messages that we can.
210 bool did_dispatch_message = false;
211 // Tracks the offset of the first undispatched message in |read_buffer_|.
212 // Currently, we copy data to ensure that this is zero at the beginning.
213 size_t read_buffer_start = 0;
214 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
215 size_t message_size;
216 // Note that we rely on short-circuit evaluation here:
217 // - |read_buffer_start| may be an invalid index into
218 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
219 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
220 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
221 // next read).
222 while (remaining_bytes > 0 &&
223 MessageInTransit::GetNextMessageSize(
224 &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
225 &message_size) &&
226 remaining_bytes >= message_size) {
227 // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
228 // some sort of "view" abstraction.
229 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
230 &read_buffer_->buffer_[read_buffer_start]);
231 DCHECK_EQ(message.total_size(), message_size);
232
233 // Dispatch the message.
234 delegate_->OnReadMessage(message);
235 if (read_stopped_) {
236 // |Shutdown()| was called in |OnReadMessage()|.
237 // TODO(vtl): Add test for this case.
238 return;
239 }
240 did_dispatch_message = true;
241
242 // Update our state.
243 read_buffer_start += message_size;
244 remaining_bytes -= message_size;
245 }
246
247 if (read_buffer_start > 0) {
248 // Move data back to start.
249 read_buffer_->num_valid_bytes_ = remaining_bytes;
250 if (read_buffer_->num_valid_bytes_ > 0) {
251 memmove(&read_buffer_->buffer_[0],
252 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
253 }
254 read_buffer_start = 0;
255 }
256
257 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
258 kReadSize) {
259 // Use power-of-2 buffer sizes.
260 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
261 // maximum message size to whatever extent necessary).
262 // TODO(vtl): We may often be able to peek at the header and get the real
263 // required extra space (which may be much bigger than |kReadSize|).
264 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
265 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
266 new_size *= 2;
267
268 // TODO(vtl): It's suboptimal to zero out the fresh memory.
269 read_buffer_->buffer_.resize(new_size, 0);
270 }
271
272 // (1) If we dispatched any messages, stop reading for now (and let the
273 // message loop do its thing for another round).
274 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
275 // a single message. Risks: slower, more complex if we want to avoid lots of
276 // copying. ii. Keep reading until there's no more data and dispatch all the
277 // messages we can. Risks: starvation of other users of the message loop.)
278 // (2) If we didn't max out |kReadSize|, stop reading for now.
279 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
280 bytes_read = 0;
281 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
282 } while (io_result != IO_PENDING);
283 }
284
285 void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
286 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
287
288 bool did_fail = false;
289 {
290 base::AutoLock locker(write_lock_);
291 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
292
293 if (write_stopped_) {
294 NOTREACHED();
295 return;
296 }
297
298 did_fail = !OnWriteCompletedNoLock(result, bytes_written);
299 }
300
301 if (did_fail)
302 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
303 }
304
305 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
306 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
307 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
308 delegate_->OnFatalError(fatal_error);
309 }
310
311 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
312 write_lock_.AssertAcquired();
313
314 DCHECK(!write_stopped_);
315 DCHECK(!write_buffer_->message_queue_.empty());
316
317 if (result) {
318 if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
319 // Partial (or no) write.
320 write_buffer_->offset_ += bytes_written;
321 } else {
322 // Complete write.
323 DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
324 delete write_buffer_->message_queue_.front();
325 write_buffer_->message_queue_.pop_front();
326 write_buffer_->offset_ = 0;
327 }
328
329 if (write_buffer_->message_queue_.empty())
330 return true;
331
332 // Schedule the next write.
333 if (ScheduleWriteNoLock() == IO_PENDING)
334 return true;
335 }
336
337 write_stopped_ = true;
338 STLDeleteElements(&write_buffer_->message_queue_);
339 write_buffer_->offset_ = 0;
340 return false;
341 }
342
343 } // namespace system
344 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/raw_channel.h ('k') | mojo/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698