OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/channel.h" | 5 #include "mojo/edk/system/channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <sys/socket.h> | 8 #include <sys/socket.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
11 #include <deque> | 11 #include <deque> |
12 #include <limits> | 12 #include <limits> |
13 #include <memory> | 13 #include <memory> |
14 | 14 |
15 #include "base/bind.h" | 15 #include "base/bind.h" |
16 #include "base/location.h" | 16 #include "base/location.h" |
17 #include "base/macros.h" | 17 #include "base/macros.h" |
18 #include "base/memory/ref_counted.h" | 18 #include "base/memory/ref_counted.h" |
19 #include "base/message_loop/message_loop.h" | 19 #include "base/message_loop/message_loop.h" |
20 #include "base/synchronization/lock.h" | 20 #include "base/synchronization/lock.h" |
21 #include "base/task_runner.h" | 21 #include "base/task_runner.h" |
| 22 #include "ipc/unix_domain_socket_util.h" |
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h" | 23 #include "mojo/edk/embedder/platform_channel_utils_posix.h" |
23 #include "mojo/edk/embedder/platform_handle_vector.h" | 24 #include "mojo/edk/embedder/platform_handle_vector.h" |
24 | 25 |
25 #if !defined(OS_NACL) | 26 #if !defined(OS_NACL) |
26 #include <sys/uio.h> | 27 #include <sys/uio.h> |
27 #endif | 28 #endif |
28 | 29 |
29 namespace mojo { | 30 namespace mojo { |
30 namespace edk { | 31 namespace edk { |
31 | 32 |
(...skipping 172 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
204 DCHECK(!read_watcher_); | 205 DCHECK(!read_watcher_); |
205 DCHECK(!write_watcher_); | 206 DCHECK(!write_watcher_); |
206 for (auto handle : incoming_platform_handles_) | 207 for (auto handle : incoming_platform_handles_) |
207 handle.CloseIfNecessary(); | 208 handle.CloseIfNecessary(); |
208 } | 209 } |
209 | 210 |
210 void StartOnIOThread() { | 211 void StartOnIOThread() { |
211 DCHECK(!read_watcher_); | 212 DCHECK(!read_watcher_); |
212 DCHECK(!write_watcher_); | 213 DCHECK(!write_watcher_); |
213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); | 214 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); | |
215 base::MessageLoopForIO::current()->WatchFileDescriptor( | |
216 handle_.get().handle, true /* persistent */, | |
217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); | |
218 base::MessageLoop::current()->AddDestructionObserver(this); | 215 base::MessageLoop::current()->AddDestructionObserver(this); |
| 216 if (handle_.get().needs_connection) { |
| 217 base::MessageLoopForIO::current()->WatchFileDescriptor( |
| 218 handle_.get().handle, false /* persistent */, |
| 219 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); |
| 220 } else { |
| 221 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
| 222 base::MessageLoopForIO::current()->WatchFileDescriptor( |
| 223 handle_.get().handle, true /* persistent */, |
| 224 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); |
| 225 base::AutoLock lock(write_lock_); |
| 226 FlushOutgoingMessagesNoLock(); |
| 227 } |
219 } | 228 } |
220 | 229 |
221 void WaitForWriteOnIOThread() { | 230 void WaitForWriteOnIOThread() { |
222 base::AutoLock lock(write_lock_); | 231 base::AutoLock lock(write_lock_); |
223 WaitForWriteOnIOThreadNoLock(); | 232 WaitForWriteOnIOThreadNoLock(); |
224 } | 233 } |
225 | 234 |
226 void WaitForWriteOnIOThreadNoLock() { | 235 void WaitForWriteOnIOThreadNoLock() { |
227 if (pending_write_) | 236 if (pending_write_) |
228 return; | 237 return; |
(...skipping 29 matching lines...) Expand all Loading... |
258 // base::MessageLoop::DestructionObserver: | 267 // base::MessageLoop::DestructionObserver: |
259 void WillDestroyCurrentMessageLoop() override { | 268 void WillDestroyCurrentMessageLoop() override { |
260 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 269 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
261 if (self_) | 270 if (self_) |
262 ShutDownOnIOThread(); | 271 ShutDownOnIOThread(); |
263 } | 272 } |
264 | 273 |
265 // base::MessageLoopForIO::Watcher: | 274 // base::MessageLoopForIO::Watcher: |
266 void OnFileCanReadWithoutBlocking(int fd) override { | 275 void OnFileCanReadWithoutBlocking(int fd) override { |
267 CHECK_EQ(fd, handle_.get().handle); | 276 CHECK_EQ(fd, handle_.get().handle); |
| 277 if (handle_.get().needs_connection) { |
| 278 #if !defined(OS_NACL) |
| 279 read_watcher_.reset(); |
| 280 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 281 |
| 282 int accept_fd = -1; |
| 283 if (!IPC::ServerOnConnect(handle_.get().handle, &accept_fd)) { |
| 284 OnError(); |
| 285 return; |
| 286 } |
| 287 |
| 288 handle_.reset(PlatformHandle(accept_fd)); |
| 289 StartOnIOThread(); |
| 290 #else |
| 291 NOTREACHED(); |
| 292 #endif |
| 293 return; |
| 294 } |
268 | 295 |
269 bool read_error = false; | 296 bool read_error = false; |
270 size_t next_read_size = 0; | 297 size_t next_read_size = 0; |
271 size_t buffer_capacity = 0; | 298 size_t buffer_capacity = 0; |
272 size_t total_bytes_read = 0; | 299 size_t total_bytes_read = 0; |
273 size_t bytes_read = 0; | 300 size_t bytes_read = 0; |
274 do { | 301 do { |
275 buffer_capacity = next_read_size; | 302 buffer_capacity = next_read_size; |
276 char* buffer = GetReadBuffer(&buffer_capacity); | 303 char* buffer = GetReadBuffer(&buffer_capacity); |
277 DCHECK_GT(buffer_capacity, 0u); | 304 DCHECK_GT(buffer_capacity, 0u); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
314 reject_writes_ = write_error = true; | 341 reject_writes_ = write_error = true; |
315 } | 342 } |
316 if (write_error) | 343 if (write_error) |
317 OnError(); | 344 OnError(); |
318 } | 345 } |
319 | 346 |
320 // Attempts to write a message directly to the channel. If the full message | 347 // Attempts to write a message directly to the channel. If the full message |
321 // cannot be written, it's queued and a wait is initiated to write the message | 348 // cannot be written, it's queued and a wait is initiated to write the message |
322 // ASAP on the I/O thread. | 349 // ASAP on the I/O thread. |
323 bool WriteNoLock(MessageView message_view) { | 350 bool WriteNoLock(MessageView message_view) { |
| 351 if (handle_.get().needs_connection) { |
| 352 outgoing_messages_.emplace_front(std::move(message_view)); |
| 353 return true; |
| 354 } |
| 355 |
324 size_t bytes_written = 0; | 356 size_t bytes_written = 0; |
325 do { | 357 do { |
326 message_view.advance_data_offset(bytes_written); | 358 message_view.advance_data_offset(bytes_written); |
327 | 359 |
328 ssize_t result; | 360 ssize_t result; |
329 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); | 361 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); |
330 if (handles && handles->size()) { | 362 if (handles && handles->size()) { |
331 iovec iov = { | 363 iovec iov = { |
332 const_cast<void*>(message_view.data()), | 364 const_cast<void*>(message_view.data()), |
333 message_view.data_num_bytes() | 365 message_view.data_num_bytes() |
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
511 // static | 543 // static |
512 scoped_refptr<Channel> Channel::Create( | 544 scoped_refptr<Channel> Channel::Create( |
513 Delegate* delegate, | 545 Delegate* delegate, |
514 ScopedPlatformHandle platform_handle, | 546 ScopedPlatformHandle platform_handle, |
515 scoped_refptr<base::TaskRunner> io_task_runner) { | 547 scoped_refptr<base::TaskRunner> io_task_runner) { |
516 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); | 548 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); |
517 } | 549 } |
518 | 550 |
519 } // namespace edk | 551 } // namespace edk |
520 } // namespace mojo | 552 } // namespace mojo |
OLD | NEW |