OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <sys/uio.h> | 8 #include <sys/uio.h> |
9 #include <unistd.h> | 9 #include <unistd.h> |
10 | 10 |
(...skipping 24 matching lines...) Expand all Loading... |
35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle); | 35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle); |
36 ~RawChannelPosix() override; | 36 ~RawChannelPosix() override; |
37 | 37 |
38 // |RawChannel| public methods: | 38 // |RawChannel| public methods: |
39 size_t GetSerializedPlatformHandleSize() const override; | 39 size_t GetSerializedPlatformHandleSize() const override; |
40 | 40 |
41 private: | 41 private: |
42 // |RawChannel| protected methods: | 42 // |RawChannel| protected methods: |
43 // Actually override this so that we can send multiple messages with (only) | 43 // Actually override this so that we can send multiple messages with (only) |
44 // FDs if necessary. | 44 // FDs if necessary. |
45 void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override; | 45 void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override |
| 46 MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex()); |
46 // Override this to handle those extra FD-only messages. | 47 // Override this to handle those extra FD-only messages. |
47 bool OnReadMessageForRawChannel( | 48 bool OnReadMessageForRawChannel( |
48 const MessageInTransit::View& message_view) override; | 49 const MessageInTransit::View& message_view) override; |
49 IOResult Read(size_t* bytes_read) override; | 50 IOResult Read(size_t* bytes_read) override; |
50 IOResult ScheduleRead() override; | 51 IOResult ScheduleRead() override; |
51 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 52 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
52 size_t num_platform_handles, | 53 size_t num_platform_handles, |
53 const void* platform_handle_table) override; | 54 const void* platform_handle_table) override; |
54 IOResult WriteNoLock(size_t* platform_handles_written, | 55 IOResult WriteNoLock(size_t* platform_handles_written, |
55 size_t* bytes_written) override; | 56 size_t* bytes_written) override; |
(...skipping 15 matching lines...) Expand all Loading... |
71 embedder::ScopedPlatformHandle fd_; | 72 embedder::ScopedPlatformHandle fd_; |
72 | 73 |
73 // The following members are only used on the I/O thread: | 74 // The following members are only used on the I/O thread: |
74 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
76 | 77 |
77 bool pending_read_; | 78 bool pending_read_; |
78 | 79 |
79 std::deque<embedder::PlatformHandle> read_platform_handles_; | 80 std::deque<embedder::PlatformHandle> read_platform_handles_; |
80 | 81 |
81 // The following members are used on multiple threads and protected by | 82 bool pending_write_ MOJO_GUARDED_BY(write_mutex()); |
82 // |write_lock()|: | |
83 bool pending_write_; | |
84 | 83 |
85 // This is used for posting tasks from write threads to the I/O thread. It | 84 // This is used for posting tasks from write threads to the I/O thread. The |
86 // must only be accessed under |write_lock_|. The weak pointers it produces | 85 // weak pointers it produces are only used/invalidated on the I/O thread. |
87 // are only used/invalidated on the I/O thread. | 86 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_ |
88 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 87 MOJO_GUARDED_BY(write_mutex()); |
89 | 88 |
90 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 89 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
91 }; | 90 }; |
92 | 91 |
93 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle) | 92 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle) |
94 : fd_(handle.Pass()), | 93 : fd_(handle.Pass()), |
95 pending_read_(false), | 94 pending_read_(false), |
96 pending_write_(false), | 95 pending_write_(false), |
97 weak_ptr_factory_(this) { | 96 weak_ptr_factory_(this) { |
98 DCHECK(fd_.is_valid()); | 97 DCHECK(fd_.is_valid()); |
99 } | 98 } |
100 | 99 |
101 RawChannelPosix::~RawChannelPosix() { | 100 RawChannelPosix::~RawChannelPosix() { |
102 DCHECK(!pending_read_); | 101 DCHECK(!pending_read_); |
103 DCHECK(!pending_write_); | 102 DCHECK(!pending_write_); |
104 | 103 |
105 // No need to take the |write_lock()| here -- if there are still weak pointers | 104 // No need to take |write_mutex()| here -- if there are still weak pointers |
106 // outstanding, then we're hosed anyway (since we wouldn't be able to | 105 // outstanding, then we're hosed anyway (since we wouldn't be able to |
107 // invalidate them cleanly, since we might not be on the I/O thread). | 106 // invalidate them cleanly, since we might not be on the I/O thread). |
108 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 107 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
109 | 108 |
110 // These must have been shut down/destroyed on the I/O thread. | 109 // These must have been shut down/destroyed on the I/O thread. |
111 DCHECK(!read_watcher_); | 110 DCHECK(!read_watcher_); |
112 DCHECK(!write_watcher_); | 111 DCHECK(!write_watcher_); |
113 | 112 |
114 embedder::CloseAllPlatformHandles(&read_platform_handles_); | 113 embedder::CloseAllPlatformHandles(&read_platform_handles_); |
115 } | 114 } |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
208 read_platform_handles_.begin() + num_platform_handles); | 207 read_platform_handles_.begin() + num_platform_handles); |
209 read_platform_handles_.erase( | 208 read_platform_handles_.erase( |
210 read_platform_handles_.begin(), | 209 read_platform_handles_.begin(), |
211 read_platform_handles_.begin() + num_platform_handles); | 210 read_platform_handles_.begin() + num_platform_handles); |
212 return rv.Pass(); | 211 return rv.Pass(); |
213 } | 212 } |
214 | 213 |
215 RawChannel::IOResult RawChannelPosix::WriteNoLock( | 214 RawChannel::IOResult RawChannelPosix::WriteNoLock( |
216 size_t* platform_handles_written, | 215 size_t* platform_handles_written, |
217 size_t* bytes_written) { | 216 size_t* bytes_written) { |
218 write_lock().AssertAcquired(); | 217 write_mutex().AssertHeld(); |
219 | 218 |
220 DCHECK(!pending_write_); | 219 DCHECK(!pending_write_); |
221 | 220 |
222 size_t num_platform_handles = 0; | 221 size_t num_platform_handles = 0; |
223 ssize_t write_result; | 222 ssize_t write_result; |
224 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { | 223 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { |
225 embedder::PlatformHandle* platform_handles; | 224 embedder::PlatformHandle* platform_handles; |
226 void* serialization_data; // Actually unused. | 225 void* serialization_data; // Actually unused. |
227 write_buffer_no_lock()->GetPlatformHandlesToSend( | 226 write_buffer_no_lock()->GetPlatformHandlesToSend( |
228 &num_platform_handles, &platform_handles, &serialization_data); | 227 &num_platform_handles, &platform_handles, &serialization_data); |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
281 | 280 |
282 if (errno != EAGAIN && errno != EWOULDBLOCK) { | 281 if (errno != EAGAIN && errno != EWOULDBLOCK) { |
283 PLOG(WARNING) << "sendmsg/write/writev"; | 282 PLOG(WARNING) << "sendmsg/write/writev"; |
284 return IO_FAILED_UNKNOWN; | 283 return IO_FAILED_UNKNOWN; |
285 } | 284 } |
286 | 285 |
287 return ScheduleWriteNoLock(); | 286 return ScheduleWriteNoLock(); |
288 } | 287 } |
289 | 288 |
290 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { | 289 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { |
291 write_lock().AssertAcquired(); | 290 write_mutex().AssertHeld(); |
292 | 291 |
293 DCHECK(!pending_write_); | 292 DCHECK(!pending_write_); |
294 | 293 |
295 // Set up to wait for the FD to become writable. | 294 // Set up to wait for the FD to become writable. |
296 // If we're not on the I/O thread, we have to post a task to do this. | 295 // If we're not on the I/O thread, we have to post a task to do this. |
297 if (base::MessageLoop::current() != message_loop_for_io()) { | 296 if (base::MessageLoop::current() != message_loop_for_io()) { |
298 message_loop_for_io()->PostTask(FROM_HERE, | 297 message_loop_for_io()->PostTask(FROM_HERE, |
299 base::Bind(&RawChannelPosix::WaitToWrite, | 298 base::Bind(&RawChannelPosix::WaitToWrite, |
300 weak_ptr_factory_.GetWeakPtr())); | 299 weak_ptr_factory_.GetWeakPtr())); |
301 pending_write_ = true; | 300 pending_write_ = true; |
(...skipping 23 matching lines...) Expand all Loading... |
325 // fails cleanly. | 324 // fails cleanly. |
326 CHECK(message_loop_for_io()->WatchFileDescriptor( | 325 CHECK(message_loop_for_io()->WatchFileDescriptor( |
327 fd_.get().fd, true, base::MessageLoopForIO::WATCH_READ, | 326 fd_.get().fd, true, base::MessageLoopForIO::WATCH_READ, |
328 read_watcher_.get(), this)); | 327 read_watcher_.get(), this)); |
329 } | 328 } |
330 | 329 |
331 void RawChannelPosix::OnShutdownNoLock( | 330 void RawChannelPosix::OnShutdownNoLock( |
332 scoped_ptr<ReadBuffer> /*read_buffer*/, | 331 scoped_ptr<ReadBuffer> /*read_buffer*/, |
333 scoped_ptr<WriteBuffer> /*write_buffer*/) { | 332 scoped_ptr<WriteBuffer> /*write_buffer*/) { |
334 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 333 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
335 write_lock().AssertAcquired(); | 334 write_mutex().AssertHeld(); |
336 | 335 |
337 read_watcher_.reset(); // This will stop watching (if necessary). | 336 read_watcher_.reset(); // This will stop watching (if necessary). |
338 write_watcher_.reset(); // This will stop watching (if necessary). | 337 write_watcher_.reset(); // This will stop watching (if necessary). |
339 | 338 |
340 pending_read_ = false; | 339 pending_read_ = false; |
341 pending_write_ = false; | 340 pending_write_ = false; |
342 | 341 |
343 DCHECK(fd_.is_valid()); | 342 DCHECK(fd_.is_valid()); |
344 fd_.reset(); | 343 fd_.reset(); |
345 | 344 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
378 } | 377 } |
379 | 378 |
380 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 379 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
381 DCHECK_EQ(fd, fd_.get().fd); | 380 DCHECK_EQ(fd, fd_.get().fd); |
382 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 381 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
383 | 382 |
384 IOResult io_result; | 383 IOResult io_result; |
385 size_t platform_handles_written = 0; | 384 size_t platform_handles_written = 0; |
386 size_t bytes_written = 0; | 385 size_t bytes_written = 0; |
387 { | 386 { |
388 base::AutoLock locker(write_lock()); | 387 MutexLocker locker(&write_mutex()); |
389 | 388 |
390 DCHECK(pending_write_); | 389 DCHECK(pending_write_); |
391 | 390 |
392 pending_write_ = false; | 391 pending_write_ = false; |
393 io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 392 io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
394 } | 393 } |
395 | 394 |
396 if (io_result != IO_PENDING) { | 395 if (io_result != IO_PENDING) { |
397 OnWriteCompleted(io_result, platform_handles_written, bytes_written); | 396 OnWriteCompleted(io_result, platform_handles_written, bytes_written); |
398 return; // |this| may have been destroyed in |OnWriteCompleted()|. | 397 return; // |this| may have been destroyed in |OnWriteCompleted()|. |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
447 | 446 |
448 void RawChannelPosix::WaitToWrite() { | 447 void RawChannelPosix::WaitToWrite() { |
449 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | 448 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
450 | 449 |
451 DCHECK(write_watcher_); | 450 DCHECK(write_watcher_); |
452 | 451 |
453 if (!message_loop_for_io()->WatchFileDescriptor( | 452 if (!message_loop_for_io()->WatchFileDescriptor( |
454 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, | 453 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
455 write_watcher_.get(), this)) { | 454 write_watcher_.get(), this)) { |
456 { | 455 { |
457 base::AutoLock locker(write_lock()); | 456 MutexLocker locker(&write_mutex()); |
458 | 457 |
459 DCHECK(pending_write_); | 458 DCHECK(pending_write_); |
460 pending_write_ = false; | 459 pending_write_ = false; |
461 } | 460 } |
462 OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); | 461 OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); |
463 return; // |this| may have been destroyed in |OnWriteCompleted()|. | 462 return; // |this| may have been destroyed in |OnWriteCompleted()|. |
464 } | 463 } |
465 } | 464 } |
466 | 465 |
467 } // namespace | 466 } // namespace |
468 | 467 |
469 // ----------------------------------------------------------------------------- | 468 // ----------------------------------------------------------------------------- |
470 | 469 |
471 // Static factory method declared in raw_channel.h. | 470 // Static factory method declared in raw_channel.h. |
472 // static | 471 // static |
473 scoped_ptr<RawChannel> RawChannel::Create( | 472 scoped_ptr<RawChannel> RawChannel::Create( |
474 embedder::ScopedPlatformHandle handle) { | 473 embedder::ScopedPlatformHandle handle) { |
475 return make_scoped_ptr(new RawChannelPosix(handle.Pass())); | 474 return make_scoped_ptr(new RawChannelPosix(handle.Pass())); |
476 } | 475 } |
477 | 476 |
478 } // namespace system | 477 } // namespace system |
479 } // namespace mojo | 478 } // namespace mojo |
OLD | NEW |