OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/channel.h" | 5 #include "mojo/edk/system/channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <sys/socket.h> | 8 #include <sys/socket.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
204 DCHECK(!read_watcher_); | 204 DCHECK(!read_watcher_); |
205 DCHECK(!write_watcher_); | 205 DCHECK(!write_watcher_); |
206 for (auto handle : incoming_platform_handles_) | 206 for (auto handle : incoming_platform_handles_) |
207 handle.CloseIfNecessary(); | 207 handle.CloseIfNecessary(); |
208 } | 208 } |
209 | 209 |
210 void StartOnIOThread() { | 210 void StartOnIOThread() { |
211 DCHECK(!read_watcher_); | 211 DCHECK(!read_watcher_); |
212 DCHECK(!write_watcher_); | 212 DCHECK(!write_watcher_); |
213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); | 213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); | |
215 base::MessageLoopForIO::current()->WatchFileDescriptor( | |
216 handle_.get().handle, true /* persistent */, | |
217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); | |
218 base::MessageLoop::current()->AddDestructionObserver(this); | 214 base::MessageLoop::current()->AddDestructionObserver(this); |
| 215 if (handle_.get().needs_connection) { |
| 216 base::MessageLoopForIO::current()->WatchFileDescriptor( |
| 217 handle_.get().handle, false /* persistent */, |
| 218 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); |
| 219 } else { |
| 220 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
| 221 base::MessageLoopForIO::current()->WatchFileDescriptor( |
| 222 handle_.get().handle, true /* persistent */, |
| 223 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); |
| 224 base::AutoLock lock(write_lock_); |
| 225 FlushOutgoingMessagesNoLock(); |
| 226 } |
219 } | 227 } |
220 | 228 |
221 void WaitForWriteOnIOThread() { | 229 void WaitForWriteOnIOThread() { |
222 base::AutoLock lock(write_lock_); | 230 base::AutoLock lock(write_lock_); |
223 WaitForWriteOnIOThreadNoLock(); | 231 WaitForWriteOnIOThreadNoLock(); |
224 } | 232 } |
225 | 233 |
226 void WaitForWriteOnIOThreadNoLock() { | 234 void WaitForWriteOnIOThreadNoLock() { |
227 if (pending_write_) | 235 if (pending_write_) |
228 return; | 236 return; |
(...skipping 29 matching lines...) Expand all Loading... |
258 // base::MessageLoop::DestructionObserver: | 266 // base::MessageLoop::DestructionObserver: |
259 void WillDestroyCurrentMessageLoop() override { | 267 void WillDestroyCurrentMessageLoop() override { |
260 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 268 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
261 if (self_) | 269 if (self_) |
262 ShutDownOnIOThread(); | 270 ShutDownOnIOThread(); |
263 } | 271 } |
264 | 272 |
265 // base::MessageLoopForIO::Watcher: | 273 // base::MessageLoopForIO::Watcher: |
266 void OnFileCanReadWithoutBlocking(int fd) override { | 274 void OnFileCanReadWithoutBlocking(int fd) override { |
267 CHECK_EQ(fd, handle_.get().handle); | 275 CHECK_EQ(fd, handle_.get().handle); |
| 276 if (handle_.get().needs_connection) { |
| 277 #if !defined(OS_NACL) |
| 278 read_watcher_.reset(); |
| 279 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 280 |
| 281 ScopedPlatformHandle accept_fd; |
| 282 ServerAcceptConnection(handle_.get(), &accept_fd); |
| 283 if (!accept_fd.is_valid()) { |
| 284 OnError(); |
| 285 return; |
| 286 } |
| 287 handle_ = std::move(accept_fd); |
| 288 StartOnIOThread(); |
| 289 #else |
| 290 NOTREACHED(); |
| 291 #endif |
| 292 return; |
| 293 } |
268 | 294 |
269 bool read_error = false; | 295 bool read_error = false; |
270 size_t next_read_size = 0; | 296 size_t next_read_size = 0; |
271 size_t buffer_capacity = 0; | 297 size_t buffer_capacity = 0; |
272 size_t total_bytes_read = 0; | 298 size_t total_bytes_read = 0; |
273 size_t bytes_read = 0; | 299 size_t bytes_read = 0; |
274 do { | 300 do { |
275 buffer_capacity = next_read_size; | 301 buffer_capacity = next_read_size; |
276 char* buffer = GetReadBuffer(&buffer_capacity); | 302 char* buffer = GetReadBuffer(&buffer_capacity); |
277 DCHECK_GT(buffer_capacity, 0u); | 303 DCHECK_GT(buffer_capacity, 0u); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
314 reject_writes_ = write_error = true; | 340 reject_writes_ = write_error = true; |
315 } | 341 } |
316 if (write_error) | 342 if (write_error) |
317 OnError(); | 343 OnError(); |
318 } | 344 } |
319 | 345 |
320 // Attempts to write a message directly to the channel. If the full message | 346 // Attempts to write a message directly to the channel. If the full message |
321 // cannot be written, it's queued and a wait is initiated to write the message | 347 // cannot be written, it's queued and a wait is initiated to write the message |
322 // ASAP on the I/O thread. | 348 // ASAP on the I/O thread. |
323 bool WriteNoLock(MessageView message_view) { | 349 bool WriteNoLock(MessageView message_view) { |
| 350 if (handle_.get().needs_connection) { |
| 351 outgoing_messages_.emplace_front(std::move(message_view)); |
| 352 return true; |
| 353 } |
324 size_t bytes_written = 0; | 354 size_t bytes_written = 0; |
325 do { | 355 do { |
326 message_view.advance_data_offset(bytes_written); | 356 message_view.advance_data_offset(bytes_written); |
327 | 357 |
328 ssize_t result; | 358 ssize_t result; |
329 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); | 359 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); |
330 if (handles && handles->size()) { | 360 if (handles && handles->size()) { |
331 iovec iov = { | 361 iovec iov = { |
332 const_cast<void*>(message_view.data()), | 362 const_cast<void*>(message_view.data()), |
333 message_view.data_num_bytes() | 363 message_view.data_num_bytes() |
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
511 // static | 541 // static |
512 scoped_refptr<Channel> Channel::Create( | 542 scoped_refptr<Channel> Channel::Create( |
513 Delegate* delegate, | 543 Delegate* delegate, |
514 ScopedPlatformHandle platform_handle, | 544 ScopedPlatformHandle platform_handle, |
515 scoped_refptr<base::TaskRunner> io_task_runner) { | 545 scoped_refptr<base::TaskRunner> io_task_runner) { |
516 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); | 546 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); |
517 } | 547 } |
518 | 548 |
519 } // namespace edk | 549 } // namespace edk |
520 } // namespace mojo | 550 } // namespace mojo |
OLD | NEW |