Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(259)

Side by Side Diff: mojo/edk/system/raw_channel_posix.cc

Issue 1337953004: base::Lock -> Mutex in RawChannel. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.cc ('k') | mojo/edk/system/raw_channel_win.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <sys/uio.h> 8 #include <sys/uio.h>
9 #include <unistd.h> 9 #include <unistd.h>
10 10
(...skipping 24 matching lines...) Expand all
35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle); 35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle);
36 ~RawChannelPosix() override; 36 ~RawChannelPosix() override;
37 37
38 // |RawChannel| public methods: 38 // |RawChannel| public methods:
39 size_t GetSerializedPlatformHandleSize() const override; 39 size_t GetSerializedPlatformHandleSize() const override;
40 40
41 private: 41 private:
42 // |RawChannel| protected methods: 42 // |RawChannel| protected methods:
43 // Actually override this so that we can send multiple messages with (only) 43 // Actually override this so that we can send multiple messages with (only)
44 // FDs if necessary. 44 // FDs if necessary.
45 void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override; 45 void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override
46 MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex());
46 // Override this to handle those extra FD-only messages. 47 // Override this to handle those extra FD-only messages.
47 bool OnReadMessageForRawChannel( 48 bool OnReadMessageForRawChannel(
48 const MessageInTransit::View& message_view) override; 49 const MessageInTransit::View& message_view) override;
49 IOResult Read(size_t* bytes_read) override; 50 IOResult Read(size_t* bytes_read) override;
50 IOResult ScheduleRead() override; 51 IOResult ScheduleRead() override;
51 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 52 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
52 size_t num_platform_handles, 53 size_t num_platform_handles,
53 const void* platform_handle_table) override; 54 const void* platform_handle_table) override;
54 IOResult WriteNoLock(size_t* platform_handles_written, 55 IOResult WriteNoLock(size_t* platform_handles_written,
55 size_t* bytes_written) override; 56 size_t* bytes_written) override;
(...skipping 15 matching lines...) Expand all
71 embedder::ScopedPlatformHandle fd_; 72 embedder::ScopedPlatformHandle fd_;
72 73
73 // The following members are only used on the I/O thread: 74 // The following members are only used on the I/O thread:
74 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
76 77
77 bool pending_read_; 78 bool pending_read_;
78 79
79 std::deque<embedder::PlatformHandle> read_platform_handles_; 80 std::deque<embedder::PlatformHandle> read_platform_handles_;
80 81
81 // The following members are used on multiple threads and protected by 82 bool pending_write_ MOJO_GUARDED_BY(write_mutex());
82 // |write_lock()|:
83 bool pending_write_;
84 83
85 // This is used for posting tasks from write threads to the I/O thread. It 84 // This is used for posting tasks from write threads to the I/O thread. The
86 // must only be accessed under |write_lock_|. The weak pointers it produces 85 // weak pointers it produces are only used/invalidated on the I/O thread.
87 // are only used/invalidated on the I/O thread. 86 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_
88 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 87 MOJO_GUARDED_BY(write_mutex());
89 88
90 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 89 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
91 }; 90 };
92 91
93 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle) 92 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle)
94 : fd_(handle.Pass()), 93 : fd_(handle.Pass()),
95 pending_read_(false), 94 pending_read_(false),
96 pending_write_(false), 95 pending_write_(false),
97 weak_ptr_factory_(this) { 96 weak_ptr_factory_(this) {
98 DCHECK(fd_.is_valid()); 97 DCHECK(fd_.is_valid());
99 } 98 }
100 99
101 RawChannelPosix::~RawChannelPosix() { 100 RawChannelPosix::~RawChannelPosix() {
102 DCHECK(!pending_read_); 101 DCHECK(!pending_read_);
103 DCHECK(!pending_write_); 102 DCHECK(!pending_write_);
104 103
105 // No need to take the |write_lock()| here -- if there are still weak pointers 104 // No need to take |write_mutex()| here -- if there are still weak pointers
106 // outstanding, then we're hosed anyway (since we wouldn't be able to 105 // outstanding, then we're hosed anyway (since we wouldn't be able to
107 // invalidate them cleanly, since we might not be on the I/O thread). 106 // invalidate them cleanly, since we might not be on the I/O thread).
108 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 107 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
109 108
110 // These must have been shut down/destroyed on the I/O thread. 109 // These must have been shut down/destroyed on the I/O thread.
111 DCHECK(!read_watcher_); 110 DCHECK(!read_watcher_);
112 DCHECK(!write_watcher_); 111 DCHECK(!write_watcher_);
113 112
114 embedder::CloseAllPlatformHandles(&read_platform_handles_); 113 embedder::CloseAllPlatformHandles(&read_platform_handles_);
115 } 114 }
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
208 read_platform_handles_.begin() + num_platform_handles); 207 read_platform_handles_.begin() + num_platform_handles);
209 read_platform_handles_.erase( 208 read_platform_handles_.erase(
210 read_platform_handles_.begin(), 209 read_platform_handles_.begin(),
211 read_platform_handles_.begin() + num_platform_handles); 210 read_platform_handles_.begin() + num_platform_handles);
212 return rv.Pass(); 211 return rv.Pass();
213 } 212 }
214 213
215 RawChannel::IOResult RawChannelPosix::WriteNoLock( 214 RawChannel::IOResult RawChannelPosix::WriteNoLock(
216 size_t* platform_handles_written, 215 size_t* platform_handles_written,
217 size_t* bytes_written) { 216 size_t* bytes_written) {
218 write_lock().AssertAcquired(); 217 write_mutex().AssertHeld();
219 218
220 DCHECK(!pending_write_); 219 DCHECK(!pending_write_);
221 220
222 size_t num_platform_handles = 0; 221 size_t num_platform_handles = 0;
223 ssize_t write_result; 222 ssize_t write_result;
224 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { 223 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
225 embedder::PlatformHandle* platform_handles; 224 embedder::PlatformHandle* platform_handles;
226 void* serialization_data; // Actually unused. 225 void* serialization_data; // Actually unused.
227 write_buffer_no_lock()->GetPlatformHandlesToSend( 226 write_buffer_no_lock()->GetPlatformHandlesToSend(
228 &num_platform_handles, &platform_handles, &serialization_data); 227 &num_platform_handles, &platform_handles, &serialization_data);
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
281 280
282 if (errno != EAGAIN && errno != EWOULDBLOCK) { 281 if (errno != EAGAIN && errno != EWOULDBLOCK) {
283 PLOG(WARNING) << "sendmsg/write/writev"; 282 PLOG(WARNING) << "sendmsg/write/writev";
284 return IO_FAILED_UNKNOWN; 283 return IO_FAILED_UNKNOWN;
285 } 284 }
286 285
287 return ScheduleWriteNoLock(); 286 return ScheduleWriteNoLock();
288 } 287 }
289 288
290 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { 289 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
291 write_lock().AssertAcquired(); 290 write_mutex().AssertHeld();
292 291
293 DCHECK(!pending_write_); 292 DCHECK(!pending_write_);
294 293
295 // Set up to wait for the FD to become writable. 294 // Set up to wait for the FD to become writable.
296 // If we're not on the I/O thread, we have to post a task to do this. 295 // If we're not on the I/O thread, we have to post a task to do this.
297 if (base::MessageLoop::current() != message_loop_for_io()) { 296 if (base::MessageLoop::current() != message_loop_for_io()) {
298 message_loop_for_io()->PostTask(FROM_HERE, 297 message_loop_for_io()->PostTask(FROM_HERE,
299 base::Bind(&RawChannelPosix::WaitToWrite, 298 base::Bind(&RawChannelPosix::WaitToWrite,
300 weak_ptr_factory_.GetWeakPtr())); 299 weak_ptr_factory_.GetWeakPtr()));
301 pending_write_ = true; 300 pending_write_ = true;
(...skipping 23 matching lines...) Expand all
325 // fails cleanly. 324 // fails cleanly.
326 CHECK(message_loop_for_io()->WatchFileDescriptor( 325 CHECK(message_loop_for_io()->WatchFileDescriptor(
327 fd_.get().fd, true, base::MessageLoopForIO::WATCH_READ, 326 fd_.get().fd, true, base::MessageLoopForIO::WATCH_READ,
328 read_watcher_.get(), this)); 327 read_watcher_.get(), this));
329 } 328 }
330 329
331 void RawChannelPosix::OnShutdownNoLock( 330 void RawChannelPosix::OnShutdownNoLock(
332 scoped_ptr<ReadBuffer> /*read_buffer*/, 331 scoped_ptr<ReadBuffer> /*read_buffer*/,
333 scoped_ptr<WriteBuffer> /*write_buffer*/) { 332 scoped_ptr<WriteBuffer> /*write_buffer*/) {
334 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 333 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
335 write_lock().AssertAcquired(); 334 write_mutex().AssertHeld();
336 335
337 read_watcher_.reset(); // This will stop watching (if necessary). 336 read_watcher_.reset(); // This will stop watching (if necessary).
338 write_watcher_.reset(); // This will stop watching (if necessary). 337 write_watcher_.reset(); // This will stop watching (if necessary).
339 338
340 pending_read_ = false; 339 pending_read_ = false;
341 pending_write_ = false; 340 pending_write_ = false;
342 341
343 DCHECK(fd_.is_valid()); 342 DCHECK(fd_.is_valid());
344 fd_.reset(); 343 fd_.reset();
345 344
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
378 } 377 }
379 378
380 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 379 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
381 DCHECK_EQ(fd, fd_.get().fd); 380 DCHECK_EQ(fd, fd_.get().fd);
382 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 381 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
383 382
384 IOResult io_result; 383 IOResult io_result;
385 size_t platform_handles_written = 0; 384 size_t platform_handles_written = 0;
386 size_t bytes_written = 0; 385 size_t bytes_written = 0;
387 { 386 {
388 base::AutoLock locker(write_lock()); 387 MutexLocker locker(&write_mutex());
389 388
390 DCHECK(pending_write_); 389 DCHECK(pending_write_);
391 390
392 pending_write_ = false; 391 pending_write_ = false;
393 io_result = WriteNoLock(&platform_handles_written, &bytes_written); 392 io_result = WriteNoLock(&platform_handles_written, &bytes_written);
394 } 393 }
395 394
396 if (io_result != IO_PENDING) { 395 if (io_result != IO_PENDING) {
397 OnWriteCompleted(io_result, platform_handles_written, bytes_written); 396 OnWriteCompleted(io_result, platform_handles_written, bytes_written);
398 return; // |this| may have been destroyed in |OnWriteCompleted()|. 397 return; // |this| may have been destroyed in |OnWriteCompleted()|.
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
447 446
448 void RawChannelPosix::WaitToWrite() { 447 void RawChannelPosix::WaitToWrite() {
449 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 448 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
450 449
451 DCHECK(write_watcher_); 450 DCHECK(write_watcher_);
452 451
453 if (!message_loop_for_io()->WatchFileDescriptor( 452 if (!message_loop_for_io()->WatchFileDescriptor(
454 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, 453 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
455 write_watcher_.get(), this)) { 454 write_watcher_.get(), this)) {
456 { 455 {
457 base::AutoLock locker(write_lock()); 456 MutexLocker locker(&write_mutex());
458 457
459 DCHECK(pending_write_); 458 DCHECK(pending_write_);
460 pending_write_ = false; 459 pending_write_ = false;
461 } 460 }
462 OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); 461 OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
463 return; // |this| may have been destroyed in |OnWriteCompleted()|. 462 return; // |this| may have been destroyed in |OnWriteCompleted()|.
464 } 463 }
465 } 464 }
466 465
467 } // namespace 466 } // namespace
468 467
469 // ----------------------------------------------------------------------------- 468 // -----------------------------------------------------------------------------
470 469
471 // Static factory method declared in raw_channel.h. 470 // Static factory method declared in raw_channel.h.
472 // static 471 // static
473 scoped_ptr<RawChannel> RawChannel::Create( 472 scoped_ptr<RawChannel> RawChannel::Create(
474 embedder::ScopedPlatformHandle handle) { 473 embedder::ScopedPlatformHandle handle) {
475 return make_scoped_ptr(new RawChannelPosix(handle.Pass())); 474 return make_scoped_ptr(new RawChannelPosix(handle.Pass()));
476 } 475 }
477 476
478 } // namespace system 477 } // namespace system
479 } // namespace mojo 478 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel.cc ('k') | mojo/edk/system/raw_channel_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698