Chromium Code Reviews| Index: media/audio/audio_device_thread.cc |
| diff --git a/media/audio/audio_device_thread.cc b/media/audio/audio_device_thread.cc |
| index 20874599e68f5f7cfddeff0022291e64aa718b6c..d3643f753060fcd9ee99ef7cef824a154fbf51a7 100644 |
| --- a/media/audio/audio_device_thread.cc |
| +++ b/media/audio/audio_device_thread.cc |
| @@ -16,7 +16,9 @@ |
| #include "base/memory/aligned_memory.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/numerics/safe_conversions.h" |
| +#include "base/synchronization/condition_variable.h" |
| #include "base/threading/platform_thread.h" |
| +#include "base/threading/thread_checker.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "media/base/audio_bus.h" |
| @@ -32,119 +34,205 @@ class AudioDeviceThread::Thread |
| : public PlatformThread::Delegate, |
| public base::RefCountedThreadSafe<AudioDeviceThread::Thread> { |
| public: |
| - Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| - bool synchronized_buffers); |
| + Thread(const char* thread_name, bool synchronized_buffers); |
| - void Start(); |
| + // Starts the thread and suspends it, which means it will wait in |
| + // ThreadMain(). Resume() must then be called to actually start running it. |
| + void StartSuspended(); |
| - // Stops the thread. If |loop_for_join| is non-NULL, the function posts |
| + // Shuts down the thread. If |loop_for_join| is non-NULL, the function posts |
| // a task to join (close) the thread handle later instead of waiting for |
| // the thread. If loop_for_join is NULL, then the function waits |
| // synchronously for the thread to terminate. |
| + // Must be called before destruction. |
| void Stop(base::MessageLoop* loop_for_join); |
| + // Resumes the thread and starts running it with the given |callback| and |
| + // |socket|. |
| + void Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket); |
| + |
| + // Suspends the thread, it will wait in ThreadMain(). Shuts down the socket. |
| + // It can be resumed again by calling Resume(). |
| + void Suspend(); |
| + |
| private: |
| + enum State { STATE_SUSPENDED, STATE_PLAYING, STATE_STOPPED }; |
|
DaleCurtis
2016/05/17 17:54:35
Instead of adding STATE_ consider making this an "
Henrik Grunell
2016/05/18 12:21:32
Good points, though can't use DCHECK_EQ etc. with
|
| + |
| friend class base::RefCountedThreadSafe<AudioDeviceThread::Thread>; |
|
DaleCurtis
2016/05/17 17:54:35
friend goes right under private:
Henrik Grunell
2016/05/18 12:21:32
Done.
|
| + |
| ~Thread() override; |
| // Overrides from PlatformThread::Delegate. |
| void ThreadMain() override; |
| - // Runs the loop that reads from the socket. |
| + // Runs the loop that reads from the socket, called in ThreadMain(). |
| void Run(); |
| - private: |
| - base::PlatformThreadHandle thread_; |
| - AudioDeviceThread::Callback* callback_; |
| - base::CancelableSyncSocket socket_; |
| - base::Lock callback_lock_; |
| + // Ensures that Start(), Resume() and Suspend() are called on the same thread. |
| + // Stop() can be called on any thread. |
| + base::ThreadChecker thread_checker_; |
| + |
| const char* thread_name_; |
| const bool synchronized_buffers_; |
| + base::Lock lock_; // Protects everything below. |
| + base::ConditionVariable exit_suspended_state_; |
| + State state_; |
| + AudioDeviceThread::Callback* callback_; |
| + |
| + base::PlatformThreadHandle thread_handle_; |
| + |
| + // Reset in ThreadMain(). Shutdown on another thread. |
| + std::unique_ptr<base::CancelableSyncSocket> socket_; |
| + |
| + base::SyncSocket::Handle new_socket_handle_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Thread); |
| }; |
| // AudioDeviceThread implementation |
| +AudioDeviceThread::AudioDeviceThread() {} |
| -AudioDeviceThread::AudioDeviceThread() { |
| +AudioDeviceThread::~AudioDeviceThread() { |
| + DCHECK(!thread_.get()); |
| } |
| -AudioDeviceThread::~AudioDeviceThread() { DCHECK(!thread_.get()); } |
| - |
| -void AudioDeviceThread::Start(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| - bool synchronized_buffers) { |
| +void AudioDeviceThread::StartSuspended(const char* thread_name, |
| + bool synchronized_buffers) { |
| base::AutoLock auto_lock(thread_lock_); |
| CHECK(!thread_.get()); |
| - thread_ = new AudioDeviceThread::Thread( |
| - callback, socket, thread_name, synchronized_buffers); |
| - thread_->Start(); |
| + thread_ = new AudioDeviceThread::Thread(thread_name, synchronized_buffers); |
| + thread_->StartSuspended(); |
| } |
| void AudioDeviceThread::Stop(base::MessageLoop* loop_for_join) { |
| base::AutoLock auto_lock(thread_lock_); |
| if (thread_.get()) { |
| thread_->Stop(loop_for_join); |
| - thread_ = NULL; |
| + thread_ = nullptr; |
| } |
| } |
| +void AudioDeviceThread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket) { |
| + base::AutoLock auto_lock(thread_lock_); |
| + if (thread_.get()) |
|
DaleCurtis
2016/05/17 17:54:35
Drop .get() here and elsewhere.
Henrik Grunell
2016/05/18 12:21:32
Done.
|
| + thread_->Resume(callback, socket); |
| +} |
| + |
| +void AudioDeviceThread::Suspend() { |
| + base::AutoLock auto_lock(thread_lock_); |
| + if (thread_.get()) |
| + thread_->Suspend(); |
| +} |
| + |
| bool AudioDeviceThread::IsStopped() { |
| base::AutoLock auto_lock(thread_lock_); |
| return !thread_.get(); |
| } |
| // AudioDeviceThread::Thread implementation |
| -AudioDeviceThread::Thread::Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| +AudioDeviceThread::Thread::Thread(const char* thread_name, |
| bool synchronized_buffers) |
| - : thread_(), |
| - callback_(callback), |
| - socket_(socket), |
| - thread_name_(thread_name), |
| - synchronized_buffers_(synchronized_buffers) { |
| -} |
| + : thread_name_(thread_name), |
| + synchronized_buffers_(synchronized_buffers), |
| + exit_suspended_state_(&lock_), |
| + state_(STATE_SUSPENDED), |
| + callback_(nullptr), |
| + new_socket_handle_(base::CancelableSyncSocket::kInvalidHandle) {} |
| AudioDeviceThread::Thread::~Thread() { |
| - DCHECK(thread_.is_null()); |
| + DCHECK(thread_handle_.is_null()); |
| + DCHECK_EQ(state_, STATE_STOPPED) |
| + << "Thread must be stopped before destruction"; |
| } |
| -void AudioDeviceThread::Thread::Start() { |
| - base::AutoLock auto_lock(callback_lock_); |
| - DCHECK(thread_.is_null()); |
| - // This reference will be released when the thread exists. |
| +void AudioDeviceThread::Thread::StartSuspended() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK(thread_handle_.is_null()) << "Calling Start() on a started thread?"; |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop has already been executed on another thread. |
| + |
| + // This reference will be released when the thread exits. |
|
DaleCurtis
2016/05/17 17:54:35
We should try to remove this in the future too; pr
Henrik Grunell
2016/05/18 12:21:32
Agree, added TODO.
|
| AddRef(); |
| - PlatformThread::CreateWithPriority(0, this, &thread_, |
| + PlatformThread::CreateWithPriority(0, this, &thread_handle_, |
| base::ThreadPriority::REALTIME_AUDIO); |
| - CHECK(!thread_.is_null()); |
| + CHECK(!thread_handle_.is_null()); |
| } |
| void AudioDeviceThread::Thread::Stop(base::MessageLoop* loop_for_join) { |
| - socket_.Shutdown(); |
| + // Note: Stop() can be called at any time, even before Start(), because it's |
| + // allowed to be executed on a different thread. |
| - base::PlatformThreadHandle thread = base::PlatformThreadHandle(); |
| + base::PlatformThreadHandle thread_handle_to_join = |
| + base::PlatformThreadHandle(); |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK_NE(state_, STATE_STOPPED) << "Calling Stop() on a stopped thread?"; |
| + callback_ = nullptr; |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - callback_ = NULL; |
| - std::swap(thread, thread_); |
| - } |
| + // |socket_| is only set/reset under the |lock_|, so it's safe to |
| + // access under that lock here. |
| + if (socket_) |
| + socket_->Shutdown(); |
| - if (!thread.is_null()) { |
| + // Must be done under lock to avoid race with Start(). |
| + std::swap(thread_handle_to_join, thread_handle_); |
| + |
| + state_ = STATE_STOPPED; |
| + exit_suspended_state_.Signal(); |
| + } // auto_lock(lock_) |
| + |
| + if (!thread_handle_to_join.is_null()) { |
| if (loop_for_join) { |
| - loop_for_join->PostTask(FROM_HERE, |
| - base::Bind(&base::PlatformThread::Join, thread)); |
| + loop_for_join->PostTask(FROM_HERE, base::Bind(&base::PlatformThread::Join, |
| + thread_handle_to_join)); |
| } else { |
| - base::PlatformThread::Join(thread); |
| + base::PlatformThread::Join(thread_handle_to_join); |
| } |
| } |
| } |
| +void AudioDeviceThread::Thread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket_handle) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK_EQ(state_, STATE_SUSPENDED) << "Resume a non-suspended thread?"; |
| + |
| + DCHECK_EQ(new_socket_handle_, base::CancelableSyncSocket::kInvalidHandle); |
| + |
| + callback_ = callback; |
| + new_socket_handle_ = socket_handle; |
| + |
| + state_ = STATE_PLAYING; |
| + exit_suspended_state_.Signal(); |
| +} |
| + |
| +void AudioDeviceThread::Thread::Suspend() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK_EQ(state_, STATE_PLAYING) << "Suspend a suspended thread?"; |
| + callback_ = nullptr; |
| + |
| + // |socket_| is only set/reset under the |lock_|, so it's safe to |
| + // access under that lock here. |
| + if (socket_) |
| + socket_->Shutdown(); |
| + state_ = STATE_SUSPENDED; |
| +} |
| + |
| void AudioDeviceThread::Thread::ThreadMain() { |
| PlatformThread::SetName(thread_name_); |
| @@ -154,13 +242,38 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| // by another thread on which singleton access is OK as well. |
| base::ThreadRestrictions::SetSingletonAllowed(true); |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - if (callback_) |
| - callback_->InitializeOnAudioThread(); |
| - } |
| + while (true) { |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + while (state_ == STATE_SUSPENDED) { |
|
DaleCurtis
2016/05/17 17:54:35
What's the point of doing this versus just restart
Henrik Grunell
2016/05/18 12:21:32
The problem with spinning up a new thread each tim
DaleCurtis
2016/05/18 17:20:43
I feel like overcomplicating our production code t
Henrik Grunell
2016/05/19 13:03:11
The problems is more specifically the components u
DaleCurtis
2016/05/19 20:41:00
I don't think that's a good enough reason to make
Henrik Grunell
2016/05/23 11:41:58
Actually, I have brought up this thread re-use app
|
| + // This releases |lock_| while waiting. |
| + exit_suspended_state_.Wait(); |
| + } |
| + |
| + if (state_ == STATE_STOPPED) |
| + break; |
| + |
| + // If we are playing but don't have a valid new socket handle, we are here |
| + // because the socket receive or send failed for another reason than |
| + // Shutdown() being called in Stop() or Suspend(). In that case, quit the |
| + // thread. |
| + if (new_socket_handle_ == base::CancelableSyncSocket::kInvalidHandle) { |
| + LOG(WARNING) << "Audio socket failed, exiting audio thread."; |
| + break; |
| + } |
| + |
| + // Else reinitialize and resume. |
| + |
| + socket_.reset(new base::CancelableSyncSocket(new_socket_handle_)); |
| + |
| + if (callback_) |
| + callback_->InitializeOnAudioThread(); |
| + |
| + new_socket_handle_ = base::CancelableSyncSocket::kInvalidHandle; |
| + } // auto_lock(lock_) |
| - Run(); |
| + Run(); |
| + } // while |
| // Release the reference for the thread. Note that after this, the Thread |
| // instance will most likely be deleted. |
| @@ -169,9 +282,13 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| void AudioDeviceThread::Thread::Run() { |
| uint32_t buffer_index = 0; |
| + |
| while (true) { |
| uint32_t pending_data = 0; |
| - size_t bytes_read = socket_.Receive(&pending_data, sizeof(pending_data)); |
| + // |socket_| is only modified on this thread, so we don't need a lock to |
| + // access it here. |
| + size_t bytes_read = socket_->Receive(&pending_data, sizeof(pending_data)); |
| + |
| if (bytes_read != sizeof(pending_data)) |
| break; |
| @@ -184,7 +301,10 @@ void AudioDeviceThread::Thread::Run() { |
| // |
| // See comments in AudioOutputController::DoPause() for details on why. |
| if (pending_data != std::numeric_limits<uint32_t>::max()) { |
| - base::AutoLock auto_lock(callback_lock_); |
| + // We can't use an atomic pointer for |callback_|, but need to call on it |
| + // under the state lock. This is because after clearing it in Suspend() or |
| + // Stop(), the caller may delete the callback object. |
| + base::AutoLock auto_lock(lock_); |
| if (callback_) |
| callback_->Process(pending_data); |
| } |
| @@ -201,11 +321,12 @@ void AudioDeviceThread::Thread::Run() { |
| // details on how this works see AudioSyncReader::WaitUntilDataIsReady(). |
| if (synchronized_buffers_) { |
| ++buffer_index; |
| - size_t bytes_sent = socket_.Send(&buffer_index, sizeof(buffer_index)); |
| + size_t bytes_sent = socket_->Send(&buffer_index, sizeof(buffer_index)); |
| + |
| if (bytes_sent != sizeof(buffer_index)) |
| break; |
| } |
| - } |
| + } // while |
| } |
| // AudioDeviceThread::Callback implementation |