OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <string.h> | |
9 #include <unistd.h> | 8 #include <unistd.h> |
10 | 9 |
11 #include <algorithm> | |
12 #include <deque> | |
13 #include <vector> | |
14 | |
15 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
16 #include "base/bind.h" | 11 #include "base/bind.h" |
17 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
18 #include "base/location.h" | 13 #include "base/location.h" |
19 #include "base/logging.h" | 14 #include "base/logging.h" |
20 #include "base/memory/scoped_ptr.h" | 15 #include "base/memory/scoped_ptr.h" |
21 #include "base/memory/weak_ptr.h" | 16 #include "base/memory/weak_ptr.h" |
22 #include "base/message_loop/message_loop.h" | 17 #include "base/message_loop/message_loop.h" |
23 #include "base/posix/eintr_wrapper.h" | 18 #include "base/posix/eintr_wrapper.h" |
24 #include "base/stl_util.h" | |
25 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
26 #include "mojo/system/embedder/platform_handle.h" | 20 #include "mojo/system/embedder/platform_handle.h" |
27 #include "mojo/system/message_in_transit.h" | |
28 | 21 |
29 namespace mojo { | 22 namespace mojo { |
30 namespace system { | 23 namespace system { |
31 | 24 |
32 namespace { | 25 namespace { |
33 | 26 |
34 const size_t kReadSize = 4096; | |
35 | |
36 class RawChannelPosix : public RawChannel, | 27 class RawChannelPosix : public RawChannel, |
37 public base::MessageLoopForIO::Watcher { | 28 public base::MessageLoopForIO::Watcher { |
38 public: | 29 public: |
39 RawChannelPosix(embedder::ScopedPlatformHandle handle, | 30 RawChannelPosix(embedder::ScopedPlatformHandle handle, |
40 Delegate* delegate, | 31 Delegate* delegate, |
41 base::MessageLoopForIO* message_loop_for_io); | 32 base::MessageLoopForIO* message_loop_for_io); |
42 virtual ~RawChannelPosix(); | 33 virtual ~RawChannelPosix(); |
43 | 34 |
35 private: | |
44 // |RawChannel| implementation: | 36 // |RawChannel| implementation: |
45 virtual bool Init() OVERRIDE; | 37 virtual IOResult Read(size_t* bytes_read) OVERRIDE; |
46 virtual void Shutdown() OVERRIDE; | 38 virtual IOResult ScheduleRead() OVERRIDE; |
47 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; | 39 virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; |
40 virtual IOResult ScheduleWriteNoLock() OVERRIDE; | |
41 virtual bool OnInit() OVERRIDE; | |
42 virtual void OnShutdownNoLock( | |
43 scoped_ptr<ReadBuffer> read_buffer, | |
44 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; | |
48 | 45 |
49 private: | |
50 // |base::MessageLoopForIO::Watcher| implementation: | 46 // |base::MessageLoopForIO::Watcher| implementation: |
51 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | 47 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
52 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | 48 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
53 | 49 |
54 // Watches for |fd_| to become writable. Must be called on the I/O thread. | 50 // Watches for |fd_| to become writable. Must be called on the I/O thread. |
55 void WaitToWrite(); | 51 void WaitToWrite(); |
56 | 52 |
57 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O | |
58 // thread WITHOUT |write_lock_| held. | |
59 void CallOnFatalError(Delegate::FatalError fatal_error); | |
60 | |
61 // Writes the message at the front of |write_message_queue_|, starting at | |
62 // |write_message_offset_|. It removes and destroys if the write completes and | |
63 // otherwise updates |write_message_offset_|. Returns true on success. Must be | |
64 // called under |write_lock_|. | |
65 bool WriteFrontMessageNoLock(); | |
66 | |
67 // Cancels all pending writes and destroys the contents of | |
68 // |write_message_queue_|. Should only be called if |write_stopped_| is false; | |
69 // sets |write_stopped_| to true. Must be called under |write_lock_|. | |
70 void CancelPendingWritesNoLock(); | |
71 | |
72 embedder::ScopedPlatformHandle fd_; | 53 embedder::ScopedPlatformHandle fd_; |
73 | 54 |
74 // Only used on the I/O thread: | 55 // The following members are only used on the I/O thread: |
75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 56 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 57 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
77 | 58 |
78 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| | 59 bool pending_read_; |
79 // is always aligned with a message boundary (we will copy memory to ensure | |
80 // this), but |read_buffer_| may be larger than the actual number of bytes we | |
81 // have. | |
82 std::vector<char> read_buffer_; | |
83 size_t read_buffer_num_valid_bytes_; | |
84 | 60 |
85 base::Lock write_lock_; // Protects the following members. | 61 // The following members are used on multiple threads and protected by |
86 bool write_stopped_; | 62 // |write_lock()|: |
87 // TODO(vtl): When C++11 is available, switch this to a deque of | 63 bool pending_write_; |
88 // |scoped_ptr|/|unique_ptr|s. | 64 |
89 std::deque<MessageInTransit*> write_message_queue_; | |
90 size_t write_message_offset_; | |
91 // This is used for posting tasks from write threads to the I/O thread. It | 65 // This is used for posting tasks from write threads to the I/O thread. It |
92 // must only be accessed under |write_lock_|. The weak pointers it produces | 66 // must only be accessed under |write_lock_|. The weak pointers it produces |
93 // are only used/invalidated on the I/O thread. | 67 // are only used/invalidated on the I/O thread. |
94 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 68 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; |
95 | 69 |
96 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 70 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
97 }; | 71 }; |
98 | 72 |
99 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, | 73 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
100 Delegate* delegate, | 74 Delegate* delegate, |
101 base::MessageLoopForIO* message_loop_for_io) | 75 base::MessageLoopForIO* message_loop_for_io) |
102 : RawChannel(delegate, message_loop_for_io), | 76 : RawChannel(delegate, message_loop_for_io), |
103 fd_(handle.Pass()), | 77 fd_(handle.Pass()), |
104 read_buffer_num_valid_bytes_(0), | 78 pending_read_(false), |
105 write_stopped_(false), | 79 pending_write_(false), |
106 write_message_offset_(0), | |
107 weak_ptr_factory_(this) { | 80 weak_ptr_factory_(this) { |
108 CHECK_EQ(RawChannel::message_loop_for_io()->type(), | |
109 base::MessageLoop::TYPE_IO); | |
110 DCHECK(fd_.is_valid()); | 81 DCHECK(fd_.is_valid()); |
111 } | 82 } |
112 | 83 |
113 RawChannelPosix::~RawChannelPosix() { | 84 RawChannelPosix::~RawChannelPosix() { |
114 DCHECK(write_stopped_); | 85 DCHECK(!pending_read_); |
115 DCHECK(!fd_.is_valid()); | 86 DCHECK(!pending_write_); |
116 | 87 |
117 // No need to take the |write_lock_| here -- if there are still weak pointers | 88 // No need to take the |write_lock()| here -- if there are still weak pointers |
118 // outstanding, then we're hosed anyway (since we wouldn't be able to | 89 // outstanding, then we're hosed anyway (since we wouldn't be able to |
119 // invalidate them cleanly, since we might not be on the I/O thread). | 90 // invalidate them cleanly, since we might not be on the I/O thread). |
120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 91 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
121 | 92 |
122 // These must have been shut down/destroyed on the I/O thread. | 93 // These must have been shut down/destroyed on the I/O thread. |
123 DCHECK(!read_watcher_.get()); | 94 DCHECK(!read_watcher_.get()); |
124 DCHECK(!write_watcher_.get()); | 95 DCHECK(!write_watcher_.get()); |
125 } | 96 } |
126 | 97 |
127 bool RawChannelPosix::Init() { | 98 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { |
99 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
100 DCHECK(!pending_read_); | |
101 | |
102 ssize_t read_result = HANDLE_EINTR( | |
103 read(fd_.get().fd, read_buffer()->GetPosition(), | |
104 read_buffer()->GetBytesToRead())); | |
105 | |
106 if (read_result >= 0) { | |
107 *bytes_read = static_cast<size_t>(read_result); | |
108 return IO_SUCCEEDED; | |
109 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
110 PLOG(ERROR) << "read"; | |
111 | |
112 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. | |
113 read_watcher_.reset(); | |
114 | |
115 return IO_FAILED; | |
116 } | |
117 | |
118 return ScheduleRead(); | |
119 } | |
120 | |
121 RawChannel::IOResult RawChannelPosix::ScheduleRead() { | |
122 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
123 DCHECK(!pending_read_); | |
124 | |
125 pending_read_ = true; | |
126 | |
127 return IO_PENDING; | |
128 } | |
129 | |
130 RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { | |
131 write_lock().AssertAcquired(); | |
132 | |
133 DCHECK(!pending_write_); | |
134 | |
135 ssize_t write_result = HANDLE_EINTR( | |
136 write(fd_.get().fd, write_buffer()->GetPosition(), | |
137 write_buffer()->GetBytesToWrite())); | |
138 | |
139 if (write_result >= 0) { | |
140 *bytes_written = static_cast<size_t>(write_result); | |
141 return IO_SUCCEEDED; | |
142 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
143 PLOG(ERROR) << "write of size " << write_buffer()->GetBytesToWrite(); | |
144 return IO_FAILED; | |
145 } | |
146 | |
147 return ScheduleWriteNoLock(); | |
148 } | |
149 | |
150 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { | |
151 write_lock().AssertAcquired(); | |
152 | |
153 DCHECK(!pending_write_); | |
154 | |
155 // Set up to wait for the FD to become writable. | |
156 // If we're not on the I/O thread, we have to post a task to do this. | |
157 if (base::MessageLoop::current() != message_loop_for_io()) { | |
158 message_loop_for_io()->PostTask( | |
159 FROM_HERE, | |
160 base::Bind(&RawChannelPosix::WaitToWrite, | |
161 weak_ptr_factory_.GetWeakPtr())); | |
162 pending_write_ = true; | |
163 return IO_PENDING; | |
164 } | |
165 | |
166 if (message_loop_for_io()->WatchFileDescriptor( | |
167 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
168 write_watcher_.get(), this)) { | |
169 pending_write_ = true; | |
170 return IO_PENDING; | |
171 } | |
172 | |
173 return IO_FAILED; | |
174 } | |
175 | |
176 bool RawChannelPosix::OnInit() { | |
128 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 177 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
129 | 178 |
130 DCHECK(!read_watcher_.get()); | 179 DCHECK(!read_watcher_.get()); |
131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 180 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
132 DCHECK(!write_watcher_.get()); | 181 DCHECK(!write_watcher_.get()); |
133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 182 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
134 | 183 |
135 // No need to take the lock. No one should be using us yet. | |
136 DCHECK(write_message_queue_.empty()); | |
137 | |
138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, | 184 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { | 185 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly | 186 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
141 // (in the sense of returning the message loop's state to what it was before | 187 // (in the sense of returning the message loop's state to what it was before |
142 // it was called). | 188 // it was called). |
143 read_watcher_.reset(); | 189 read_watcher_.reset(); |
144 write_watcher_.reset(); | 190 write_watcher_.reset(); |
145 return false; | 191 return false; |
146 } | 192 } |
147 | 193 |
148 return true; | 194 return true; |
149 } | 195 } |
150 | 196 |
151 void RawChannelPosix::Shutdown() { | 197 void RawChannelPosix::OnShutdownNoLock( |
198 scoped_ptr<ReadBuffer> /* read_buffer */, | |
viettrungluu
2014/02/26 23:03:39
nit: In this directory, I've mostly left out space
yzshen1
2014/02/27 02:00:30
Done.
| |
199 scoped_ptr<WriteBuffer> /* write_buffer */) { | |
152 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
153 | 201 write_lock().AssertAcquired(); |
154 base::AutoLock locker(write_lock_); | |
155 if (!write_stopped_) | |
156 CancelPendingWritesNoLock(); | |
157 | 202 |
158 read_watcher_.reset(); // This will stop watching (if necessary). | 203 read_watcher_.reset(); // This will stop watching (if necessary). |
159 write_watcher_.reset(); // This will stop watching (if necessary). | 204 write_watcher_.reset(); // This will stop watching (if necessary). |
160 | 205 |
206 pending_read_ = false; | |
207 pending_write_ = false; | |
208 | |
161 DCHECK(fd_.is_valid()); | 209 DCHECK(fd_.is_valid()); |
162 fd_.reset(); | 210 fd_.reset(); |
163 | 211 |
164 weak_ptr_factory_.InvalidateWeakPtrs(); | 212 weak_ptr_factory_.InvalidateWeakPtrs(); |
165 } | 213 } |
166 | 214 |
167 // Reminder: This must be thread-safe, and takes ownership of |message|. | |
168 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
169 base::AutoLock locker(write_lock_); | |
170 if (write_stopped_) | |
171 return false; | |
172 | |
173 if (!write_message_queue_.empty()) { | |
174 write_message_queue_.push_back(message.release()); | |
175 return true; | |
176 } | |
177 | |
178 write_message_queue_.push_front(message.release()); | |
179 DCHECK_EQ(write_message_offset_, 0u); | |
180 bool result = WriteFrontMessageNoLock(); | |
181 DCHECK(result || write_message_queue_.empty()); | |
182 | |
183 if (!result) { | |
184 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | |
185 // nested context. | |
186 message_loop_for_io()->PostTask( | |
187 FROM_HERE, | |
188 base::Bind(&RawChannelPosix::CallOnFatalError, | |
189 weak_ptr_factory_.GetWeakPtr(), | |
190 Delegate::FATAL_ERROR_FAILED_WRITE)); | |
191 } else if (!write_message_queue_.empty()) { | |
192 // Set up to wait for the FD to become writable. If we're not on the I/O | |
193 // thread, we have to post a task to do this. | |
194 if (base::MessageLoop::current() == message_loop_for_io()) { | |
195 WaitToWrite(); | |
196 } else { | |
197 message_loop_for_io()->PostTask( | |
198 FROM_HERE, | |
199 base::Bind(&RawChannelPosix::WaitToWrite, | |
200 weak_ptr_factory_.GetWeakPtr())); | |
201 } | |
202 } | |
203 | |
204 return result; | |
205 } | |
206 | |
207 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { | 215 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
208 DCHECK_EQ(fd, fd_.get().fd); | 216 DCHECK_EQ(fd, fd_.get().fd); |
209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 217 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
210 | 218 |
211 bool did_dispatch_message = false; | 219 if (!pending_read_) |
212 // Tracks the offset of the first undispatched message in |read_buffer_|. | 220 return; |
viettrungluu
2014/02/26 23:03:39
You should probably DCHECK that read_waiter_ is nu
yzshen1
2014/02/27 02:00:30
Thanks! :)
| |
213 // Currently, we copy data to ensure that this is zero at the beginning. | |
214 size_t read_buffer_start = 0; | |
215 for (;;) { | |
216 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) | |
217 < kReadSize) { | |
218 // Use power-of-2 buffer sizes. | |
219 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
220 // maximum message size to whatever extent necessary). | |
221 // TODO(vtl): We may often be able to peek at the header and get the real | |
222 // required extra space (which may be much bigger than |kReadSize|). | |
223 size_t new_size = std::max(read_buffer_.size(), kReadSize); | |
224 while (new_size < | |
225 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) | |
226 new_size *= 2; | |
227 | 221 |
228 // TODO(vtl): It's suboptimal to zero out the fresh memory. | 222 pending_read_ = false; |
229 read_buffer_.resize(new_size, 0); | 223 size_t bytes_read = 0; |
230 } | 224 IOResult result = Read(&bytes_read); |
231 | 225 if (result != IO_PENDING) |
232 ssize_t bytes_read = HANDLE_EINTR( | 226 OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
233 read(fd_.get().fd, | |
234 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], | |
235 kReadSize)); | |
236 if (bytes_read < 0) { | |
237 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
238 PLOG(ERROR) << "read"; | |
239 | |
240 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called | |
241 // again. | |
242 read_watcher_.reset(); | |
243 | |
244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | |
245 return; | |
246 } | |
247 | |
248 break; | |
249 } | |
250 | |
251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | |
252 | |
253 // Dispatch all the messages that we can. | |
254 size_t message_size; | |
255 // Note that we rely on short-circuit evaluation here: | |
256 // - |read_buffer_start| may be an invalid index into |read_buffer_| if | |
257 // |read_buffer_num_valid_bytes_| is zero. | |
258 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
259 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
260 // next read). | |
261 while (read_buffer_num_valid_bytes_ > 0 && | |
262 MessageInTransit::GetNextMessageSize( | |
263 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, | |
264 &message_size) && | |
265 read_buffer_num_valid_bytes_ >= message_size) { | |
266 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, | |
267 &read_buffer_[read_buffer_start]); | |
268 DCHECK_EQ(message.main_buffer_size(), message_size); | |
269 | |
270 // Dispatch the message. | |
271 delegate()->OnReadMessage(message); | |
272 if (!read_watcher_.get()) { | |
273 // |Shutdown()| was called in |OnReadMessage()|. | |
274 // TODO(vtl): Add test for this case. | |
275 return; | |
276 } | |
277 did_dispatch_message = true; | |
278 | |
279 // Update our state. | |
280 read_buffer_start += message_size; | |
281 read_buffer_num_valid_bytes_ -= message_size; | |
282 } | |
283 | |
284 // If we dispatched any messages, stop reading for now (and let the message | |
285 // loop do its thing for another round). | |
286 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
287 // a single message. Risks: slower, more complex if we want to avoid lots of | |
288 // copying. ii. Keep reading until there's no more data and dispatch all the | |
289 // messages we can. Risks: starvation of other users of the message loop.) | |
290 if (did_dispatch_message) | |
291 break; | |
292 | |
293 // If we didn't max out |kReadSize|, stop reading for now. | |
294 if (static_cast<size_t>(bytes_read) < kReadSize) | |
295 break; | |
296 | |
297 // Else try to read some more.... | |
298 } | |
299 | |
300 // Move data back to start. | |
301 if (read_buffer_start > 0) { | |
302 if (read_buffer_num_valid_bytes_ > 0) { | |
303 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], | |
304 read_buffer_num_valid_bytes_); | |
305 } | |
306 read_buffer_start = 0; | |
307 } | |
308 } | 227 } |
309 | 228 |
310 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 229 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
311 DCHECK_EQ(fd, fd_.get().fd); | 230 DCHECK_EQ(fd, fd_.get().fd); |
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 231 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
313 | 232 |
314 bool did_fail = false; | 233 IOResult result = IO_FAILED; |
234 size_t bytes_written = 0; | |
315 { | 235 { |
316 base::AutoLock locker(write_lock_); | 236 base::AutoLock locker(write_lock()); |
317 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); | |
318 | 237 |
319 if (write_stopped_) { | 238 if (!pending_write_) |
320 write_watcher_.reset(); | |
321 return; | 239 return; |
viettrungluu
2014/02/26 23:03:39
"
(though the situation for pending_write_ isn't
yzshen1
2014/02/27 02:00:30
Right. I made it a DCHECK().
| |
322 } | |
323 | 240 |
324 bool result = WriteFrontMessageNoLock(); | 241 pending_write_ = false; |
325 DCHECK(result || write_message_queue_.empty()); | 242 result = WriteNoLock(&bytes_written); |
243 } | |
326 | 244 |
327 if (!result) { | 245 if (result != IO_PENDING) |
328 did_fail = true; | 246 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
329 write_watcher_.reset(); | |
330 } else if (!write_message_queue_.empty()) { | |
331 WaitToWrite(); | |
332 } | |
333 } | |
334 if (did_fail) | |
335 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | |
336 } | 247 } |
337 | 248 |
338 void RawChannelPosix::WaitToWrite() { | 249 void RawChannelPosix::WaitToWrite() { |
339 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 250 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
340 | 251 |
341 DCHECK(write_watcher_.get()); | 252 DCHECK(write_watcher_.get()); |
342 bool result = message_loop_for_io()->WatchFileDescriptor( | |
343 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
344 write_watcher_.get(), this); | |
345 DCHECK(result); | |
346 } | |
347 | 253 |
348 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { | 254 if (!message_loop_for_io()->WatchFileDescriptor( |
349 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 255 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
350 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 256 write_watcher_.get(), this)) { |
351 delegate()->OnFatalError(fatal_error); | 257 { |
352 } | 258 base::AutoLock locker(write_lock()); |
353 | 259 |
354 bool RawChannelPosix::WriteFrontMessageNoLock() { | 260 DCHECK(pending_write_); |
355 write_lock_.AssertAcquired(); | 261 pending_write_ = false; |
356 | |
357 DCHECK(!write_stopped_); | |
358 DCHECK(!write_message_queue_.empty()); | |
359 | |
360 MessageInTransit* message = write_message_queue_.front(); | |
361 DCHECK_LT(write_message_offset_, message->main_buffer_size()); | |
362 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; | |
363 ssize_t bytes_written = HANDLE_EINTR( | |
364 write(fd_.get().fd, | |
365 static_cast<const char*>(message->main_buffer()) + | |
366 write_message_offset_, | |
367 bytes_to_write)); | |
368 if (bytes_written < 0) { | |
369 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
370 PLOG(ERROR) << "write of size " << bytes_to_write; | |
371 CancelPendingWritesNoLock(); | |
372 return false; | |
373 } | 262 } |
374 | 263 OnWriteCompleted(false, 0); |
375 // We simply failed to write since we'd block. The logic is the same as if | |
376 // we got a partial write. | |
377 bytes_written = 0; | |
378 } | 264 } |
379 | |
380 DCHECK_GE(bytes_written, 0); | |
381 if (static_cast<size_t>(bytes_written) < bytes_to_write) { | |
382 // Partial (or no) write. | |
383 write_message_offset_ += static_cast<size_t>(bytes_written); | |
384 } else { | |
385 // Complete write. | |
386 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); | |
387 write_message_queue_.pop_front(); | |
388 delete message; | |
389 write_message_offset_ = 0; | |
390 } | |
391 | |
392 return true; | |
393 } | |
394 | |
395 void RawChannelPosix::CancelPendingWritesNoLock() { | |
396 write_lock_.AssertAcquired(); | |
397 DCHECK(!write_stopped_); | |
398 | |
399 write_stopped_ = true; | |
400 STLDeleteElements(&write_message_queue_); | |
401 } | 265 } |
402 | 266 |
403 } // namespace | 267 } // namespace |
404 | 268 |
405 // ----------------------------------------------------------------------------- | 269 // ----------------------------------------------------------------------------- |
406 | 270 |
407 // Static factory method declared in raw_channel.h. | 271 // Static factory method declared in raw_channel.h. |
408 // static | 272 // static |
409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 273 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
410 Delegate* delegate, | 274 Delegate* delegate, |
411 base::MessageLoopForIO* message_loop_for_io) { | 275 base::MessageLoopForIO* message_loop_for_io) { |
412 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); | 276 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); |
413 } | 277 } |
414 | 278 |
415 } // namespace system | 279 } // namespace system |
416 } // namespace mojo | 280 } // namespace mojo |
OLD | NEW |