Chromium Code Reviews| Index: media/audio/audio_device_thread.cc |
| diff --git a/media/audio/audio_device_thread.cc b/media/audio/audio_device_thread.cc |
| index 20874599e68f5f7cfddeff0022291e64aa718b6c..0d84e1f2f4ffdf7314aa036da707e68e66b7af24 100644 |
| --- a/media/audio/audio_device_thread.cc |
| +++ b/media/audio/audio_device_thread.cc |
| @@ -16,6 +16,7 @@ |
| #include "base/memory/aligned_memory.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/numerics/safe_conversions.h" |
| +#include "base/synchronization/condition_variable.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "media/base/audio_bus.h" |
| @@ -32,55 +33,79 @@ class AudioDeviceThread::Thread |
| : public PlatformThread::Delegate, |
| public base::RefCountedThreadSafe<AudioDeviceThread::Thread> { |
| public: |
| - Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| - bool synchronized_buffers); |
| + Thread(const char* thread_name, bool synchronized_buffers); |
| + // Starts the thread which will wait in ThreadMain(). Resume() must then be |
| + // called to actually start running it. |
| void Start(); |
| - // Stops the thread. If |loop_for_join| is non-NULL, the function posts |
| + // Shuts down the thread. If |loop_for_join| is non-NULL, the function posts |
| // a task to join (close) the thread handle later instead of waiting for |
| // the thread. If loop_for_join is NULL, then the function waits |
| // synchronously for the thread to terminate. |
| + // Must be called. |
| void Stop(base::MessageLoop* loop_for_join); |
| + // Releases the thread from waiting and starts running it with the given |
| + // |callback| and |socket|. |
| + void Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket); |
| + |
| + // Pauses the thread, it will wait in ThreadMain(). Shuts down the |
| + // socket. Resume() can then be called to restart it. |
| + void Pause(); |
| + |
| private: |
| + enum ResumeAction { kActionPause, kActionPlay, kActionStop }; |
| + |
| friend class base::RefCountedThreadSafe<AudioDeviceThread::Thread>; |
| + |
| ~Thread() override; |
| // Overrides from PlatformThread::Delegate. |
| void ThreadMain() override; |
| - // Runs the loop that reads from the socket. |
| + // Runs the loop that reads from the socket, called in ThreadMain(), |
| void Run(); |
| - private: |
| - base::PlatformThreadHandle thread_; |
| - AudioDeviceThread::Callback* callback_; |
| - base::CancelableSyncSocket socket_; |
| - base::Lock callback_lock_; |
| + // Start(), Resume(), Pause() must be serialized; Stop() can be called any |
| + // moment. |
| + base::ThreadChecker thread_checker_; |
| + |
| const char* thread_name_; |
| const bool synchronized_buffers_; |
| + // Modified in ThreadMain only, no lock protection needed. |
| + AudioDeviceThread::Callback* callback_; |
| + |
| + base::Lock action_lock_; // Protects everything below. |
| + base::ConditionVariable action_condition_; |
| + ResumeAction action_; |
| + |
| + base::PlatformThreadHandle thread_; |
| + |
| + // Reset on client thread. |
| + std::unique_ptr<base::CancelableSyncSocket> socket_; |
| + |
| + // Set on client thead. |
| + AudioDeviceThread::Callback* new_callback_; |
| + base::SyncSocket::Handle new_socket_handle_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Thread); |
| }; |
| -// AudioDeviceThread implementation |
| +// AudioDeviceThread implementation TODO(olka): get rid of it. It just adds one |
|
o1ka
2016/05/03 15:47:32
You've got a comment on it in the header I think.
Henrik Grunell
2016/05/04 09:05:11
Yes. Removed the whole comment here actually.
|
| +// extra lock. |
| -AudioDeviceThread::AudioDeviceThread() { |
| -} |
| +AudioDeviceThread::AudioDeviceThread() {} |
| AudioDeviceThread::~AudioDeviceThread() { DCHECK(!thread_.get()); } |
| -void AudioDeviceThread::Start(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| +void AudioDeviceThread::Start(const char* thread_name, |
| bool synchronized_buffers) { |
| base::AutoLock auto_lock(thread_lock_); |
| CHECK(!thread_.get()); |
| - thread_ = new AudioDeviceThread::Thread( |
| - callback, socket, thread_name, synchronized_buffers); |
| + thread_ = new AudioDeviceThread::Thread(thread_name, synchronized_buffers); |
| thread_->Start(); |
| } |
| @@ -92,21 +117,34 @@ void AudioDeviceThread::Stop(base::MessageLoop* loop_for_join) { |
| } |
| } |
| +void AudioDeviceThread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket) { |
| + base::AutoLock auto_lock(thread_lock_); |
| + thread_->Resume(callback, socket); |
|
o1ka
2016/05/03 15:47:32
if (thread_.get())?
Henrik Grunell
2016/05/04 09:05:11
Haha, yes, this I commented on in your CL with cha
|
| +} |
| + |
| +void AudioDeviceThread::Pause() { |
| + base::AutoLock auto_lock(thread_lock_); |
| + if (thread_.get()) |
| + thread_->Pause(); |
| +} |
| + |
| bool AudioDeviceThread::IsStopped() { |
| base::AutoLock auto_lock(thread_lock_); |
| return !thread_.get(); |
| } |
| // AudioDeviceThread::Thread implementation |
| -AudioDeviceThread::Thread::Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| +AudioDeviceThread::Thread::Thread(const char* thread_name, |
| bool synchronized_buffers) |
| - : thread_(), |
| - callback_(callback), |
| - socket_(socket), |
| - thread_name_(thread_name), |
| - synchronized_buffers_(synchronized_buffers) { |
| + : thread_name_(thread_name), |
| + synchronized_buffers_(synchronized_buffers), |
| + callback_(nullptr), |
| + action_condition_(&action_lock_), |
| + action_(kActionPause), |
| + new_callback_(nullptr), |
| + new_socket_handle_(base::CancelableSyncSocket::kInvalidHandle) { |
| + thread_checker_.DetachFromThread(); |
| } |
| AudioDeviceThread::Thread::~Thread() { |
| @@ -114,8 +152,13 @@ AudioDeviceThread::Thread::~Thread() { |
| } |
| void AudioDeviceThread::Thread::Start() { |
| - base::AutoLock auto_lock(callback_lock_); |
| - DCHECK(thread_.is_null()); |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(action_lock_); |
| + DCHECK(thread_.is_null()) << "Calling Start() on a started thread?"; |
| + if (action_ == kActionStop) |
| + return; // Stop has already been executed on another thread. |
| + |
| // This reference will be released when the thread exists. |
| AddRef(); |
| @@ -125,15 +168,27 @@ void AudioDeviceThread::Thread::Start() { |
| } |
| void AudioDeviceThread::Thread::Stop(base::MessageLoop* loop_for_join) { |
| - socket_.Shutdown(); |
| + // Note: Stop() can be called any time, even before Start(), because it's |
| + // allowed to be executed on a different thread. |
| base::PlatformThreadHandle thread = base::PlatformThreadHandle(); |
| - |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - callback_ = NULL; |
| + { |
| + base::AutoLock auto_lock(action_lock_); |
| + DCHECK(action_ != kActionStop) << "Calling Stop() on a stopped thread?"; |
| + |
| + // |socket_| is only set/reset under the |action_lock_|, so it's safe to |
| + // access under that lock here. |
| + // TODO BEFORE COMMIT: Is it safe to shutdown socket twice or shutdown a |
| + // socket with an invalid handle? If not, shutdown only if kActionPlay? |
| + if (socket_) |
| + socket_->Shutdown(); |
| + |
| + // Must be done under lock to avoid race wuth Start(). |
| std::swap(thread, thread_); |
| - } |
| + |
| + action_ = kActionStop; |
| + action_condition_.Signal(); |
| + } // auto_lock(action_lock_) |
| if (!thread.is_null()) { |
| if (loop_for_join) { |
| @@ -145,6 +200,43 @@ void AudioDeviceThread::Thread::Stop(base::MessageLoop* loop_for_join) { |
| } |
| } |
| +void AudioDeviceThread::Thread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket_handle) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(action_lock_); |
| + if (action_ == kActionStop) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK(action_ != kActionPlay) << "Resume a non-paused thread?"; |
|
o1ka
2016/05/03 15:47:32
(action_ == kActionPause) for clarity?
Henrik Grunell
2016/05/04 09:05:11
Done.
|
| + |
| + DCHECK_EQ(new_socket_handle_, base::CancelableSyncSocket::kInvalidHandle); |
| + |
| + new_callback_ = callback; |
| + new_socket_handle_ = socket_handle; |
| + |
| + action_ = kActionPlay; |
| + action_condition_.Signal(); |
| +} |
| + |
| +void AudioDeviceThread::Thread::Pause() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(action_lock_); |
| + if (action_ == kActionStop) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK(action_ != kActionPause) << "Pause a paused thread?"; |
|
o1ka
2016/05/03 15:47:32
(action_ == kActionPlay)?
Henrik Grunell
2016/05/04 09:05:11
Done.
|
| + |
| + // |socket_| is only set/reset under the |action_lock_|, so it's safe to |
| + // access under that lock here. |
| + // TODO BEFORE COMMIT: Is it safe to shutdown socket twice? If not, shutdown |
| + // only if kActionPlay? |
| + if (socket_) |
| + socket_->Shutdown(); |
| + action_ = kActionPause; |
| +} |
| + |
| void AudioDeviceThread::Thread::ThreadMain() { |
| PlatformThread::SetName(thread_name_); |
| @@ -154,13 +246,34 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| // by another thread on which singleton access is OK as well. |
| base::ThreadRestrictions::SetSingletonAllowed(true); |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - if (callback_) |
| - callback_->InitializeOnAudioThread(); |
| - } |
| + while (true) { |
| + { |
| + base::AutoLock auto_lock(action_lock_); |
| + while (action_ == kActionPause) |
| + action_condition_.Wait(); |
| + |
| + if (action_ == kActionStop) |
| + break; |
| + // else reinitialize and resume. |
| + |
| + socket_.reset(new base::CancelableSyncSocket(new_socket_handle_)); |
| + |
| + callback_ = new_callback_; |
| + if (callback_) |
| + callback_->InitializeOnAudioThread(); |
| + |
| + new_callback_ = nullptr; |
| + new_socket_handle_ = base::CancelableSyncSocket::kInvalidHandle; |
| + } // auto_lock(action_lock_) |
| - Run(); |
| + Run(); |
| + |
| + // We are here if and only if socket operation failed. The possible reasons |
| + // are: (a) Shutdown() has been called on the socket (=Stop() or Pause()); |
| + // (b) socket failed. In both cases go back and wait for stop or resume |
| + // action. |
| + callback_ = nullptr; |
| + } |
| // Release the reference for the thread. Note that after this, the Thread |
| // instance will most likely be deleted. |
| @@ -169,9 +282,11 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| void AudioDeviceThread::Thread::Run() { |
| uint32_t buffer_index = 0; |
| + |
| while (true) { |
| uint32_t pending_data = 0; |
| - size_t bytes_read = socket_.Receive(&pending_data, sizeof(pending_data)); |
| + size_t bytes_read = socket_->Receive(&pending_data, sizeof(pending_data)); |
|
o1ka
2016/05/03 15:47:32
Add a comment that both socket_ and callback_ are
Henrik Grunell
2016/05/04 09:05:11
Done. I put it before the loop as a general commen
|
| + |
| if (bytes_read != sizeof(pending_data)) |
| break; |
| @@ -184,7 +299,6 @@ void AudioDeviceThread::Thread::Run() { |
| // |
| // See comments in AudioOutputController::DoPause() for details on why. |
| if (pending_data != std::numeric_limits<uint32_t>::max()) { |
| - base::AutoLock auto_lock(callback_lock_); |
| if (callback_) |
| callback_->Process(pending_data); |
| } |
| @@ -201,11 +315,12 @@ void AudioDeviceThread::Thread::Run() { |
| // details on how this works see AudioSyncReader::WaitUntilDataIsReady(). |
| if (synchronized_buffers_) { |
| ++buffer_index; |
| - size_t bytes_sent = socket_.Send(&buffer_index, sizeof(buffer_index)); |
| + size_t bytes_sent = socket_->Send(&buffer_index, sizeof(buffer_index)); |
| + |
| if (bytes_sent != sizeof(buffer_index)) |
| break; |
| } |
| - } |
| + } // while |
| } |
| // AudioDeviceThread::Callback implementation |