Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: mojo/system/raw_channel_posix.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« mojo/system/raw_channel.h ('K') | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <string.h>
9 #include <unistd.h> 8 #include <unistd.h>
10 9
11 #include <algorithm>
12 #include <deque>
13 #include <vector>
14
15 #include "base/basictypes.h" 10 #include "base/basictypes.h"
16 #include "base/bind.h" 11 #include "base/bind.h"
17 #include "base/compiler_specific.h" 12 #include "base/compiler_specific.h"
18 #include "base/location.h" 13 #include "base/location.h"
19 #include "base/logging.h" 14 #include "base/logging.h"
20 #include "base/memory/scoped_ptr.h" 15 #include "base/memory/scoped_ptr.h"
21 #include "base/memory/weak_ptr.h" 16 #include "base/memory/weak_ptr.h"
22 #include "base/message_loop/message_loop.h" 17 #include "base/message_loop/message_loop.h"
23 #include "base/posix/eintr_wrapper.h" 18 #include "base/posix/eintr_wrapper.h"
24 #include "base/stl_util.h"
25 #include "base/synchronization/lock.h" 19 #include "base/synchronization/lock.h"
26 #include "mojo/system/embedder/platform_handle.h" 20 #include "mojo/system/embedder/platform_handle.h"
27 #include "mojo/system/message_in_transit.h"
28 21
29 namespace mojo { 22 namespace mojo {
30 namespace system { 23 namespace system {
31 24
32 namespace { 25 namespace {
33 26
34 const size_t kReadSize = 4096;
35
36 class RawChannelPosix : public RawChannel, 27 class RawChannelPosix : public RawChannel,
37 public base::MessageLoopForIO::Watcher { 28 public base::MessageLoopForIO::Watcher {
38 public: 29 public:
39 RawChannelPosix(embedder::ScopedPlatformHandle handle, 30 RawChannelPosix(embedder::ScopedPlatformHandle handle,
40 Delegate* delegate, 31 Delegate* delegate,
41 base::MessageLoopForIO* message_loop_for_io); 32 base::MessageLoopForIO* message_loop_for_io);
42 virtual ~RawChannelPosix(); 33 virtual ~RawChannelPosix();
43 34
35 private:
44 // |RawChannel| implementation: 36 // |RawChannel| implementation:
45 virtual bool Init() OVERRIDE; 37 virtual IOResult Read(bool schedule_for_later,
46 virtual void Shutdown() OVERRIDE; 38 char* buffer,
47 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; 39 size_t bytes_to_read,
40 size_t* bytes_read) OVERRIDE;
41 virtual IOResult WriteNoLock(bool schedule_for_later,
42 const char* buffer,
43 size_t bytes_to_write,
44 size_t* bytes_written) OVERRIDE;
45 virtual bool OnInit() OVERRIDE;
46 virtual void OnShutdownNoLock(
47 scoped_ptr<IOBufferPreserver> buffer_preserver) OVERRIDE;
48 48
49 private:
50 // |base::MessageLoopForIO::Watcher| implementation: 49 // |base::MessageLoopForIO::Watcher| implementation:
51 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
52 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
53 52
54 // Watches for |fd_| to become writable. Must be called on the I/O thread. 53 // Watches for |fd_| to become writable. Must be called on the I/O thread.
55 void WaitToWrite(); 54 void WaitToWrite();
56 55
57 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
58 // thread WITHOUT |write_lock_| held.
59 void CallOnFatalError(Delegate::FatalError fatal_error);
60
61 // Writes the message at the front of |write_message_queue_|, starting at
62 // |write_message_offset_|. It removes and destroys if the write completes and
63 // otherwise updates |write_message_offset_|. Returns true on success. Must be
64 // called under |write_lock_|.
65 bool WriteFrontMessageNoLock();
66
67 // Cancels all pending writes and destroys the contents of
68 // |write_message_queue_|. Should only be called if |write_stopped_| is false;
69 // sets |write_stopped_| to true. Must be called under |write_lock_|.
70 void CancelPendingWritesNoLock();
71
72 embedder::ScopedPlatformHandle fd_; 56 embedder::ScopedPlatformHandle fd_;
73 57
74 // Only used on the I/O thread: 58 // The following members are only used on the I/O thread:
75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 59 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 60 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
77 61
78 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 62 char* pending_read_buffer_;
79 // is always aligned with a message boundary (we will copy memory to ensure 63 size_t pending_bytes_to_read_;
80 // this), but |read_buffer_| may be larger than the actual number of bytes we
81 // have.
82 std::vector<char> read_buffer_;
83 size_t read_buffer_num_valid_bytes_;
84 64
85 base::Lock write_lock_; // Protects the following members. 65 // The following members are used on multiple threads and protected by
86 bool write_stopped_; 66 // |write_lock()|:
87 // TODO(vtl): When C++11 is available, switch this to a deque of 67 const char* pending_write_buffer_;
88 // |scoped_ptr|/|unique_ptr|s. 68 size_t pending_bytes_to_write_;
89 std::deque<MessageInTransit*> write_message_queue_; 69
90 size_t write_message_offset_;
91 // This is used for posting tasks from write threads to the I/O thread. It 70 // This is used for posting tasks from write threads to the I/O thread. It
92 // must only be accessed under |write_lock_|. The weak pointers it produces 71 // must only be accessed under |write_lock_|. The weak pointers it produces
93 // are only used/invalidated on the I/O thread. 72 // are only used/invalidated on the I/O thread.
94 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 73 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
95 74
96 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 75 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
97 }; 76 };
98 77
99 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, 78 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
100 Delegate* delegate, 79 Delegate* delegate,
101 base::MessageLoopForIO* message_loop_for_io) 80 base::MessageLoopForIO* message_loop_for_io)
102 : RawChannel(delegate, message_loop_for_io), 81 : RawChannel(delegate, message_loop_for_io),
103 fd_(handle.Pass()), 82 fd_(handle.Pass()),
104 read_buffer_num_valid_bytes_(0), 83 pending_read_buffer_(NULL),
105 write_stopped_(false), 84 pending_bytes_to_read_(0),
106 write_message_offset_(0), 85 pending_write_buffer_(NULL),
86 pending_bytes_to_write_(0),
107 weak_ptr_factory_(this) { 87 weak_ptr_factory_(this) {
108 CHECK_EQ(RawChannel::message_loop_for_io()->type(),
109 base::MessageLoop::TYPE_IO);
110 DCHECK(fd_.is_valid()); 88 DCHECK(fd_.is_valid());
111 } 89 }
112 90
113 RawChannelPosix::~RawChannelPosix() { 91 RawChannelPosix::~RawChannelPosix() {
114 DCHECK(write_stopped_); 92 DCHECK(!pending_read_buffer_);
115 DCHECK(!fd_.is_valid()); 93 DCHECK(!pending_write_buffer_);
116 94
117 // No need to take the |write_lock_| here -- if there are still weak pointers 95 // No need to take the |write_lock()| here -- if there are still weak pointers
118 // outstanding, then we're hosed anyway (since we wouldn't be able to 96 // outstanding, then we're hosed anyway (since we wouldn't be able to
119 // invalidate them cleanly, since we might not be on the I/O thread). 97 // invalidate them cleanly, since we might not be on the I/O thread).
120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 98 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
121 99
122 // These must have been shut down/destroyed on the I/O thread. 100 // These must have been shut down/destroyed on the I/O thread.
123 DCHECK(!read_watcher_.get()); 101 DCHECK(!read_watcher_.get());
124 DCHECK(!write_watcher_.get()); 102 DCHECK(!write_watcher_.get());
125 } 103 }
126 104
127 bool RawChannelPosix::Init() { 105 RawChannel::IOResult RawChannelPosix::Read(bool schedule_for_later,
106 char* buffer,
107 size_t bytes_to_read,
108 size_t* bytes_read) {
109 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
110 DCHECK(!pending_read_buffer_);
111
112 if (!schedule_for_later) {
113 ssize_t read_result = HANDLE_EINTR(
114 read(fd_.get().fd, buffer, bytes_to_read));
115
116 if (read_result >= 0) {
117 *bytes_read = static_cast<size_t>(read_result);
118 return IO_SUCCEEDED;
119 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
120 PLOG(ERROR) << "read";
121
122 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
123 read_watcher_.reset();
124
125 return IO_FAILED;
126 }
127 }
128
129 // Either |schedule_for_later| is true or |read()| above would block.
130 pending_read_buffer_ = buffer;
131 pending_bytes_to_read_ = bytes_to_read;
132
133 return IO_PENDING;
134 }
135
136 RawChannel::IOResult RawChannelPosix::WriteNoLock(bool schedule_for_later,
137 const char* buffer,
138 size_t bytes_to_write,
139 size_t* bytes_written) {
140 write_lock().AssertAcquired();
141
142 DCHECK(!pending_write_buffer_);
143
144 if (!schedule_for_later) {
145 ssize_t write_result = HANDLE_EINTR(
146 write(fd_.get().fd, buffer, bytes_to_write));
147
148 if (write_result >= 0) {
149 *bytes_written = static_cast<size_t>(write_result);
150 return IO_SUCCEEDED;
151 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
152 PLOG(ERROR) << "write of size " << bytes_to_write;
153 return IO_FAILED;
154 }
155 }
156
157 pending_write_buffer_ = buffer;
158 pending_bytes_to_write_ = bytes_to_write;
159
160 // Either |schedule_for_later| is true or |write()| above would block, set up
161 // to wait for the FD to become writable.
162 // If we're not on the I/O thread, we have to post a task to do this.
163 if (base::MessageLoop::current() != message_loop_for_io()) {
164 message_loop_for_io()->PostTask(
165 FROM_HERE,
166 base::Bind(&RawChannelPosix::WaitToWrite,
167 weak_ptr_factory_.GetWeakPtr()));
168 return IO_PENDING;
169 }
170
171 bool result = message_loop_for_io()->WatchFileDescriptor(
172 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
173 write_watcher_.get(), this);
174 return result ? IO_PENDING : IO_FAILED;
175 }
176
177 bool RawChannelPosix::OnInit() {
128 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 178 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
129 179
130 DCHECK(!read_watcher_.get()); 180 DCHECK(!read_watcher_.get());
131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 181 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
132 DCHECK(!write_watcher_.get()); 182 DCHECK(!write_watcher_.get());
133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 183 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
134 184
135 // No need to take the lock. No one should be using us yet.
136 DCHECK(write_message_queue_.empty());
137
138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, 185 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 186 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 187 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
141 // (in the sense of returning the message loop's state to what it was before 188 // (in the sense of returning the message loop's state to what it was before
142 // it was called). 189 // it was called).
143 read_watcher_.reset(); 190 read_watcher_.reset();
144 write_watcher_.reset(); 191 write_watcher_.reset();
145 return false; 192 return false;
146 } 193 }
147 194
148 return true; 195 return true;
149 } 196 }
150 197
151 void RawChannelPosix::Shutdown() { 198 void RawChannelPosix::OnShutdownNoLock(
199 scoped_ptr<IOBufferPreserver> /* buffer_preserver */) {
152 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
153 201 write_lock().AssertAcquired();
154 base::AutoLock locker(write_lock_);
155 if (!write_stopped_)
156 CancelPendingWritesNoLock();
157 202
158 read_watcher_.reset(); // This will stop watching (if necessary). 203 read_watcher_.reset(); // This will stop watching (if necessary).
159 write_watcher_.reset(); // This will stop watching (if necessary). 204 write_watcher_.reset(); // This will stop watching (if necessary).
160 205
206 pending_read_buffer_ = NULL;
207 pending_bytes_to_read_ = 0;
208
209 pending_write_buffer_ = NULL;
210 pending_bytes_to_write_ = 0;
211
161 DCHECK(fd_.is_valid()); 212 DCHECK(fd_.is_valid());
162 fd_.reset(); 213 fd_.reset();
163 214
164 weak_ptr_factory_.InvalidateWeakPtrs(); 215 weak_ptr_factory_.InvalidateWeakPtrs();
165 } 216 }
166 217
167 // Reminder: This must be thread-safe, and takes ownership of |message|.
168 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
169 base::AutoLock locker(write_lock_);
170 if (write_stopped_)
171 return false;
172
173 if (!write_message_queue_.empty()) {
174 write_message_queue_.push_back(message.release());
175 return true;
176 }
177
178 write_message_queue_.push_front(message.release());
179 DCHECK_EQ(write_message_offset_, 0u);
180 bool result = WriteFrontMessageNoLock();
181 DCHECK(result || write_message_queue_.empty());
182
183 if (!result) {
184 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
185 // nested context.
186 message_loop_for_io()->PostTask(
187 FROM_HERE,
188 base::Bind(&RawChannelPosix::CallOnFatalError,
189 weak_ptr_factory_.GetWeakPtr(),
190 Delegate::FATAL_ERROR_FAILED_WRITE));
191 } else if (!write_message_queue_.empty()) {
192 // Set up to wait for the FD to become writable. If we're not on the I/O
193 // thread, we have to post a task to do this.
194 if (base::MessageLoop::current() == message_loop_for_io()) {
195 WaitToWrite();
196 } else {
197 message_loop_for_io()->PostTask(
198 FROM_HERE,
199 base::Bind(&RawChannelPosix::WaitToWrite,
200 weak_ptr_factory_.GetWeakPtr()));
201 }
202 }
203
204 return result;
205 }
206
207 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 218 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
208 DCHECK_EQ(fd, fd_.get().fd); 219 DCHECK_EQ(fd, fd_.get().fd);
209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 220 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
210 221
211 bool did_dispatch_message = false; 222 if (!pending_read_buffer_)
212 // Tracks the offset of the first undispatched message in |read_buffer_|. 223 return;
213 // Currently, we copy data to ensure that this is zero at the beginning.
214 size_t read_buffer_start = 0;
215 for (;;) {
216 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
217 < kReadSize) {
218 // Use power-of-2 buffer sizes.
219 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
220 // maximum message size to whatever extent necessary).
221 // TODO(vtl): We may often be able to peek at the header and get the real
222 // required extra space (which may be much bigger than |kReadSize|).
223 size_t new_size = std::max(read_buffer_.size(), kReadSize);
224 while (new_size <
225 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
226 new_size *= 2;
227 224
228 // TODO(vtl): It's suboptimal to zero out the fresh memory. 225 char* temp_buffer = pending_read_buffer_;
229 read_buffer_.resize(new_size, 0); 226 size_t temp_bytes_to_read = pending_bytes_to_read_;
230 } 227 size_t bytes_read = 0;
231 228
232 ssize_t bytes_read = HANDLE_EINTR( 229 pending_read_buffer_ = NULL;
233 read(fd_.get().fd, 230 pending_bytes_to_read_ = 0;
234 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
235 kReadSize));
236 if (bytes_read < 0) {
237 if (errno != EAGAIN && errno != EWOULDBLOCK) {
238 PLOG(ERROR) << "read";
239 231
240 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called 232 IOResult result = Read(false, temp_buffer, temp_bytes_to_read,
241 // again. 233 &bytes_read);
242 read_watcher_.reset(); 234 if (result != IO_PENDING)
243 235 OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
245 return;
246 }
247
248 break;
249 }
250
251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
252
253 // Dispatch all the messages that we can.
254 size_t message_size;
255 // Note that we rely on short-circuit evaluation here:
256 // - |read_buffer_start| may be an invalid index into |read_buffer_| if
257 // |read_buffer_num_valid_bytes_| is zero.
258 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
259 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
260 // next read).
261 while (read_buffer_num_valid_bytes_ > 0 &&
262 MessageInTransit::GetNextMessageSize(
263 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
264 &message_size) &&
265 read_buffer_num_valid_bytes_ >= message_size) {
266 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
267 &read_buffer_[read_buffer_start]);
268 DCHECK_EQ(message.main_buffer_size(), message_size);
269
270 // Dispatch the message.
271 delegate()->OnReadMessage(message);
272 if (!read_watcher_.get()) {
273 // |Shutdown()| was called in |OnReadMessage()|.
274 // TODO(vtl): Add test for this case.
275 return;
276 }
277 did_dispatch_message = true;
278
279 // Update our state.
280 read_buffer_start += message_size;
281 read_buffer_num_valid_bytes_ -= message_size;
282 }
283
284 // If we dispatched any messages, stop reading for now (and let the message
285 // loop do its thing for another round).
286 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
287 // a single message. Risks: slower, more complex if we want to avoid lots of
288 // copying. ii. Keep reading until there's no more data and dispatch all the
289 // messages we can. Risks: starvation of other users of the message loop.)
290 if (did_dispatch_message)
291 break;
292
293 // If we didn't max out |kReadSize|, stop reading for now.
294 if (static_cast<size_t>(bytes_read) < kReadSize)
295 break;
296
297 // Else try to read some more....
298 }
299
300 // Move data back to start.
301 if (read_buffer_start > 0) {
302 if (read_buffer_num_valid_bytes_ > 0) {
303 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
304 read_buffer_num_valid_bytes_);
305 }
306 read_buffer_start = 0;
307 }
308 } 236 }
309 237
310 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 238 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
311 DCHECK_EQ(fd, fd_.get().fd); 239 DCHECK_EQ(fd, fd_.get().fd);
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
313 241
314 bool did_fail = false; 242 IOResult result = IO_FAILED;
243 size_t bytes_written = 0;
315 { 244 {
316 base::AutoLock locker(write_lock_); 245 base::AutoLock locker(write_lock());
317 DCHECK_EQ(write_stopped_, write_message_queue_.empty());
318 246
319 if (write_stopped_) { 247 if (!pending_write_buffer_)
320 write_watcher_.reset();
321 return; 248 return;
322 }
323 249
324 bool result = WriteFrontMessageNoLock(); 250 const char* temp_write_buffer = pending_write_buffer_;
325 DCHECK(result || write_message_queue_.empty()); 251 size_t temp_bytes_to_write = pending_bytes_to_write_;
252 pending_write_buffer_ = NULL;
253 pending_bytes_to_write_ = 0;
326 254
327 if (!result) { 255 result = WriteNoLock(false, temp_write_buffer, temp_bytes_to_write,
328 did_fail = true; 256 &bytes_written);
329 write_watcher_.reset();
330 } else if (!write_message_queue_.empty()) {
331 WaitToWrite();
332 }
333 } 257 }
334 if (did_fail) 258
335 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 259 if (result != IO_PENDING)
260 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
336 } 261 }
337 262
338 void RawChannelPosix::WaitToWrite() { 263 void RawChannelPosix::WaitToWrite() {
339 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 264 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
340 265
341 DCHECK(write_watcher_.get()); 266 DCHECK(write_watcher_.get());
342 bool result = message_loop_for_io()->WatchFileDescriptor(
343 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
344 write_watcher_.get(), this);
345 DCHECK(result);
346 }
347 267
348 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { 268 if (!message_loop_for_io()->WatchFileDescriptor(
349 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 269 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
350 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 270 write_watcher_.get(), this)) {
351 delegate()->OnFatalError(fatal_error); 271 {
352 } 272 base::AutoLock locker(write_lock());
353 273
354 bool RawChannelPosix::WriteFrontMessageNoLock() { 274 DCHECK(pending_write_buffer_);
355 write_lock_.AssertAcquired(); 275 pending_write_buffer_ = NULL;
356 276 pending_bytes_to_write_ = 0;
357 DCHECK(!write_stopped_);
358 DCHECK(!write_message_queue_.empty());
359
360 MessageInTransit* message = write_message_queue_.front();
361 DCHECK_LT(write_message_offset_, message->main_buffer_size());
362 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
363 ssize_t bytes_written = HANDLE_EINTR(
364 write(fd_.get().fd,
365 static_cast<const char*>(message->main_buffer()) +
366 write_message_offset_,
367 bytes_to_write));
368 if (bytes_written < 0) {
369 if (errno != EAGAIN && errno != EWOULDBLOCK) {
370 PLOG(ERROR) << "write of size " << bytes_to_write;
371 CancelPendingWritesNoLock();
372 return false;
373 } 277 }
374 278 OnWriteCompleted(false, 0);
375 // We simply failed to write since we'd block. The logic is the same as if
376 // we got a partial write.
377 bytes_written = 0;
378 } 279 }
379
380 DCHECK_GE(bytes_written, 0);
381 if (static_cast<size_t>(bytes_written) < bytes_to_write) {
382 // Partial (or no) write.
383 write_message_offset_ += static_cast<size_t>(bytes_written);
384 } else {
385 // Complete write.
386 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
387 write_message_queue_.pop_front();
388 delete message;
389 write_message_offset_ = 0;
390 }
391
392 return true;
393 }
394
395 void RawChannelPosix::CancelPendingWritesNoLock() {
396 write_lock_.AssertAcquired();
397 DCHECK(!write_stopped_);
398
399 write_stopped_ = true;
400 STLDeleteElements(&write_message_queue_);
401 } 280 }
402 281
403 } // namespace 282 } // namespace
404 283
405 // ----------------------------------------------------------------------------- 284 // -----------------------------------------------------------------------------
406 285
407 // Static factory method declared in raw_channel.h. 286 // Static factory method declared in raw_channel.h.
408 // static 287 // static
409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, 288 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
410 Delegate* delegate, 289 Delegate* delegate,
411 base::MessageLoopForIO* message_loop_for_io) { 290 base::MessageLoopForIO* message_loop_for_io) {
412 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); 291 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io);
413 } 292 }
414 293
415 } // namespace system 294 } // namespace system
416 } // namespace mojo 295 } // namespace mojo
OLDNEW
« mojo/system/raw_channel.h ('K') | « mojo/system/raw_channel.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698