Chromium Code Reviews| Index: media/audio/audio_device_thread.cc |
| diff --git a/media/audio/audio_device_thread.cc b/media/audio/audio_device_thread.cc |
| index 20874599e68f5f7cfddeff0022291e64aa718b6c..05778d5e080267d305c7fce5b67109251d894d52 100644 |
| --- a/media/audio/audio_device_thread.cc |
| +++ b/media/audio/audio_device_thread.cc |
| @@ -16,7 +16,9 @@ |
| #include "base/memory/aligned_memory.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/numerics/safe_conversions.h" |
| +#include "base/synchronization/condition_variable.h" |
| #include "base/threading/platform_thread.h" |
| +#include "base/threading/thread_checker.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "media/base/audio_bus.h" |
| @@ -32,55 +34,76 @@ class AudioDeviceThread::Thread |
| : public PlatformThread::Delegate, |
| public base::RefCountedThreadSafe<AudioDeviceThread::Thread> { |
| public: |
| - Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| - bool synchronized_buffers); |
| + Thread(const char* thread_name, bool synchronized_buffers); |
| + // Starts the thread which will wait in ThreadMain(). Resume() must then be |
| + // called to actually start running it. |
|
tommi (sloooow) - chröme
2016/05/14 11:06:28
Can we rename the method to StartSuspended?
Henrik Grunell
2016/05/16 14:37:35
Good idea. Done.
|
| void Start(); |
| - // Stops the thread. If |loop_for_join| is non-NULL, the function posts |
| + // Shuts down the thread. If |loop_for_join| is non-NULL, the function posts |
| // a task to join (close) the thread handle later instead of waiting for |
| // the thread. If loop_for_join is NULL, then the function waits |
| // synchronously for the thread to terminate. |
| + // Must be called before destruction. |
| void Stop(base::MessageLoop* loop_for_join); |
| + // Releases the thread from waiting and starts running it with the given |
| + // |callback| and |socket|. |
| + void Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket); |
| + |
| + // Pauses the thread, it will wait in ThreadMain(). Shuts down the |
| + // socket. Resume() can then be called to restart it. |
| + void Pause(); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:28
Suspend() (to match with Resume())
Henrik Grunell
2016/05/16 14:37:35
Also good. Done.
|
| + |
| private: |
| + enum State { STATE_PAUSED, STATE_PLAYING, STATE_STOPPED }; |
| + |
| friend class base::RefCountedThreadSafe<AudioDeviceThread::Thread>; |
| + |
| ~Thread() override; |
| // Overrides from PlatformThread::Delegate. |
| void ThreadMain() override; |
| - // Runs the loop that reads from the socket. |
| + // Runs the loop that reads from the socket, called in ThreadMain(). |
| void Run(); |
| - private: |
| - base::PlatformThreadHandle thread_; |
| - AudioDeviceThread::Callback* callback_; |
| - base::CancelableSyncSocket socket_; |
| - base::Lock callback_lock_; |
| + // Ensures that Start(), Resume() and Pause() are called on the same thread. |
| + // Stop() can be called on any thread. |
| + base::ThreadChecker thread_checker_; |
| + |
| const char* thread_name_; |
| const bool synchronized_buffers_; |
| + base::Lock lock_; // Protects everything below. |
| + base::ConditionVariable exit_paused_state_; |
| + State state_; |
| + AudioDeviceThread::Callback* callback_; |
| + |
| + base::PlatformThreadHandle thread_handle_; |
| + |
| + // Reset in ThreadMain(). Shutdown on client thread. |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
what does 'client thread' mean? is it the thread_
Henrik Grunell
2016/05/16 14:37:36
It's actually any other thread since Stop() may be
|
| + std::unique_ptr<base::CancelableSyncSocket> socket_; |
| + |
| + // Set on client thead. |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
thread
Henrik Grunell
2016/05/16 14:37:35
I removed this comment, I don't think it adds anyt
|
| + base::SyncSocket::Handle new_socket_handle_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Thread); |
| }; |
| // AudioDeviceThread implementation |
| +AudioDeviceThread::AudioDeviceThread() {} |
| -AudioDeviceThread::AudioDeviceThread() { |
| +AudioDeviceThread::~AudioDeviceThread() { |
| + DCHECK(!thread_.get()); |
| } |
| -AudioDeviceThread::~AudioDeviceThread() { DCHECK(!thread_.get()); } |
| - |
| -void AudioDeviceThread::Start(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| +void AudioDeviceThread::Start(const char* thread_name, |
| bool synchronized_buffers) { |
| base::AutoLock auto_lock(thread_lock_); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
are Start()/Stop()/Resume()/Pause() not always cal
Henrik Grunell
2016/05/16 14:37:35
Unfortunately no. AudioInputDevice still calls Sto
|
| CHECK(!thread_.get()); |
| - thread_ = new AudioDeviceThread::Thread( |
| - callback, socket, thread_name, synchronized_buffers); |
| + thread_ = new AudioDeviceThread::Thread(thread_name, synchronized_buffers); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
if thread_name and synchronize_buffers will always
Henrik Grunell
2016/05/16 14:37:35
It can be in the ctor. I'd like to not change that
|
| thread_->Start(); |
| } |
| @@ -88,63 +111,131 @@ void AudioDeviceThread::Stop(base::MessageLoop* loop_for_join) { |
| base::AutoLock auto_lock(thread_lock_); |
| if (thread_.get()) { |
| thread_->Stop(loop_for_join); |
| - thread_ = NULL; |
| + thread_ = nullptr; |
| } |
| } |
| +void AudioDeviceThread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket) { |
| + base::AutoLock auto_lock(thread_lock_); |
| + if (thread_.get()) |
| + thread_->Resume(callback, socket); |
| +} |
| + |
| +void AudioDeviceThread::Pause() { |
| + base::AutoLock auto_lock(thread_lock_); |
| + if (thread_.get()) |
| + thread_->Pause(); |
| +} |
| + |
| bool AudioDeviceThread::IsStopped() { |
| base::AutoLock auto_lock(thread_lock_); |
| return !thread_.get(); |
| } |
| // AudioDeviceThread::Thread implementation |
| -AudioDeviceThread::Thread::Thread(AudioDeviceThread::Callback* callback, |
| - base::SyncSocket::Handle socket, |
| - const char* thread_name, |
| +AudioDeviceThread::Thread::Thread(const char* thread_name, |
| bool synchronized_buffers) |
| - : thread_(), |
| - callback_(callback), |
| - socket_(socket), |
| - thread_name_(thread_name), |
| - synchronized_buffers_(synchronized_buffers) { |
| + : thread_name_(thread_name), |
| + synchronized_buffers_(synchronized_buffers), |
| + exit_paused_state_(&lock_), |
| + state_(STATE_PAUSED), |
| + callback_(nullptr), |
| + new_socket_handle_(base::CancelableSyncSocket::kInvalidHandle) { |
| + thread_checker_.DetachFromThread(); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
why this detach? From what I can tell, Start() is
Henrik Grunell
2016/05/16 14:37:35
Indeed. removed.
|
| } |
| AudioDeviceThread::Thread::~Thread() { |
| - DCHECK(thread_.is_null()); |
| + DCHECK(thread_handle_.is_null()); |
| + DCHECK_EQ(state_, STATE_STOPPED) |
| + << "Thread must be stopped before destruction"; |
| } |
| void AudioDeviceThread::Thread::Start() { |
| - base::AutoLock auto_lock(callback_lock_); |
| - DCHECK(thread_.is_null()); |
| - // This reference will be released when the thread exists. |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK(thread_handle_.is_null()) << "Calling Start() on a started thread?"; |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop has already been executed on another thread. |
| + |
| + // This reference will be released when the thread exits. |
| AddRef(); |
| - PlatformThread::CreateWithPriority(0, this, &thread_, |
| + PlatformThread::CreateWithPriority(0, this, &thread_handle_, |
| base::ThreadPriority::REALTIME_AUDIO); |
| - CHECK(!thread_.is_null()); |
| + CHECK(!thread_handle_.is_null()); |
| } |
| void AudioDeviceThread::Thread::Stop(base::MessageLoop* loop_for_join) { |
| - socket_.Shutdown(); |
| + // Note: Stop() can be called at any time, even before Start(), because it's |
| + // allowed to be executed on a different thread. |
|
tommi (sloooow) - chröme
2016/05/14 11:06:28
Is that something we can fix/avoid?
Henrik Grunell
2016/05/16 14:37:35
AudioOutputDevice doesn't do this, but AudioInputD
|
| - base::PlatformThreadHandle thread = base::PlatformThreadHandle(); |
| + base::PlatformThreadHandle thread_handle_to_join = |
| + base::PlatformThreadHandle(); |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK(state_ != STATE_STOPPED) << "Calling Stop() on a stopped thread?"; |
|
tommi (sloooow) - chröme
2016/05/14 11:06:28
nit: DCHECK_NE
Henrik Grunell
2016/05/16 14:37:35
Done.
|
| + callback_ = nullptr; |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - callback_ = NULL; |
| - std::swap(thread, thread_); |
| - } |
| + // |socket_| is only set/reset under the |lock_|, so it's safe to |
| + // access under that lock here. |
| + if (socket_) |
| + socket_->Shutdown(); |
| - if (!thread.is_null()) { |
| + // Must be done under lock to avoid race with Start(). |
| + std::swap(thread_handle_to_join, thread_handle_); |
| + |
| + state_ = STATE_STOPPED; |
| + exit_paused_state_.Signal(); |
| + } // auto_lock(lock_) |
| + |
| + if (!thread_handle_to_join.is_null()) { |
| if (loop_for_join) { |
| - loop_for_join->PostTask(FROM_HERE, |
| - base::Bind(&base::PlatformThread::Join, thread)); |
| + loop_for_join->PostTask(FROM_HERE, base::Bind(&base::PlatformThread::Join, |
| + thread_handle_to_join)); |
| } else { |
| - base::PlatformThread::Join(thread); |
| + base::PlatformThread::Join(thread_handle_to_join); |
| } |
| } |
| } |
| +void AudioDeviceThread::Thread::Resume(AudioDeviceThread::Callback* callback, |
| + base::SyncSocket::Handle socket_handle) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK(state_ == STATE_PAUSED) << "Resume a non-paused thread?"; |
|
tommi (sloooow) - chröme
2016/05/14 11:06:28
nit: DCHECK_EQ
Henrik Grunell
2016/05/16 14:37:35
Done.
|
| + |
| + DCHECK_EQ(new_socket_handle_, base::CancelableSyncSocket::kInvalidHandle); |
| + |
| + callback_ = callback; |
| + new_socket_handle_ = socket_handle; |
| + |
| + state_ = STATE_PLAYING; |
| + exit_paused_state_.Signal(); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
any chance that this may already be signaled and w
Henrik Grunell
2016/05/16 14:37:35
You tell me. :D When we enter wait we have the loc
|
| +} |
| + |
| +void AudioDeviceThread::Thread::Pause() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + base::AutoLock auto_lock(lock_); |
| + if (state_ == STATE_STOPPED) |
| + return; // Stop is the last operation to be executed. |
| + |
| + DCHECK(state_ == STATE_PLAYING) << "Pause a paused thread?"; |
| + callback_ = nullptr; |
| + |
| + // |socket_| is only set/reset under the |lock_|, so it's safe to |
| + // access under that lock here. |
| + if (socket_) |
| + socket_->Shutdown(); |
|
tommi (sloooow) - chröme
2016/05/14 11:06:27
nit: would it be worth it to swap the socket to a
Henrik Grunell
2016/05/16 14:37:35
Gut feeling says no. Apart from that, I haven't lo
|
| + state_ = STATE_PAUSED; |
| +} |
| + |
| void AudioDeviceThread::Thread::ThreadMain() { |
| PlatformThread::SetName(thread_name_); |
| @@ -154,13 +245,38 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| // by another thread on which singleton access is OK as well. |
| base::ThreadRestrictions::SetSingletonAllowed(true); |
| - { // NOLINT |
| - base::AutoLock auto_lock(callback_lock_); |
| - if (callback_) |
| - callback_->InitializeOnAudioThread(); |
| - } |
| + while (true) { |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + while (state_ == STATE_PAUSED) { |
| + // This releases |lock_| while waiting. |
| + exit_paused_state_.Wait(); |
| + } |
| + |
| + if (state_ == STATE_STOPPED) |
| + break; |
| + |
| + // If we are playing but don't have a valid new socket handle, we are here |
| + // because the socket receive or send failed for another reason than |
| + // Shutdown() being called in Stop() or Pause(). In that case, quit the |
| + // thread. |
| + if (new_socket_handle_ == base::CancelableSyncSocket::kInvalidHandle) { |
| + LOG(WARNING) << "Audio socket failed, exiting audio thread."; |
| + break; |
| + } |
| + |
| + // Else reinitialize and resume. |
| + |
| + socket_.reset(new base::CancelableSyncSocket(new_socket_handle_)); |
| + |
| + if (callback_) |
| + callback_->InitializeOnAudioThread(); |
| + |
| + new_socket_handle_ = base::CancelableSyncSocket::kInvalidHandle; |
| + } // auto_lock(lock_) |
| - Run(); |
| + Run(); |
| + } // while |
| // Release the reference for the thread. Note that after this, the Thread |
| // instance will most likely be deleted. |
| @@ -169,9 +285,13 @@ void AudioDeviceThread::Thread::ThreadMain() { |
| void AudioDeviceThread::Thread::Run() { |
| uint32_t buffer_index = 0; |
| + |
| while (true) { |
| uint32_t pending_data = 0; |
| - size_t bytes_read = socket_.Receive(&pending_data, sizeof(pending_data)); |
| + // |socket_| is only modified on this thread, so we don't need a lock to |
| + // access it here. |
| + size_t bytes_read = socket_->Receive(&pending_data, sizeof(pending_data)); |
| + |
| if (bytes_read != sizeof(pending_data)) |
| break; |
| @@ -184,7 +304,10 @@ void AudioDeviceThread::Thread::Run() { |
| // |
| // See comments in AudioOutputController::DoPause() for details on why. |
| if (pending_data != std::numeric_limits<uint32_t>::max()) { |
| - base::AutoLock auto_lock(callback_lock_); |
| + // We can't use an atomic pointer for |callback_|, but need to call on it |
| + // under the state lock. This is because after clearing it in Pause() or |
| + // Stop(), the caller may delete the callback object. |
| + base::AutoLock auto_lock(lock_); |
| if (callback_) |
| callback_->Process(pending_data); |
| } |
| @@ -201,11 +324,12 @@ void AudioDeviceThread::Thread::Run() { |
| // details on how this works see AudioSyncReader::WaitUntilDataIsReady(). |
| if (synchronized_buffers_) { |
| ++buffer_index; |
| - size_t bytes_sent = socket_.Send(&buffer_index, sizeof(buffer_index)); |
| + size_t bytes_sent = socket_->Send(&buffer_index, sizeof(buffer_index)); |
| + |
| if (bytes_sent != sizeof(buffer_index)) |
| break; |
| } |
| - } |
| + } // while |
| } |
| // AudioDeviceThread::Callback implementation |