Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: mojo/system/raw_channel_posix.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <string.h>
9 #include <sys/uio.h> 8 #include <sys/uio.h>
10 #include <unistd.h> 9 #include <unistd.h>
11 10
12 #include <algorithm> 11 #include <algorithm>
13 #include <deque>
14 #include <vector>
15 12
16 #include "base/basictypes.h" 13 #include "base/basictypes.h"
17 #include "base/bind.h" 14 #include "base/bind.h"
18 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
19 #include "base/location.h" 16 #include "base/location.h"
20 #include "base/logging.h" 17 #include "base/logging.h"
21 #include "base/memory/scoped_ptr.h" 18 #include "base/memory/scoped_ptr.h"
22 #include "base/memory/weak_ptr.h" 19 #include "base/memory/weak_ptr.h"
23 #include "base/message_loop/message_loop.h" 20 #include "base/message_loop/message_loop.h"
24 #include "base/posix/eintr_wrapper.h" 21 #include "base/posix/eintr_wrapper.h"
25 #include "base/stl_util.h"
26 #include "base/synchronization/lock.h" 22 #include "base/synchronization/lock.h"
27 #include "mojo/system/embedder/platform_handle.h" 23 #include "mojo/system/embedder/platform_handle.h"
28 #include "mojo/system/message_in_transit.h"
29 24
30 namespace mojo { 25 namespace mojo {
31 namespace system { 26 namespace system {
32 27
33 namespace { 28 namespace {
34 29
35 const size_t kReadSize = 4096;
36
37 class RawChannelPosix : public RawChannel, 30 class RawChannelPosix : public RawChannel,
38 public base::MessageLoopForIO::Watcher { 31 public base::MessageLoopForIO::Watcher {
39 public: 32 public:
40 RawChannelPosix(embedder::ScopedPlatformHandle handle, 33 RawChannelPosix(embedder::ScopedPlatformHandle handle,
41 Delegate* delegate, 34 Delegate* delegate,
42 base::MessageLoopForIO* message_loop_for_io); 35 base::MessageLoopForIO* message_loop_for_io);
43 virtual ~RawChannelPosix(); 36 virtual ~RawChannelPosix();
44 37
38 private:
45 // |RawChannel| implementation: 39 // |RawChannel| implementation:
46 virtual bool Init() OVERRIDE; 40 virtual IOResult Read(size_t* bytes_read) OVERRIDE;
47 virtual void Shutdown() OVERRIDE; 41 virtual IOResult ScheduleRead() OVERRIDE;
48 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; 42 virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
43 virtual IOResult ScheduleWriteNoLock() OVERRIDE;
44 virtual bool OnInit() OVERRIDE;
45 virtual void OnShutdownNoLock(
46 scoped_ptr<ReadBuffer> read_buffer,
47 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
49 48
50 private:
51 // |base::MessageLoopForIO::Watcher| implementation: 49 // |base::MessageLoopForIO::Watcher| implementation:
52 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
53 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
54 52
55 // Watches for |fd_| to become writable. Must be called on the I/O thread. 53 // Watches for |fd_| to become writable. Must be called on the I/O thread.
56 void WaitToWrite(); 54 void WaitToWrite();
57 55
58 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
59 // thread WITHOUT |write_lock_| held.
60 void CallOnFatalError(Delegate::FatalError fatal_error);
61
62 // Writes the message at the front of |write_message_queue_|, starting at
63 // |write_message_offset_|. It removes and destroys if the write completes and
64 // otherwise updates |write_message_offset_|. Returns true on success. Must be
65 // called under |write_lock_|.
66 bool WriteFrontMessageNoLock();
67
68 // Cancels all pending writes and destroys the contents of
69 // |write_message_queue_|. Should only be called if |write_stopped_| is false;
70 // sets |write_stopped_| to true. Must be called under |write_lock_|.
71 void CancelPendingWritesNoLock();
72
73 embedder::ScopedPlatformHandle fd_; 56 embedder::ScopedPlatformHandle fd_;
74 57
75 // Only used on the I/O thread: 58 // The following members are only used on the I/O thread:
76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 59 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
77 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 60 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
78 61
79 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 62 bool pending_read_;
80 // is always aligned with a message boundary (we will copy memory to ensure
81 // this), but |read_buffer_| may be larger than the actual number of bytes we
82 // have.
83 std::vector<char> read_buffer_;
84 size_t read_buffer_num_valid_bytes_;
85 63
86 base::Lock write_lock_; // Protects the following members. 64 // The following members are used on multiple threads and protected by
87 bool write_stopped_; 65 // |write_lock()|:
88 // TODO(vtl): When C++11 is available, switch this to a deque of 66 bool pending_write_;
89 // |scoped_ptr|/|unique_ptr|s. 67
90 std::deque<MessageInTransit*> write_message_queue_;
91 size_t write_message_offset_;
92 // This is used for posting tasks from write threads to the I/O thread. It 68 // This is used for posting tasks from write threads to the I/O thread. It
93 // must only be accessed under |write_lock_|. The weak pointers it produces 69 // must only be accessed under |write_lock_|. The weak pointers it produces
94 // are only used/invalidated on the I/O thread. 70 // are only used/invalidated on the I/O thread.
95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 71 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
96 72
97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 73 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
98 }; 74 };
99 75
100 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, 76 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
101 Delegate* delegate, 77 Delegate* delegate,
102 base::MessageLoopForIO* message_loop_for_io) 78 base::MessageLoopForIO* message_loop_for_io)
103 : RawChannel(delegate, message_loop_for_io), 79 : RawChannel(delegate, message_loop_for_io),
104 fd_(handle.Pass()), 80 fd_(handle.Pass()),
105 read_buffer_num_valid_bytes_(0), 81 pending_read_(false),
106 write_stopped_(false), 82 pending_write_(false),
107 write_message_offset_(0),
108 weak_ptr_factory_(this) { 83 weak_ptr_factory_(this) {
109 CHECK_EQ(RawChannel::message_loop_for_io()->type(),
110 base::MessageLoop::TYPE_IO);
111 DCHECK(fd_.is_valid()); 84 DCHECK(fd_.is_valid());
112 } 85 }
113 86
114 RawChannelPosix::~RawChannelPosix() { 87 RawChannelPosix::~RawChannelPosix() {
115 DCHECK(write_stopped_); 88 DCHECK(!pending_read_);
116 DCHECK(!fd_.is_valid()); 89 DCHECK(!pending_write_);
117 90
118 // No need to take the |write_lock_| here -- if there are still weak pointers 91 // No need to take the |write_lock()| here -- if there are still weak pointers
119 // outstanding, then we're hosed anyway (since we wouldn't be able to 92 // outstanding, then we're hosed anyway (since we wouldn't be able to
120 // invalidate them cleanly, since we might not be on the I/O thread). 93 // invalidate them cleanly, since we might not be on the I/O thread).
121 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 94 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
122 95
123 // These must have been shut down/destroyed on the I/O thread. 96 // These must have been shut down/destroyed on the I/O thread.
124 DCHECK(!read_watcher_.get()); 97 DCHECK(!read_watcher_.get());
125 DCHECK(!write_watcher_.get()); 98 DCHECK(!write_watcher_.get());
126 } 99 }
127 100
128 bool RawChannelPosix::Init() { 101 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
102 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
103 DCHECK(!pending_read_);
104
105 char* buffer = NULL;
106 size_t bytes_to_read = 0;
107 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
108
109 ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
110
111 if (read_result >= 0) {
112 *bytes_read = static_cast<size_t>(read_result);
113 return IO_SUCCEEDED;
114 }
115
116 if (errno != EAGAIN && errno != EWOULDBLOCK) {
117 PLOG(ERROR) << "read";
118
119 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
120 read_watcher_.reset();
121
122 return IO_FAILED;
123 }
124
125 return ScheduleRead();
126 }
127
128 RawChannel::IOResult RawChannelPosix::ScheduleRead() {
129 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
130 DCHECK(!pending_read_);
131
132 pending_read_ = true;
133
134 return IO_PENDING;
135 }
136
137 RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
138 write_lock().AssertAcquired();
139
140 DCHECK(!pending_write_);
141
142 std::vector<WriteBuffer::Buffer> buffers;
143 write_buffer_no_lock()->GetBuffers(&buffers);
144 DCHECK(!buffers.empty());
145
146 ssize_t write_result = -1;
147 if (buffers.size() == 1) {
148 write_result = HANDLE_EINTR(
149 write(fd_.get().fd, buffers[0].addr, buffers[0].size));
150 } else {
151 // Note that using |writev()| is measurably slower than using |write()| --
152 // at least in a microbenchmark -- but much faster than using multiple
153 // |write()|s.
154 const size_t kMaxBufferCount = 10;
155 iovec iov[kMaxBufferCount];
156 size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
157
158 for (size_t i = 0; i < buffer_count; ++i) {
159 iov[i].iov_base = const_cast<char*>(buffers[i].addr);
160 iov[i].iov_len = buffers[i].size;
161 }
162
163 write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count));
164 }
165
166 if (write_result >= 0) {
167 *bytes_written = static_cast<size_t>(write_result);
168 return IO_SUCCEEDED;
169 }
170
171 if (errno != EAGAIN && errno != EWOULDBLOCK) {
172 PLOG(ERROR) << "write of size "
173 << write_buffer_no_lock()->GetTotalBytesToWrite();
174 return IO_FAILED;
175 }
176
177 return ScheduleWriteNoLock();
178 }
179
180 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
181 write_lock().AssertAcquired();
182
183 DCHECK(!pending_write_);
184
185 // Set up to wait for the FD to become writable.
186 // If we're not on the I/O thread, we have to post a task to do this.
187 if (base::MessageLoop::current() != message_loop_for_io()) {
188 message_loop_for_io()->PostTask(
189 FROM_HERE,
190 base::Bind(&RawChannelPosix::WaitToWrite,
191 weak_ptr_factory_.GetWeakPtr()));
192 pending_write_ = true;
193 return IO_PENDING;
194 }
195
196 if (message_loop_for_io()->WatchFileDescriptor(
197 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
198 write_watcher_.get(), this)) {
199 pending_write_ = true;
200 return IO_PENDING;
201 }
202
203 return IO_FAILED;
204 }
205
206 bool RawChannelPosix::OnInit() {
129 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 207 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
130 208
131 DCHECK(!read_watcher_.get()); 209 DCHECK(!read_watcher_.get());
132 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 210 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
133 DCHECK(!write_watcher_.get()); 211 DCHECK(!write_watcher_.get());
134 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 212 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
135 213
136 // No need to take the lock. No one should be using us yet.
137 DCHECK(write_message_queue_.empty());
138
139 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, 214 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
140 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 215 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
141 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 216 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
142 // (in the sense of returning the message loop's state to what it was before 217 // (in the sense of returning the message loop's state to what it was before
143 // it was called). 218 // it was called).
144 read_watcher_.reset(); 219 read_watcher_.reset();
145 write_watcher_.reset(); 220 write_watcher_.reset();
146 return false; 221 return false;
147 } 222 }
148 223
149 return true; 224 return true;
150 } 225 }
151 226
152 void RawChannelPosix::Shutdown() { 227 void RawChannelPosix::OnShutdownNoLock(
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 228 scoped_ptr<ReadBuffer> /*read_buffer*/,
154 229 scoped_ptr<WriteBuffer> /*write_buffer*/) {
155 base::AutoLock locker(write_lock_); 230 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
156 if (!write_stopped_) 231 write_lock().AssertAcquired();
157 CancelPendingWritesNoLock();
158 232
159 read_watcher_.reset(); // This will stop watching (if necessary). 233 read_watcher_.reset(); // This will stop watching (if necessary).
160 write_watcher_.reset(); // This will stop watching (if necessary). 234 write_watcher_.reset(); // This will stop watching (if necessary).
161 235
236 pending_read_ = false;
237 pending_write_ = false;
238
162 DCHECK(fd_.is_valid()); 239 DCHECK(fd_.is_valid());
163 fd_.reset(); 240 fd_.reset();
164 241
165 weak_ptr_factory_.InvalidateWeakPtrs(); 242 weak_ptr_factory_.InvalidateWeakPtrs();
166 } 243 }
167 244
168 // Reminder: This must be thread-safe, and takes ownership of |message|.
169 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
170 base::AutoLock locker(write_lock_);
171 if (write_stopped_)
172 return false;
173
174 if (!write_message_queue_.empty()) {
175 write_message_queue_.push_back(message.release());
176 return true;
177 }
178
179 write_message_queue_.push_front(message.release());
180 DCHECK_EQ(write_message_offset_, 0u);
181 bool result = WriteFrontMessageNoLock();
182 DCHECK(result || write_message_queue_.empty());
183
184 if (!result) {
185 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
186 // nested context.
187 message_loop_for_io()->PostTask(
188 FROM_HERE,
189 base::Bind(&RawChannelPosix::CallOnFatalError,
190 weak_ptr_factory_.GetWeakPtr(),
191 Delegate::FATAL_ERROR_FAILED_WRITE));
192 } else if (!write_message_queue_.empty()) {
193 // Set up to wait for the FD to become writable. If we're not on the I/O
194 // thread, we have to post a task to do this.
195 if (base::MessageLoop::current() == message_loop_for_io()) {
196 WaitToWrite();
197 } else {
198 message_loop_for_io()->PostTask(
199 FROM_HERE,
200 base::Bind(&RawChannelPosix::WaitToWrite,
201 weak_ptr_factory_.GetWeakPtr()));
202 }
203 }
204
205 return result;
206 }
207
208 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 245 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
209 DCHECK_EQ(fd, fd_.get().fd); 246 DCHECK_EQ(fd, fd_.get().fd);
210 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 247 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
211 248
212 bool did_dispatch_message = false; 249 if (!pending_read_) {
213 // Tracks the offset of the first undispatched message in |read_buffer_|. 250 NOTREACHED();
214 // Currently, we copy data to ensure that this is zero at the beginning. 251 return;
215 size_t read_buffer_start = 0; 252 }
216 for (;;) { 253
217 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 254 pending_read_ = false;
218 < kReadSize) { 255 size_t bytes_read = 0;
219 // Use power-of-2 buffer sizes. 256 IOResult result = Read(&bytes_read);
220 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 257 if (result != IO_PENDING)
221 // maximum message size to whatever extent necessary). 258 OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
222 // TODO(vtl): We may often be able to peek at the header and get the real 259
223 // required extra space (which may be much bigger than |kReadSize|). 260 // On failure, |read_watcher_| must have been reset; on success,
224 size_t new_size = std::max(read_buffer_.size(), kReadSize); 261 // we assume that |OnReadCompleted()| always schedules another read.
225 while (new_size < 262 // Otherwise, we could end up spinning -- getting
226 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 263 // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
227 new_size *= 2; 264 // read.
228 265 // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
229 // TODO(vtl): It's suboptimal to zero out the fresh memory. 266 // schedule a new read. But that code won't be reached under the current
230 read_buffer_.resize(new_size, 0); 267 // RawChannel implementation.
231 } 268 DCHECK(!read_watcher_.get() || pending_read_);
232
233 ssize_t bytes_read = HANDLE_EINTR(
234 read(fd_.get().fd,
235 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
236 kReadSize));
237 if (bytes_read < 0) {
238 if (errno != EAGAIN && errno != EWOULDBLOCK) {
239 PLOG(ERROR) << "read";
240
241 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
242 // again.
243 read_watcher_.reset();
244
245 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
246 return;
247 }
248
249 break;
250 }
251
252 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
253
254 // Dispatch all the messages that we can.
255 size_t message_size;
256 // Note that we rely on short-circuit evaluation here:
257 // - |read_buffer_start| may be an invalid index into |read_buffer_| if
258 // |read_buffer_num_valid_bytes_| is zero.
259 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
260 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
261 // next read).
262 while (read_buffer_num_valid_bytes_ > 0 &&
263 MessageInTransit::GetNextMessageSize(
264 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
265 &message_size) &&
266 read_buffer_num_valid_bytes_ >= message_size) {
267 // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
268 // some sort of "view" abstraction.
269 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
270 &read_buffer_[read_buffer_start]);
271 DCHECK_EQ(message.total_size(), message_size);
272
273 // Dispatch the message.
274 delegate()->OnReadMessage(message);
275 if (!read_watcher_.get()) {
276 // |Shutdown()| was called in |OnReadMessage()|.
277 // TODO(vtl): Add test for this case.
278 return;
279 }
280 did_dispatch_message = true;
281
282 // Update our state.
283 read_buffer_start += message_size;
284 read_buffer_num_valid_bytes_ -= message_size;
285 }
286
287 // If we dispatched any messages, stop reading for now (and let the message
288 // loop do its thing for another round).
289 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
290 // a single message. Risks: slower, more complex if we want to avoid lots of
291 // copying. ii. Keep reading until there's no more data and dispatch all the
292 // messages we can. Risks: starvation of other users of the message loop.)
293 if (did_dispatch_message)
294 break;
295
296 // If we didn't max out |kReadSize|, stop reading for now.
297 if (static_cast<size_t>(bytes_read) < kReadSize)
298 break;
299
300 // Else try to read some more....
301 }
302
303 // Move data back to start.
304 if (read_buffer_start > 0) {
305 if (read_buffer_num_valid_bytes_ > 0) {
306 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
307 read_buffer_num_valid_bytes_);
308 }
309 read_buffer_start = 0;
310 }
311 } 269 }
312 270
313 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 271 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
314 DCHECK_EQ(fd, fd_.get().fd); 272 DCHECK_EQ(fd, fd_.get().fd);
315 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 273 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
316 274
317 bool did_fail = false; 275 IOResult result = IO_FAILED;
276 size_t bytes_written = 0;
318 { 277 {
319 base::AutoLock locker(write_lock_); 278 base::AutoLock locker(write_lock());
320 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); 279
321 280 DCHECK(pending_write_);
322 if (write_stopped_) { 281
323 write_watcher_.reset(); 282 pending_write_ = false;
324 return; 283 result = WriteNoLock(&bytes_written);
284 }
285
286 if (result != IO_PENDING)
287 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
288 }
289
290 void RawChannelPosix::WaitToWrite() {
291 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
292
293 DCHECK(write_watcher_.get());
294
295 if (!message_loop_for_io()->WatchFileDescriptor(
296 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
297 write_watcher_.get(), this)) {
298 {
299 base::AutoLock locker(write_lock());
300
301 DCHECK(pending_write_);
302 pending_write_ = false;
325 } 303 }
326 304 OnWriteCompleted(false, 0);
327 bool result = WriteFrontMessageNoLock(); 305 }
328 DCHECK(result || write_message_queue_.empty());
329
330 if (!result) {
331 did_fail = true;
332 write_watcher_.reset();
333 } else if (!write_message_queue_.empty()) {
334 WaitToWrite();
335 }
336 }
337 if (did_fail)
338 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
339 }
340
341 void RawChannelPosix::WaitToWrite() {
342 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
343
344 DCHECK(write_watcher_.get());
345 bool result = message_loop_for_io()->WatchFileDescriptor(
346 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
347 write_watcher_.get(), this);
348 DCHECK(result);
349 }
350
351 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
352 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
353 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
354 delegate()->OnFatalError(fatal_error);
355 }
356
357 // TODO(vtl): This will collide with yzshen's work. This is just a hacky,
358 // minimally-invasive function that does what I want (write/resume writing a
359 // |MessageInTransit| that may consist of more than one buffer).
360 ssize_t WriteMessageInTransit(int fd,
361 MessageInTransit* message,
362 size_t offset,
363 size_t* bytes_to_write) {
364 *bytes_to_write = message->total_size() - offset;
365 if (!message->secondary_buffer_size()) {
366 // Only write from the main buffer.
367 DCHECK_LT(offset, message->main_buffer_size());
368 DCHECK_LE(*bytes_to_write, message->main_buffer_size());
369 return HANDLE_EINTR(
370 write(fd,
371 static_cast<const char*>(message->main_buffer()) + offset,
372 *bytes_to_write));
373 }
374 if (offset >= message->main_buffer_size()) {
375 // Only write from the secondary buffer.
376 DCHECK_LT(offset - message->main_buffer_size(),
377 message->secondary_buffer_size());
378 DCHECK_LE(*bytes_to_write, message->secondary_buffer_size());
379 return HANDLE_EINTR(
380 write(fd,
381 static_cast<const char*>(message->secondary_buffer()) +
382 (offset - message->main_buffer_size()),
383 *bytes_to_write));
384 }
385 // Write from both buffers. (Note that using |writev()| is measurably slower
386 // than using |write()| -- at least in a microbenchmark -- but much faster
387 // than using two |write()|s.)
388 DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset +
389 message->secondary_buffer_size());
390 struct iovec iov[2] = {
391 { const_cast<char*>(
392 static_cast<const char*>(message->main_buffer()) + offset),
393 message->main_buffer_size() - offset },
394 { const_cast<void*>(message->secondary_buffer()),
395 message->secondary_buffer_size() }
396 };
397 return HANDLE_EINTR(writev(fd, iov, 2));
398 }
399
400 bool RawChannelPosix::WriteFrontMessageNoLock() {
401 write_lock_.AssertAcquired();
402
403 DCHECK(!write_stopped_);
404 DCHECK(!write_message_queue_.empty());
405
406 MessageInTransit* message = write_message_queue_.front();
407 DCHECK_LT(write_message_offset_, message->total_size());
408 size_t bytes_to_write;
409 ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd,
410 message,
411 write_message_offset_,
412 &bytes_to_write);
413 if (bytes_written < 0) {
414 if (errno != EAGAIN && errno != EWOULDBLOCK) {
415 PLOG(ERROR) << "write of size " << bytes_to_write;
416 CancelPendingWritesNoLock();
417 return false;
418 }
419
420 // We simply failed to write since we'd block. The logic is the same as if
421 // we got a partial write.
422 bytes_written = 0;
423 }
424
425 DCHECK_GE(bytes_written, 0);
426 if (static_cast<size_t>(bytes_written) < bytes_to_write) {
427 // Partial (or no) write.
428 write_message_offset_ += static_cast<size_t>(bytes_written);
429 } else {
430 // Complete write.
431 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
432 write_message_queue_.pop_front();
433 delete message;
434 write_message_offset_ = 0;
435 }
436
437 return true;
438 }
439
440 void RawChannelPosix::CancelPendingWritesNoLock() {
441 write_lock_.AssertAcquired();
442 DCHECK(!write_stopped_);
443
444 write_stopped_ = true;
445 STLDeleteElements(&write_message_queue_);
446 } 306 }
447 307
448 } // namespace 308 } // namespace
449 309
450 // ----------------------------------------------------------------------------- 310 // -----------------------------------------------------------------------------
451 311
452 // Static factory method declared in raw_channel.h. 312 // Static factory method declared in raw_channel.h.
453 // static 313 // static
454 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, 314 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
455 Delegate* delegate, 315 Delegate* delegate,
456 base::MessageLoopForIO* message_loop_for_io) { 316 base::MessageLoopForIO* message_loop_for_io) {
457 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); 317 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io);
458 } 318 }
459 319
460 } // namespace system 320 } // namespace system
461 } // namespace mojo 321 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698