Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: mojo/system/raw_channel_posix.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: support multi-segment write Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« mojo/system/raw_channel.h ('K') | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <string.h>
9 #include <sys/uio.h> 8 #include <sys/uio.h>
10 #include <unistd.h> 9 #include <unistd.h>
11 10
12 #include <algorithm>
13 #include <deque>
14 #include <vector>
15
16 #include "base/basictypes.h" 11 #include "base/basictypes.h"
17 #include "base/bind.h" 12 #include "base/bind.h"
18 #include "base/compiler_specific.h" 13 #include "base/compiler_specific.h"
19 #include "base/location.h" 14 #include "base/location.h"
20 #include "base/logging.h" 15 #include "base/logging.h"
21 #include "base/memory/scoped_ptr.h" 16 #include "base/memory/scoped_ptr.h"
22 #include "base/memory/weak_ptr.h" 17 #include "base/memory/weak_ptr.h"
23 #include "base/message_loop/message_loop.h" 18 #include "base/message_loop/message_loop.h"
24 #include "base/posix/eintr_wrapper.h" 19 #include "base/posix/eintr_wrapper.h"
25 #include "base/stl_util.h"
26 #include "base/synchronization/lock.h" 20 #include "base/synchronization/lock.h"
27 #include "mojo/system/embedder/platform_handle.h" 21 #include "mojo/system/embedder/platform_handle.h"
28 #include "mojo/system/message_in_transit.h"
29 22
30 namespace mojo { 23 namespace mojo {
31 namespace system { 24 namespace system {
32 25
33 namespace { 26 namespace {
34 27
35 const size_t kReadSize = 4096;
36
37 class RawChannelPosix : public RawChannel, 28 class RawChannelPosix : public RawChannel,
38 public base::MessageLoopForIO::Watcher { 29 public base::MessageLoopForIO::Watcher {
39 public: 30 public:
40 RawChannelPosix(embedder::ScopedPlatformHandle handle, 31 RawChannelPosix(embedder::ScopedPlatformHandle handle,
41 Delegate* delegate, 32 Delegate* delegate,
42 base::MessageLoopForIO* message_loop_for_io); 33 base::MessageLoopForIO* message_loop_for_io);
43 virtual ~RawChannelPosix(); 34 virtual ~RawChannelPosix();
44 35
36 private:
45 // |RawChannel| implementation: 37 // |RawChannel| implementation:
46 virtual bool Init() OVERRIDE; 38 virtual IOResult Read(size_t* bytes_read) OVERRIDE;
47 virtual void Shutdown() OVERRIDE; 39 virtual IOResult ScheduleRead() OVERRIDE;
48 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; 40 virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
41 virtual IOResult ScheduleWriteNoLock() OVERRIDE;
42 virtual bool OnInit() OVERRIDE;
43 virtual void OnShutdownNoLock(
44 scoped_ptr<ReadBuffer> read_buffer,
45 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
49 46
50 private:
51 // |base::MessageLoopForIO::Watcher| implementation: 47 // |base::MessageLoopForIO::Watcher| implementation:
52 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 48 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
53 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 49 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
54 50
55 // Watches for |fd_| to become writable. Must be called on the I/O thread. 51 // Watches for |fd_| to become writable. Must be called on the I/O thread.
56 void WaitToWrite(); 52 void WaitToWrite();
57 53
58 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
59 // thread WITHOUT |write_lock_| held.
60 void CallOnFatalError(Delegate::FatalError fatal_error);
61
62 // Writes the message at the front of |write_message_queue_|, starting at
63 // |write_message_offset_|. It removes and destroys if the write completes and
64 // otherwise updates |write_message_offset_|. Returns true on success. Must be
65 // called under |write_lock_|.
66 bool WriteFrontMessageNoLock();
67
68 // Cancels all pending writes and destroys the contents of
69 // |write_message_queue_|. Should only be called if |write_stopped_| is false;
70 // sets |write_stopped_| to true. Must be called under |write_lock_|.
71 void CancelPendingWritesNoLock();
72
73 embedder::ScopedPlatformHandle fd_; 54 embedder::ScopedPlatformHandle fd_;
74 55
75 // Only used on the I/O thread: 56 // The following members are only used on the I/O thread:
76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 57 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
77 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 58 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
78 59
79 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 60 bool pending_read_;
80 // is always aligned with a message boundary (we will copy memory to ensure
81 // this), but |read_buffer_| may be larger than the actual number of bytes we
82 // have.
83 std::vector<char> read_buffer_;
84 size_t read_buffer_num_valid_bytes_;
85 61
86 base::Lock write_lock_; // Protects the following members. 62 // The following members are used on multiple threads and protected by
87 bool write_stopped_; 63 // |write_lock()|:
88 // TODO(vtl): When C++11 is available, switch this to a deque of 64 bool pending_write_;
89 // |scoped_ptr|/|unique_ptr|s. 65
90 std::deque<MessageInTransit*> write_message_queue_;
91 size_t write_message_offset_;
92 // This is used for posting tasks from write threads to the I/O thread. It 66 // This is used for posting tasks from write threads to the I/O thread. It
93 // must only be accessed under |write_lock_|. The weak pointers it produces 67 // must only be accessed under |write_lock_|. The weak pointers it produces
94 // are only used/invalidated on the I/O thread. 68 // are only used/invalidated on the I/O thread.
95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 69 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
96 70
97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 71 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
98 }; 72 };
99 73
100 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, 74 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
101 Delegate* delegate, 75 Delegate* delegate,
102 base::MessageLoopForIO* message_loop_for_io) 76 base::MessageLoopForIO* message_loop_for_io)
103 : RawChannel(delegate, message_loop_for_io), 77 : RawChannel(delegate, message_loop_for_io),
104 fd_(handle.Pass()), 78 fd_(handle.Pass()),
105 read_buffer_num_valid_bytes_(0), 79 pending_read_(false),
106 write_stopped_(false), 80 pending_write_(false),
107 write_message_offset_(0),
108 weak_ptr_factory_(this) { 81 weak_ptr_factory_(this) {
109 CHECK_EQ(RawChannel::message_loop_for_io()->type(),
110 base::MessageLoop::TYPE_IO);
111 DCHECK(fd_.is_valid()); 82 DCHECK(fd_.is_valid());
112 } 83 }
113 84
114 RawChannelPosix::~RawChannelPosix() { 85 RawChannelPosix::~RawChannelPosix() {
115 DCHECK(write_stopped_); 86 DCHECK(!pending_read_);
116 DCHECK(!fd_.is_valid()); 87 DCHECK(!pending_write_);
117 88
118 // No need to take the |write_lock_| here -- if there are still weak pointers 89 // No need to take the |write_lock()| here -- if there are still weak pointers
119 // outstanding, then we're hosed anyway (since we wouldn't be able to 90 // outstanding, then we're hosed anyway (since we wouldn't be able to
120 // invalidate them cleanly, since we might not be on the I/O thread). 91 // invalidate them cleanly, since we might not be on the I/O thread).
121 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 92 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
122 93
123 // These must have been shut down/destroyed on the I/O thread. 94 // These must have been shut down/destroyed on the I/O thread.
124 DCHECK(!read_watcher_.get()); 95 DCHECK(!read_watcher_.get());
125 DCHECK(!write_watcher_.get()); 96 DCHECK(!write_watcher_.get());
126 } 97 }
127 98
128 bool RawChannelPosix::Init() { 99 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
100 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
101 DCHECK(!pending_read_);
102
103 char* buffer = NULL;
104 size_t bytes_to_read = 0;
105 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
106
107 ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
108
109 if (read_result >= 0) {
110 *bytes_read = static_cast<size_t>(read_result);
111 return IO_SUCCEEDED;
112 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
viettrungluu 2014/02/27 05:22:15 nit: no |else| needed here
yzshen1 2014/02/27 06:56:18 Done.
113 PLOG(ERROR) << "read";
114
115 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
116 read_watcher_.reset();
117
118 return IO_FAILED;
119 }
120
121 return ScheduleRead();
122 }
123
124 RawChannel::IOResult RawChannelPosix::ScheduleRead() {
125 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
126 DCHECK(!pending_read_);
127
128 pending_read_ = true;
129
130 return IO_PENDING;
131 }
132
133 RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
134 write_lock().AssertAcquired();
135
136 DCHECK(!pending_write_);
137
138 WriteBuffer::BufferVector buffers;
139 write_buffer_no_lock()->GetBuffers(&buffers);
140 DCHECK(!buffers.empty());
141
142 ssize_t write_result = -1;
143 if (buffers.size() == 1) {
144 write_result = HANDLE_EINTR(
145 write(fd_.get().fd, buffers[0].first, buffers[0].second));
146 } else {
147 // Note that using |writev()| is measurably slower than using |write()| --
148 // at least in a microbenchmark -- but much faster than using multiple
149 // |write()|s.
150 iovec* iov = new iovec[buffers.size()];
viettrungluu 2014/02/27 05:22:15 I think it's overkill to do a heap allocation for
yzshen1 2014/02/27 06:56:18 Done.
viettrungluu 2014/02/27 07:36:35 Isn't consistency guaranteed by the fact that acce
yzshen1 2014/02/27 16:19:33 I mean: write_buffer_->message_queue_->pop_front(
viettrungluu 2014/02/27 17:21:41 Okay, LGTM then.
151 for (size_t i = 0; i < buffers.size(); ++i) {
152 iov[i].iov_base = const_cast<char*>(buffers[i].first);
153 iov[i].iov_len = buffers[i].second;
154 }
155
156 write_result = HANDLE_EINTR(
157 writev(fd_.get().fd, iov, buffers.size()));
158 delete [] iov;
159 }
160
161 if (write_result >= 0) {
162 *bytes_written = static_cast<size_t>(write_result);
163 return IO_SUCCEEDED;
164 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
viettrungluu 2014/02/27 05:22:15 no else
yzshen1 2014/02/27 06:56:18 Done.
165 PLOG(ERROR) << "write of size "
166 << write_buffer_no_lock()->GetTotalBytesToWrite();
167 return IO_FAILED;
168 }
169
170 return ScheduleWriteNoLock();
171 }
172
173 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
174 write_lock().AssertAcquired();
175
176 DCHECK(!pending_write_);
177
178 // Set up to wait for the FD to become writable.
179 // If we're not on the I/O thread, we have to post a task to do this.
180 if (base::MessageLoop::current() != message_loop_for_io()) {
181 message_loop_for_io()->PostTask(
182 FROM_HERE,
183 base::Bind(&RawChannelPosix::WaitToWrite,
184 weak_ptr_factory_.GetWeakPtr()));
185 pending_write_ = true;
186 return IO_PENDING;
187 }
188
189 if (message_loop_for_io()->WatchFileDescriptor(
190 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
191 write_watcher_.get(), this)) {
192 pending_write_ = true;
193 return IO_PENDING;
194 }
195
196 return IO_FAILED;
197 }
198
199 bool RawChannelPosix::OnInit() {
129 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
130 201
131 DCHECK(!read_watcher_.get()); 202 DCHECK(!read_watcher_.get());
132 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 203 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
133 DCHECK(!write_watcher_.get()); 204 DCHECK(!write_watcher_.get());
134 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 205 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
135 206
136 // No need to take the lock. No one should be using us yet.
137 DCHECK(write_message_queue_.empty());
138
139 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, 207 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
140 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 208 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
141 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 209 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
142 // (in the sense of returning the message loop's state to what it was before 210 // (in the sense of returning the message loop's state to what it was before
143 // it was called). 211 // it was called).
144 read_watcher_.reset(); 212 read_watcher_.reset();
145 write_watcher_.reset(); 213 write_watcher_.reset();
146 return false; 214 return false;
147 } 215 }
148 216
149 return true; 217 return true;
150 } 218 }
151 219
152 void RawChannelPosix::Shutdown() { 220 void RawChannelPosix::OnShutdownNoLock(
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 221 scoped_ptr<ReadBuffer> /*read_buffer*/,
154 222 scoped_ptr<WriteBuffer> /*write_buffer*/) {
155 base::AutoLock locker(write_lock_); 223 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
156 if (!write_stopped_) 224 write_lock().AssertAcquired();
157 CancelPendingWritesNoLock();
158 225
159 read_watcher_.reset(); // This will stop watching (if necessary). 226 read_watcher_.reset(); // This will stop watching (if necessary).
160 write_watcher_.reset(); // This will stop watching (if necessary). 227 write_watcher_.reset(); // This will stop watching (if necessary).
161 228
229 pending_read_ = false;
230 pending_write_ = false;
231
162 DCHECK(fd_.is_valid()); 232 DCHECK(fd_.is_valid());
163 fd_.reset(); 233 fd_.reset();
164 234
165 weak_ptr_factory_.InvalidateWeakPtrs(); 235 weak_ptr_factory_.InvalidateWeakPtrs();
166 } 236 }
167 237
168 // Reminder: This must be thread-safe, and takes ownership of |message|.
169 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
170 base::AutoLock locker(write_lock_);
171 if (write_stopped_)
172 return false;
173
174 if (!write_message_queue_.empty()) {
175 write_message_queue_.push_back(message.release());
176 return true;
177 }
178
179 write_message_queue_.push_front(message.release());
180 DCHECK_EQ(write_message_offset_, 0u);
181 bool result = WriteFrontMessageNoLock();
182 DCHECK(result || write_message_queue_.empty());
183
184 if (!result) {
185 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
186 // nested context.
187 message_loop_for_io()->PostTask(
188 FROM_HERE,
189 base::Bind(&RawChannelPosix::CallOnFatalError,
190 weak_ptr_factory_.GetWeakPtr(),
191 Delegate::FATAL_ERROR_FAILED_WRITE));
192 } else if (!write_message_queue_.empty()) {
193 // Set up to wait for the FD to become writable. If we're not on the I/O
194 // thread, we have to post a task to do this.
195 if (base::MessageLoop::current() == message_loop_for_io()) {
196 WaitToWrite();
197 } else {
198 message_loop_for_io()->PostTask(
199 FROM_HERE,
200 base::Bind(&RawChannelPosix::WaitToWrite,
201 weak_ptr_factory_.GetWeakPtr()));
202 }
203 }
204
205 return result;
206 }
207
208 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 238 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
209 DCHECK_EQ(fd, fd_.get().fd); 239 DCHECK_EQ(fd, fd_.get().fd);
210 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
211 241
212 bool did_dispatch_message = false; 242 if (!pending_read_) {
213 // Tracks the offset of the first undispatched message in |read_buffer_|. 243 NOTREACHED();
214 // Currently, we copy data to ensure that this is zero at the beginning. 244 return;
215 size_t read_buffer_start = 0; 245 }
216 for (;;) { 246
217 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 247 pending_read_ = false;
218 < kReadSize) { 248 size_t bytes_read = 0;
219 // Use power-of-2 buffer sizes. 249 IOResult result = Read(&bytes_read);
220 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 250 if (result != IO_PENDING)
221 // maximum message size to whatever extent necessary). 251 OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
222 // TODO(vtl): We may often be able to peek at the header and get the real 252
223 // required extra space (which may be much bigger than |kReadSize|). 253 // On failure, |read_watcher_| must have been reset; on success,
224 size_t new_size = std::max(read_buffer_.size(), kReadSize); 254 // we assume that |OnReadCompleted()| always schedules another read.
225 while (new_size < 255 // Otherwise, we could end up spinning -- getting
226 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 256 // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
227 new_size *= 2; 257 // read.
228 258 // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
229 // TODO(vtl): It's suboptimal to zero out the fresh memory. 259 // schedule a new read. But that code won't be reached under the current
230 read_buffer_.resize(new_size, 0); 260 // RawChannel implementation.
231 } 261 DCHECK(!read_watcher_.get() || pending_read_);
232
233 ssize_t bytes_read = HANDLE_EINTR(
234 read(fd_.get().fd,
235 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
236 kReadSize));
237 if (bytes_read < 0) {
238 if (errno != EAGAIN && errno != EWOULDBLOCK) {
239 PLOG(ERROR) << "read";
240
241 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
242 // again.
243 read_watcher_.reset();
244
245 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
246 return;
247 }
248
249 break;
250 }
251
252 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
253
254 // Dispatch all the messages that we can.
255 size_t message_size;
256 // Note that we rely on short-circuit evaluation here:
257 // - |read_buffer_start| may be an invalid index into |read_buffer_| if
258 // |read_buffer_num_valid_bytes_| is zero.
259 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
260 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
261 // next read).
262 while (read_buffer_num_valid_bytes_ > 0 &&
263 MessageInTransit::GetNextMessageSize(
264 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
265 &message_size) &&
266 read_buffer_num_valid_bytes_ >= message_size) {
267 // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
268 // some sort of "view" abstraction.
269 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
270 &read_buffer_[read_buffer_start]);
271 DCHECK_EQ(message.total_size(), message_size);
272
273 // Dispatch the message.
274 delegate()->OnReadMessage(message);
275 if (!read_watcher_.get()) {
276 // |Shutdown()| was called in |OnReadMessage()|.
277 // TODO(vtl): Add test for this case.
278 return;
279 }
280 did_dispatch_message = true;
281
282 // Update our state.
283 read_buffer_start += message_size;
284 read_buffer_num_valid_bytes_ -= message_size;
285 }
286
287 // If we dispatched any messages, stop reading for now (and let the message
288 // loop do its thing for another round).
289 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
290 // a single message. Risks: slower, more complex if we want to avoid lots of
291 // copying. ii. Keep reading until there's no more data and dispatch all the
292 // messages we can. Risks: starvation of other users of the message loop.)
293 if (did_dispatch_message)
294 break;
295
296 // If we didn't max out |kReadSize|, stop reading for now.
297 if (static_cast<size_t>(bytes_read) < kReadSize)
298 break;
299
300 // Else try to read some more....
301 }
302
303 // Move data back to start.
304 if (read_buffer_start > 0) {
305 if (read_buffer_num_valid_bytes_ > 0) {
306 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
307 read_buffer_num_valid_bytes_);
308 }
309 read_buffer_start = 0;
310 }
311 } 262 }
312 263
313 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 264 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
314 DCHECK_EQ(fd, fd_.get().fd); 265 DCHECK_EQ(fd, fd_.get().fd);
315 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
316 267
317 bool did_fail = false; 268 IOResult result = IO_FAILED;
269 size_t bytes_written = 0;
318 { 270 {
319 base::AutoLock locker(write_lock_); 271 base::AutoLock locker(write_lock());
320 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); 272
321 273 DCHECK(pending_write_);
322 if (write_stopped_) { 274
323 write_watcher_.reset(); 275 pending_write_ = false;
324 return; 276 result = WriteNoLock(&bytes_written);
277 }
278
279 if (result != IO_PENDING)
280 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
281 }
282
283 void RawChannelPosix::WaitToWrite() {
284 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
285
286 DCHECK(write_watcher_.get());
287
288 if (!message_loop_for_io()->WatchFileDescriptor(
289 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
290 write_watcher_.get(), this)) {
291 {
292 base::AutoLock locker(write_lock());
293
294 DCHECK(pending_write_);
295 pending_write_ = false;
325 } 296 }
326 297 OnWriteCompleted(false, 0);
327 bool result = WriteFrontMessageNoLock(); 298 }
328 DCHECK(result || write_message_queue_.empty());
329
330 if (!result) {
331 did_fail = true;
332 write_watcher_.reset();
333 } else if (!write_message_queue_.empty()) {
334 WaitToWrite();
335 }
336 }
337 if (did_fail)
338 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
339 }
340
341 void RawChannelPosix::WaitToWrite() {
342 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
343
344 DCHECK(write_watcher_.get());
345 bool result = message_loop_for_io()->WatchFileDescriptor(
346 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
347 write_watcher_.get(), this);
348 DCHECK(result);
349 }
350
351 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
352 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
353 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
354 delegate()->OnFatalError(fatal_error);
355 }
356
357 // TODO(vtl): This will collide with yzshen's work. This is just a hacky,
358 // minimally-invasive function that does what I want (write/resume writing a
359 // |MessageInTransit| that may consist of more than one buffer).
360 ssize_t WriteMessageInTransit(int fd,
361 MessageInTransit* message,
362 size_t offset,
363 size_t* bytes_to_write) {
364 *bytes_to_write = message->total_size() - offset;
365 if (!message->secondary_buffer_size()) {
366 // Only write from the main buffer.
367 DCHECK_LT(offset, message->main_buffer_size());
368 DCHECK_LE(*bytes_to_write, message->main_buffer_size());
369 return HANDLE_EINTR(
370 write(fd,
371 static_cast<const char*>(message->main_buffer()) + offset,
372 *bytes_to_write));
373 }
374 if (offset >= message->main_buffer_size()) {
375 // Only write from the secondary buffer.
376 DCHECK_LT(offset - message->main_buffer_size(),
377 message->secondary_buffer_size());
378 DCHECK_LE(*bytes_to_write, message->secondary_buffer_size());
379 return HANDLE_EINTR(
380 write(fd,
381 static_cast<const char*>(message->secondary_buffer()) +
382 (offset - message->main_buffer_size()),
383 *bytes_to_write));
384 }
385 // Write from both buffers. (Note that using |writev()| is measurably slower
386 // than using |write()| -- at least in a microbenchmark -- but much faster
387 // than using two |write()|s.)
388 DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset +
389 message->secondary_buffer_size());
390 struct iovec iov[2] = {
391 { const_cast<char*>(
392 static_cast<const char*>(message->main_buffer()) + offset),
393 message->main_buffer_size() - offset },
394 { const_cast<void*>(message->secondary_buffer()),
395 message->secondary_buffer_size() }
396 };
397 return HANDLE_EINTR(writev(fd, iov, 2));
398 }
399
400 bool RawChannelPosix::WriteFrontMessageNoLock() {
401 write_lock_.AssertAcquired();
402
403 DCHECK(!write_stopped_);
404 DCHECK(!write_message_queue_.empty());
405
406 MessageInTransit* message = write_message_queue_.front();
407 DCHECK_LT(write_message_offset_, message->total_size());
408 size_t bytes_to_write;
409 ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd,
410 message,
411 write_message_offset_,
412 &bytes_to_write);
413 if (bytes_written < 0) {
414 if (errno != EAGAIN && errno != EWOULDBLOCK) {
415 PLOG(ERROR) << "write of size " << bytes_to_write;
416 CancelPendingWritesNoLock();
417 return false;
418 }
419
420 // We simply failed to write since we'd block. The logic is the same as if
421 // we got a partial write.
422 bytes_written = 0;
423 }
424
425 DCHECK_GE(bytes_written, 0);
426 if (static_cast<size_t>(bytes_written) < bytes_to_write) {
427 // Partial (or no) write.
428 write_message_offset_ += static_cast<size_t>(bytes_written);
429 } else {
430 // Complete write.
431 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
432 write_message_queue_.pop_front();
433 delete message;
434 write_message_offset_ = 0;
435 }
436
437 return true;
438 }
439
440 void RawChannelPosix::CancelPendingWritesNoLock() {
441 write_lock_.AssertAcquired();
442 DCHECK(!write_stopped_);
443
444 write_stopped_ = true;
445 STLDeleteElements(&write_message_queue_);
446 } 299 }
447 300
448 } // namespace 301 } // namespace
449 302
450 // ----------------------------------------------------------------------------- 303 // -----------------------------------------------------------------------------
451 304
452 // Static factory method declared in raw_channel.h. 305 // Static factory method declared in raw_channel.h.
453 // static 306 // static
454 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, 307 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
455 Delegate* delegate, 308 Delegate* delegate,
456 base::MessageLoopForIO* message_loop_for_io) { 309 base::MessageLoopForIO* message_loop_for_io) {
457 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); 310 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io);
458 } 311 }
459 312
460 } // namespace system 313 } // namespace system
461 } // namespace mojo 314 } // namespace mojo
OLDNEW
« mojo/system/raw_channel.h ('K') | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698