OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <string.h> | |
9 #include <unistd.h> | 8 #include <unistd.h> |
10 | 9 |
11 #include <algorithm> | |
12 #include <deque> | |
13 #include <vector> | |
14 | |
15 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
16 #include "base/bind.h" | 11 #include "base/bind.h" |
17 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
18 #include "base/location.h" | 13 #include "base/location.h" |
19 #include "base/logging.h" | 14 #include "base/logging.h" |
20 #include "base/memory/scoped_ptr.h" | 15 #include "base/memory/scoped_ptr.h" |
21 #include "base/memory/weak_ptr.h" | 16 #include "base/memory/weak_ptr.h" |
22 #include "base/message_loop/message_loop.h" | 17 #include "base/message_loop/message_loop.h" |
23 #include "base/posix/eintr_wrapper.h" | 18 #include "base/posix/eintr_wrapper.h" |
24 #include "base/stl_util.h" | |
25 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
26 #include "mojo/system/embedder/platform_handle.h" | 20 #include "mojo/system/embedder/platform_handle.h" |
27 #include "mojo/system/message_in_transit.h" | |
28 | 21 |
29 namespace mojo { | 22 namespace mojo { |
30 namespace system { | 23 namespace system { |
31 | 24 |
32 namespace { | 25 namespace { |
33 | 26 |
34 const size_t kReadSize = 4096; | |
35 | |
36 class RawChannelPosix : public RawChannel, | 27 class RawChannelPosix : public RawChannel, |
37 public base::MessageLoopForIO::Watcher { | 28 public base::MessageLoopForIO::Watcher { |
38 public: | 29 public: |
39 RawChannelPosix(embedder::ScopedPlatformHandle handle, | 30 RawChannelPosix(embedder::ScopedPlatformHandle handle, |
40 Delegate* delegate, | 31 Delegate* delegate, |
41 base::MessageLoopForIO* message_loop_for_io); | 32 base::MessageLoopForIO* message_loop_for_io); |
42 virtual ~RawChannelPosix(); | 33 virtual ~RawChannelPosix(); |
43 | 34 |
| 35 private: |
44 // |RawChannel| implementation: | 36 // |RawChannel| implementation: |
45 virtual bool Init() OVERRIDE; | 37 virtual IOResult Read(bool schedule_for_later, |
46 virtual void Shutdown() OVERRIDE; | 38 char* buffer, |
47 virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; | 39 size_t bytes_to_read, |
| 40 size_t* bytes_read) OVERRIDE; |
| 41 virtual IOResult WriteNoLock(bool schedule_for_later, |
| 42 const char* buffer, |
| 43 size_t bytes_to_write, |
| 44 size_t* bytes_written) OVERRIDE; |
| 45 virtual bool OnInit() OVERRIDE; |
| 46 virtual void OnShutdownNoLock( |
| 47 scoped_ptr<IOBufferPreserver> buffer_preserver) OVERRIDE; |
48 | 48 |
49 private: | |
50 // |base::MessageLoopForIO::Watcher| implementation: | 49 // |base::MessageLoopForIO::Watcher| implementation: |
51 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
52 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
53 | 52 |
54 // Watches for |fd_| to become writable. Must be called on the I/O thread. | 53 // Watches for |fd_| to become writable. Must be called on the I/O thread. |
55 void WaitToWrite(); | 54 void WaitToWrite(); |
56 | 55 |
57 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O | |
58 // thread WITHOUT |write_lock_| held. | |
59 void CallOnFatalError(Delegate::FatalError fatal_error); | |
60 | |
61 // Writes the message at the front of |write_message_queue_|, starting at | |
62 // |write_message_offset_|. It removes and destroys if the write completes and | |
63 // otherwise updates |write_message_offset_|. Returns true on success. Must be | |
64 // called under |write_lock_|. | |
65 bool WriteFrontMessageNoLock(); | |
66 | |
67 // Cancels all pending writes and destroys the contents of | |
68 // |write_message_queue_|. Should only be called if |write_stopped_| is false; | |
69 // sets |write_stopped_| to true. Must be called under |write_lock_|. | |
70 void CancelPendingWritesNoLock(); | |
71 | |
72 embedder::ScopedPlatformHandle fd_; | 56 embedder::ScopedPlatformHandle fd_; |
73 | 57 |
74 // Only used on the I/O thread: | 58 // The following members are only used on the I/O thread: |
75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 59 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 60 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
77 | 61 |
78 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| | 62 char* pending_read_buffer_; |
79 // is always aligned with a message boundary (we will copy memory to ensure | 63 size_t pending_bytes_to_read_; |
80 // this), but |read_buffer_| may be larger than the actual number of bytes we | |
81 // have. | |
82 std::vector<char> read_buffer_; | |
83 size_t read_buffer_num_valid_bytes_; | |
84 | 64 |
85 base::Lock write_lock_; // Protects the following members. | 65 // The following members are used on multiple threads and protected by |
86 bool write_stopped_; | 66 // |write_lock()|: |
87 // TODO(vtl): When C++11 is available, switch this to a deque of | 67 const char* pending_write_buffer_; |
88 // |scoped_ptr|/|unique_ptr|s. | 68 size_t pending_bytes_to_write_; |
89 std::deque<MessageInTransit*> write_message_queue_; | 69 |
90 size_t write_message_offset_; | |
91 // This is used for posting tasks from write threads to the I/O thread. It | 70 // This is used for posting tasks from write threads to the I/O thread. It |
92 // must only be accessed under |write_lock_|. The weak pointers it produces | 71 // must only be accessed under |write_lock_|. The weak pointers it produces |
93 // are only used/invalidated on the I/O thread. | 72 // are only used/invalidated on the I/O thread. |
94 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 73 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; |
95 | 74 |
96 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 75 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
97 }; | 76 }; |
98 | 77 |
99 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, | 78 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, |
100 Delegate* delegate, | 79 Delegate* delegate, |
101 base::MessageLoopForIO* message_loop_for_io) | 80 base::MessageLoopForIO* message_loop_for_io) |
102 : RawChannel(delegate, message_loop_for_io), | 81 : RawChannel(delegate, message_loop_for_io), |
103 fd_(handle.Pass()), | 82 fd_(handle.Pass()), |
104 read_buffer_num_valid_bytes_(0), | 83 pending_read_buffer_(NULL), |
105 write_stopped_(false), | 84 pending_bytes_to_read_(0), |
106 write_message_offset_(0), | 85 pending_write_buffer_(NULL), |
| 86 pending_bytes_to_write_(0), |
107 weak_ptr_factory_(this) { | 87 weak_ptr_factory_(this) { |
108 CHECK_EQ(RawChannel::message_loop_for_io()->type(), | |
109 base::MessageLoop::TYPE_IO); | |
110 DCHECK(fd_.is_valid()); | 88 DCHECK(fd_.is_valid()); |
111 } | 89 } |
112 | 90 |
113 RawChannelPosix::~RawChannelPosix() { | 91 RawChannelPosix::~RawChannelPosix() { |
114 DCHECK(write_stopped_); | 92 DCHECK(!pending_read_buffer_); |
115 DCHECK(!fd_.is_valid()); | 93 DCHECK(!pending_write_buffer_); |
116 | 94 |
117 // No need to take the |write_lock_| here -- if there are still weak pointers | 95 // No need to take the |write_lock()| here -- if there are still weak pointers |
118 // outstanding, then we're hosed anyway (since we wouldn't be able to | 96 // outstanding, then we're hosed anyway (since we wouldn't be able to |
119 // invalidate them cleanly, since we might not be on the I/O thread). | 97 // invalidate them cleanly, since we might not be on the I/O thread). |
120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 98 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
121 | 99 |
122 // These must have been shut down/destroyed on the I/O thread. | 100 // These must have been shut down/destroyed on the I/O thread. |
123 DCHECK(!read_watcher_.get()); | 101 DCHECK(!read_watcher_.get()); |
124 DCHECK(!write_watcher_.get()); | 102 DCHECK(!write_watcher_.get()); |
125 } | 103 } |
126 | 104 |
127 bool RawChannelPosix::Init() { | 105 RawChannel::IOResult RawChannelPosix::Read(bool schedule_for_later, |
| 106 char* buffer, |
| 107 size_t bytes_to_read, |
| 108 size_t* bytes_read) { |
| 109 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| 110 DCHECK(!pending_read_buffer_); |
| 111 |
| 112 if (!schedule_for_later) { |
| 113 ssize_t read_result = HANDLE_EINTR( |
| 114 read(fd_.get().fd, buffer, bytes_to_read)); |
| 115 |
| 116 if (read_result >= 0) { |
| 117 *bytes_read = static_cast<size_t>(read_result); |
| 118 return IO_SUCCEEDED; |
| 119 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| 120 PLOG(ERROR) << "read"; |
| 121 |
| 122 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. |
| 123 read_watcher_.reset(); |
| 124 |
| 125 return IO_FAILED; |
| 126 } |
| 127 } |
| 128 |
| 129 // Either |schedule_for_later| is true or |read()| above would block. |
| 130 pending_read_buffer_ = buffer; |
| 131 pending_bytes_to_read_ = bytes_to_read; |
| 132 |
| 133 return IO_PENDING; |
| 134 } |
| 135 |
| 136 RawChannel::IOResult RawChannelPosix::WriteNoLock(bool schedule_for_later, |
| 137 const char* buffer, |
| 138 size_t bytes_to_write, |
| 139 size_t* bytes_written) { |
| 140 write_lock().AssertAcquired(); |
| 141 |
| 142 DCHECK(!pending_write_buffer_); |
| 143 |
| 144 if (!schedule_for_later) { |
| 145 ssize_t write_result = HANDLE_EINTR( |
| 146 write(fd_.get().fd, buffer, bytes_to_write)); |
| 147 |
| 148 if (write_result >= 0) { |
| 149 *bytes_written = static_cast<size_t>(write_result); |
| 150 return IO_SUCCEEDED; |
| 151 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| 152 PLOG(ERROR) << "write of size " << bytes_to_write; |
| 153 return IO_FAILED; |
| 154 } |
| 155 } |
| 156 |
| 157 pending_write_buffer_ = buffer; |
| 158 pending_bytes_to_write_ = bytes_to_write; |
| 159 |
| 160 // Either |schedule_for_later| is true or |write()| above would block, set up |
| 161 // to wait for the FD to become writable. |
| 162 // If we're not on the I/O thread, we have to post a task to do this. |
| 163 if (base::MessageLoop::current() != message_loop_for_io()) { |
| 164 message_loop_for_io()->PostTask( |
| 165 FROM_HERE, |
| 166 base::Bind(&RawChannelPosix::WaitToWrite, |
| 167 weak_ptr_factory_.GetWeakPtr())); |
| 168 return IO_PENDING; |
| 169 } |
| 170 |
| 171 bool result = message_loop_for_io()->WatchFileDescriptor( |
| 172 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
| 173 write_watcher_.get(), this); |
| 174 return result ? IO_PENDING : IO_FAILED; |
| 175 } |
| 176 |
| 177 bool RawChannelPosix::OnInit() { |
128 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 178 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
129 | 179 |
130 DCHECK(!read_watcher_.get()); | 180 DCHECK(!read_watcher_.get()); |
131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 181 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
132 DCHECK(!write_watcher_.get()); | 182 DCHECK(!write_watcher_.get()); |
133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 183 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
134 | 184 |
135 // No need to take the lock. No one should be using us yet. | |
136 DCHECK(write_message_queue_.empty()); | |
137 | |
138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, | 185 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { | 186 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly | 187 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
141 // (in the sense of returning the message loop's state to what it was before | 188 // (in the sense of returning the message loop's state to what it was before |
142 // it was called). | 189 // it was called). |
143 read_watcher_.reset(); | 190 read_watcher_.reset(); |
144 write_watcher_.reset(); | 191 write_watcher_.reset(); |
145 return false; | 192 return false; |
146 } | 193 } |
147 | 194 |
148 return true; | 195 return true; |
149 } | 196 } |
150 | 197 |
151 void RawChannelPosix::Shutdown() { | 198 void RawChannelPosix::OnShutdownNoLock( |
| 199 scoped_ptr<IOBufferPreserver> /* buffer_preserver */) { |
152 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 200 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
153 | 201 write_lock().AssertAcquired(); |
154 base::AutoLock locker(write_lock_); | |
155 if (!write_stopped_) | |
156 CancelPendingWritesNoLock(); | |
157 | 202 |
158 read_watcher_.reset(); // This will stop watching (if necessary). | 203 read_watcher_.reset(); // This will stop watching (if necessary). |
159 write_watcher_.reset(); // This will stop watching (if necessary). | 204 write_watcher_.reset(); // This will stop watching (if necessary). |
160 | 205 |
| 206 pending_read_buffer_ = NULL; |
| 207 pending_bytes_to_read_ = 0; |
| 208 |
| 209 pending_write_buffer_ = NULL; |
| 210 pending_bytes_to_write_ = 0; |
| 211 |
161 DCHECK(fd_.is_valid()); | 212 DCHECK(fd_.is_valid()); |
162 fd_.reset(); | 213 fd_.reset(); |
163 | 214 |
164 weak_ptr_factory_.InvalidateWeakPtrs(); | 215 weak_ptr_factory_.InvalidateWeakPtrs(); |
165 } | 216 } |
166 | 217 |
167 // Reminder: This must be thread-safe, and takes ownership of |message|. | |
168 bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
169 base::AutoLock locker(write_lock_); | |
170 if (write_stopped_) | |
171 return false; | |
172 | |
173 if (!write_message_queue_.empty()) { | |
174 write_message_queue_.push_back(message.release()); | |
175 return true; | |
176 } | |
177 | |
178 write_message_queue_.push_front(message.release()); | |
179 DCHECK_EQ(write_message_offset_, 0u); | |
180 bool result = WriteFrontMessageNoLock(); | |
181 DCHECK(result || write_message_queue_.empty()); | |
182 | |
183 if (!result) { | |
184 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | |
185 // nested context. | |
186 message_loop_for_io()->PostTask( | |
187 FROM_HERE, | |
188 base::Bind(&RawChannelPosix::CallOnFatalError, | |
189 weak_ptr_factory_.GetWeakPtr(), | |
190 Delegate::FATAL_ERROR_FAILED_WRITE)); | |
191 } else if (!write_message_queue_.empty()) { | |
192 // Set up to wait for the FD to become writable. If we're not on the I/O | |
193 // thread, we have to post a task to do this. | |
194 if (base::MessageLoop::current() == message_loop_for_io()) { | |
195 WaitToWrite(); | |
196 } else { | |
197 message_loop_for_io()->PostTask( | |
198 FROM_HERE, | |
199 base::Bind(&RawChannelPosix::WaitToWrite, | |
200 weak_ptr_factory_.GetWeakPtr())); | |
201 } | |
202 } | |
203 | |
204 return result; | |
205 } | |
206 | |
207 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { | 218 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
208 DCHECK_EQ(fd, fd_.get().fd); | 219 DCHECK_EQ(fd, fd_.get().fd); |
209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 220 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
210 | 221 |
211 bool did_dispatch_message = false; | 222 if (!pending_read_buffer_) |
212 // Tracks the offset of the first undispatched message in |read_buffer_|. | 223 return; |
213 // Currently, we copy data to ensure that this is zero at the beginning. | |
214 size_t read_buffer_start = 0; | |
215 for (;;) { | |
216 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) | |
217 < kReadSize) { | |
218 // Use power-of-2 buffer sizes. | |
219 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
220 // maximum message size to whatever extent necessary). | |
221 // TODO(vtl): We may often be able to peek at the header and get the real | |
222 // required extra space (which may be much bigger than |kReadSize|). | |
223 size_t new_size = std::max(read_buffer_.size(), kReadSize); | |
224 while (new_size < | |
225 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) | |
226 new_size *= 2; | |
227 | 224 |
228 // TODO(vtl): It's suboptimal to zero out the fresh memory. | 225 char* temp_buffer = pending_read_buffer_; |
229 read_buffer_.resize(new_size, 0); | 226 size_t temp_bytes_to_read = pending_bytes_to_read_; |
230 } | 227 size_t bytes_read = 0; |
231 | 228 |
232 ssize_t bytes_read = HANDLE_EINTR( | 229 pending_read_buffer_ = NULL; |
233 read(fd_.get().fd, | 230 pending_bytes_to_read_ = 0; |
234 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], | |
235 kReadSize)); | |
236 if (bytes_read < 0) { | |
237 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
238 PLOG(ERROR) << "read"; | |
239 | 231 |
240 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called | 232 IOResult result = Read(false, temp_buffer, temp_bytes_to_read, |
241 // again. | 233 &bytes_read); |
242 read_watcher_.reset(); | 234 if (result != IO_PENDING) |
243 | 235 OnReadCompleted(result == IO_SUCCEEDED, bytes_read); |
244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | |
245 return; | |
246 } | |
247 | |
248 break; | |
249 } | |
250 | |
251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | |
252 | |
253 // Dispatch all the messages that we can. | |
254 size_t message_size; | |
255 // Note that we rely on short-circuit evaluation here: | |
256 // - |read_buffer_start| may be an invalid index into |read_buffer_| if | |
257 // |read_buffer_num_valid_bytes_| is zero. | |
258 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
259 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
260 // next read). | |
261 while (read_buffer_num_valid_bytes_ > 0 && | |
262 MessageInTransit::GetNextMessageSize( | |
263 &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, | |
264 &message_size) && | |
265 read_buffer_num_valid_bytes_ >= message_size) { | |
266 MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, | |
267 &read_buffer_[read_buffer_start]); | |
268 DCHECK_EQ(message.main_buffer_size(), message_size); | |
269 | |
270 // Dispatch the message. | |
271 delegate()->OnReadMessage(message); | |
272 if (!read_watcher_.get()) { | |
273 // |Shutdown()| was called in |OnReadMessage()|. | |
274 // TODO(vtl): Add test for this case. | |
275 return; | |
276 } | |
277 did_dispatch_message = true; | |
278 | |
279 // Update our state. | |
280 read_buffer_start += message_size; | |
281 read_buffer_num_valid_bytes_ -= message_size; | |
282 } | |
283 | |
284 // If we dispatched any messages, stop reading for now (and let the message | |
285 // loop do its thing for another round). | |
286 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
287 // a single message. Risks: slower, more complex if we want to avoid lots of | |
288 // copying. ii. Keep reading until there's no more data and dispatch all the | |
289 // messages we can. Risks: starvation of other users of the message loop.) | |
290 if (did_dispatch_message) | |
291 break; | |
292 | |
293 // If we didn't max out |kReadSize|, stop reading for now. | |
294 if (static_cast<size_t>(bytes_read) < kReadSize) | |
295 break; | |
296 | |
297 // Else try to read some more.... | |
298 } | |
299 | |
300 // Move data back to start. | |
301 if (read_buffer_start > 0) { | |
302 if (read_buffer_num_valid_bytes_ > 0) { | |
303 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], | |
304 read_buffer_num_valid_bytes_); | |
305 } | |
306 read_buffer_start = 0; | |
307 } | |
308 } | 236 } |
309 | 237 |
310 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 238 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
311 DCHECK_EQ(fd, fd_.get().fd); | 239 DCHECK_EQ(fd, fd_.get().fd); |
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 240 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
313 | 241 |
314 bool did_fail = false; | 242 IOResult result = IO_FAILED; |
| 243 size_t bytes_written = 0; |
315 { | 244 { |
316 base::AutoLock locker(write_lock_); | 245 base::AutoLock locker(write_lock()); |
317 DCHECK_EQ(write_stopped_, write_message_queue_.empty()); | |
318 | 246 |
319 if (write_stopped_) { | 247 if (!pending_write_buffer_) |
320 write_watcher_.reset(); | |
321 return; | 248 return; |
322 } | |
323 | 249 |
324 bool result = WriteFrontMessageNoLock(); | 250 const char* temp_write_buffer = pending_write_buffer_; |
325 DCHECK(result || write_message_queue_.empty()); | 251 size_t temp_bytes_to_write = pending_bytes_to_write_; |
| 252 pending_write_buffer_ = NULL; |
| 253 pending_bytes_to_write_ = 0; |
326 | 254 |
327 if (!result) { | 255 result = WriteNoLock(false, temp_write_buffer, temp_bytes_to_write, |
328 did_fail = true; | 256 &bytes_written); |
329 write_watcher_.reset(); | |
330 } else if (!write_message_queue_.empty()) { | |
331 WaitToWrite(); | |
332 } | |
333 } | 257 } |
334 if (did_fail) | 258 |
335 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | 259 if (result != IO_PENDING) |
| 260 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); |
336 } | 261 } |
337 | 262 |
338 void RawChannelPosix::WaitToWrite() { | 263 void RawChannelPosix::WaitToWrite() { |
339 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 264 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
340 | 265 |
341 DCHECK(write_watcher_.get()); | 266 DCHECK(write_watcher_.get()); |
342 bool result = message_loop_for_io()->WatchFileDescriptor( | |
343 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | |
344 write_watcher_.get(), this); | |
345 DCHECK(result); | |
346 } | |
347 | 267 |
348 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { | 268 if (!message_loop_for_io()->WatchFileDescriptor( |
349 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 269 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
350 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 270 write_watcher_.get(), this)) { |
351 delegate()->OnFatalError(fatal_error); | 271 { |
352 } | 272 base::AutoLock locker(write_lock()); |
353 | 273 |
354 bool RawChannelPosix::WriteFrontMessageNoLock() { | 274 DCHECK(pending_write_buffer_); |
355 write_lock_.AssertAcquired(); | 275 pending_write_buffer_ = NULL; |
356 | 276 pending_bytes_to_write_ = 0; |
357 DCHECK(!write_stopped_); | |
358 DCHECK(!write_message_queue_.empty()); | |
359 | |
360 MessageInTransit* message = write_message_queue_.front(); | |
361 DCHECK_LT(write_message_offset_, message->main_buffer_size()); | |
362 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; | |
363 ssize_t bytes_written = HANDLE_EINTR( | |
364 write(fd_.get().fd, | |
365 static_cast<const char*>(message->main_buffer()) + | |
366 write_message_offset_, | |
367 bytes_to_write)); | |
368 if (bytes_written < 0) { | |
369 if (errno != EAGAIN && errno != EWOULDBLOCK) { | |
370 PLOG(ERROR) << "write of size " << bytes_to_write; | |
371 CancelPendingWritesNoLock(); | |
372 return false; | |
373 } | 277 } |
374 | 278 OnWriteCompleted(false, 0); |
375 // We simply failed to write since we'd block. The logic is the same as if | |
376 // we got a partial write. | |
377 bytes_written = 0; | |
378 } | 279 } |
379 | |
380 DCHECK_GE(bytes_written, 0); | |
381 if (static_cast<size_t>(bytes_written) < bytes_to_write) { | |
382 // Partial (or no) write. | |
383 write_message_offset_ += static_cast<size_t>(bytes_written); | |
384 } else { | |
385 // Complete write. | |
386 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); | |
387 write_message_queue_.pop_front(); | |
388 delete message; | |
389 write_message_offset_ = 0; | |
390 } | |
391 | |
392 return true; | |
393 } | |
394 | |
395 void RawChannelPosix::CancelPendingWritesNoLock() { | |
396 write_lock_.AssertAcquired(); | |
397 DCHECK(!write_stopped_); | |
398 | |
399 write_stopped_ = true; | |
400 STLDeleteElements(&write_message_queue_); | |
401 } | 280 } |
402 | 281 |
403 } // namespace | 282 } // namespace |
404 | 283 |
405 // ----------------------------------------------------------------------------- | 284 // ----------------------------------------------------------------------------- |
406 | 285 |
407 // Static factory method declared in raw_channel.h. | 286 // Static factory method declared in raw_channel.h. |
408 // static | 287 // static |
409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 288 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
410 Delegate* delegate, | 289 Delegate* delegate, |
411 base::MessageLoopForIO* message_loop_for_io) { | 290 base::MessageLoopForIO* message_loop_for_io) { |
412 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); | 291 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); |
413 } | 292 } |
414 | 293 |
415 } // namespace system | 294 } // namespace system |
416 } // namespace mojo | 295 } // namespace mojo |
OLD | NEW |