Index: chromecast/media/audio/cast_audio_mixer.cc |
diff --git a/chromecast/media/audio/cast_audio_mixer.cc b/chromecast/media/audio/cast_audio_mixer.cc |
index 4d236a1cb9f9f37aba7eef50212ccaed8d4a2c54..a7ed6f5bc1682434c460ad263ea0374ca1d52f0b 100644 |
--- a/chromecast/media/audio/cast_audio_mixer.cc |
+++ b/chromecast/media/audio/cast_audio_mixer.cc |
@@ -4,7 +4,9 @@ |
#include "chromecast/media/audio/cast_audio_mixer.h" |
+#include "base/bind.h" |
#include "base/logging.h" |
+#include "base/memory/ptr_util.h" |
#include "chromecast/media/audio/cast_audio_manager.h" |
#include "chromecast/media/audio/cast_audio_output_stream.h" |
#include "media/base/audio_timestamp_helper.h" |
@@ -21,30 +23,34 @@ namespace media { |
class CastAudioMixer::MixerProxyStream |
: public ::media::AudioOutputStream, |
- private ::media::AudioConverter::InputCallback { |
+ public ::media::AudioConverter::InputCallback { |
public: |
MixerProxyStream(const ::media::AudioParameters& input_params, |
const ::media::AudioParameters& output_params, |
- CastAudioManager* audio_manager, |
CastAudioMixer* audio_mixer) |
- : audio_manager_(audio_manager), |
- audio_mixer_(audio_mixer), |
- source_callback_(nullptr), |
+ : audio_mixer_(audio_mixer), |
input_params_(input_params), |
output_params_(output_params), |
opened_(false), |
- volume_(1.0) {} |
+ volume_(1.0), |
+ source_callback_(nullptr) { |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ } |
- ~MixerProxyStream() override {} |
+ ~MixerProxyStream() override { |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ } |
void OnError() { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
- |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
if (source_callback_) |
source_callback_->OnError(); |
} |
private: |
+ // ResamplerProxy is an intermediate filter between MixerProxyStream and |
+ // CastAudioMixer::output_stream_ whose only responsibility is to resample |
+ // audio to the sample rate expected by CastAudioMixer::output_stream_. |
class ResamplerProxy : public ::media::AudioConverter::InputCallback { |
public: |
ResamplerProxy(::media::AudioConverter::InputCallback* input_callback, |
@@ -53,6 +59,7 @@ class CastAudioMixer::MixerProxyStream |
resampler_.reset( |
new ::media::AudioConverter(input_params, output_params, false)); |
resampler_->AddInput(input_callback); |
+ DETACH_FROM_THREAD(backend_thread_checker_); |
} |
~ResamplerProxy() override {} |
@@ -61,20 +68,21 @@ class CastAudioMixer::MixerProxyStream |
// ::media::AudioConverter::InputCallback implementation |
double ProvideInput(::media::AudioBus* audio_bus, |
uint32_t frames_delayed) override { |
+ DCHECK_CALLED_ON_VALID_THREAD(backend_thread_checker_); |
resampler_->ConvertWithDelay(frames_delayed, audio_bus); |
- |
// Volume multiplier has already been applied by |resampler_|. |
return 1.0; |
} |
std::unique_ptr<::media::AudioConverter> resampler_; |
+ THREAD_CHECKER(backend_thread_checker_); |
DISALLOW_COPY_AND_ASSIGN(ResamplerProxy); |
}; |
// ::media::AudioOutputStream implementation |
bool Open() override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
::media::AudioParameters::Format format = input_params_.format(); |
DCHECK((format == ::media::AudioParameters::AUDIO_PCM_LINEAR) || |
@@ -93,46 +101,52 @@ class CastAudioMixer::MixerProxyStream |
} |
void Close() override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
if (proxy_) |
Stop(); |
if (opened_) |
audio_mixer_->Unregister(this); |
+ |
// Signal to the manager that we're closed and can be removed. |
// This should be the last call in the function as it deletes "this". |
- audio_manager_->ReleaseOutputStream(this); |
+ audio_mixer_->audio_manager_->ReleaseOutputStream(this); |
} |
void Start(AudioSourceCallback* source_callback) override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
DCHECK(source_callback); |
if (!opened_ || proxy_) |
return; |
source_callback_ = source_callback; |
- proxy_.reset(new ResamplerProxy(this, input_params_, output_params_)); |
+ proxy_ = |
+ base::MakeUnique<ResamplerProxy>(this, input_params_, output_params_); |
audio_mixer_->AddInput(proxy_.get()); |
} |
void Stop() override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
if (!proxy_) |
return; |
audio_mixer_->RemoveInput(proxy_.get()); |
+ // Once the above function returns it is guaranteed that proxy_ or |
+ // source_callback_ would not be used on the backend thread, so it is safe |
+ // to reset them. |
proxy_.reset(); |
source_callback_ = nullptr; |
} |
void SetVolume(double volume) override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ base::AutoLock auto_lock(volume_lock_); |
volume_ = volume; |
} |
void GetVolume(double* volume) override { |
- DCHECK(audio_manager_->GetTaskRunner()->BelongsToCurrentThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
*volume = volume_; |
} |
@@ -140,56 +154,57 @@ class CastAudioMixer::MixerProxyStream |
// ::media::AudioConverter::InputCallback implementation |
double ProvideInput(::media::AudioBus* audio_bus, |
uint32_t frames_delayed) override { |
+ // Called on backend thread. Member variables accessed from both backend |
+ // and audio thread must be thread-safe. |
DCHECK(source_callback_); |
const base::TimeDelta delay = ::media::AudioTimestampHelper::FramesToTime( |
frames_delayed, input_params_.sample_rate()); |
source_callback_->OnMoreData(delay, base::TimeTicks::Now(), 0, audio_bus); |
+ |
+ base::AutoLock auto_lock(volume_lock_); |
return volume_; |
} |
- CastAudioManager* const audio_manager_; |
CastAudioMixer* const audio_mixer_; |
- |
- std::unique_ptr<ResamplerProxy> proxy_; |
- AudioSourceCallback* source_callback_; |
const ::media::AudioParameters input_params_; |
const ::media::AudioParameters output_params_; |
+ |
bool opened_; |
double volume_; |
+ base::Lock volume_lock_; |
+ AudioSourceCallback* source_callback_; |
+ std::unique_ptr<ResamplerProxy> proxy_; |
+ THREAD_CHECKER(audio_thread_checker_); |
DISALLOW_COPY_AND_ASSIGN(MixerProxyStream); |
}; |
-CastAudioMixer::CastAudioMixer(const RealStreamFactory& real_stream_factory) |
- : error_(false), |
- real_stream_factory_(real_stream_factory), |
- output_stream_(nullptr) { |
+CastAudioMixer::CastAudioMixer(CastAudioManager* audio_manager) |
+ : audio_manager_(audio_manager), error_(false), output_stream_(nullptr) { |
output_params_ = ::media::AudioParameters( |
::media::AudioParameters::Format::AUDIO_PCM_LOW_LATENCY, |
::media::CHANNEL_LAYOUT_STEREO, kSampleRate, kBitsPerSample, |
kFramesPerBuffer); |
mixer_.reset( |
new ::media::AudioConverter(output_params_, output_params_, false)); |
- thread_checker_.DetachFromThread(); |
+ DETACH_FROM_THREAD(audio_thread_checker_); |
} |
CastAudioMixer::~CastAudioMixer() {} |
::media::AudioOutputStream* CastAudioMixer::MakeStream( |
- const ::media::AudioParameters& params, |
- CastAudioManager* audio_manager) { |
- return new MixerProxyStream(params, output_params_, audio_manager, this); |
+ const ::media::AudioParameters& params) { |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ return new MixerProxyStream(params, output_params_, this); |
} |
bool CastAudioMixer::Register(MixerProxyStream* proxy_stream) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
DCHECK(std::find(proxy_streams_.begin(), proxy_streams_.end(), |
proxy_stream) == proxy_streams_.end()); |
- // If the mixer is in an error state, do not allow any new |
- // registrations until every active stream has been |
- // unregistered. |
+ // Do not allow opening new streams while in error state. |
if (error_) |
return false; |
@@ -198,7 +213,7 @@ bool CastAudioMixer::Register(MixerProxyStream* proxy_stream) { |
// is not opened properly. |
if (proxy_streams_.empty()) { |
DCHECK(!output_stream_); |
- output_stream_ = real_stream_factory_.Run(output_params_); |
+ output_stream_ = audio_manager_->MakeMixerOutputStream(output_params_); |
if (!output_stream_->Open()) { |
output_stream_->Close(); |
output_stream_ = nullptr; |
@@ -206,17 +221,16 @@ bool CastAudioMixer::Register(MixerProxyStream* proxy_stream) { |
} |
} |
- proxy_streams_.push_back(proxy_stream); |
+ proxy_streams_.insert(proxy_stream); |
return true; |
} |
void CastAudioMixer::Unregister(MixerProxyStream* proxy_stream) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- auto it = |
- std::find(proxy_streams_.begin(), proxy_streams_.end(), proxy_stream); |
- DCHECK(it != proxy_streams_.end()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ DCHECK(std::find(proxy_streams_.begin(), proxy_streams_.end(), |
+ proxy_stream) != proxy_streams_.end()); |
- proxy_streams_.erase(it); |
+ proxy_streams_.erase(proxy_stream); |
// Reset the state once all streams have been unregistered. |
if (proxy_streams_.empty()) { |
@@ -230,19 +244,25 @@ void CastAudioMixer::Unregister(MixerProxyStream* proxy_stream) { |
void CastAudioMixer::AddInput( |
::media::AudioConverter::InputCallback* input_callback) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
// Start the backend if there are no current inputs. |
if (mixer_->empty() && output_stream_) |
output_stream_->Start(this); |
+ |
+ base::AutoLock auto_lock(mixer_lock_); |
mixer_->AddInput(input_callback); |
} |
void CastAudioMixer::RemoveInput( |
::media::AudioConverter::InputCallback* input_callback) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
+ |
+ { |
+ base::AutoLock auto_lock(mixer_lock_); |
+ mixer_->RemoveInput(input_callback); |
+ } |
- mixer_->RemoveInput(input_callback); |
// Stop |output_stream_| if there are no inputs and the stream is running. |
if (mixer_->empty() && output_stream_) |
output_stream_->Stop(); |
@@ -252,36 +272,28 @@ int CastAudioMixer::OnMoreData(base::TimeDelta delay, |
base::TimeTicks /* delay_timestamp */, |
int /* prior_frames_skipped */, |
::media::AudioBus* dest) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- |
+ // Called on backend thread. |
uint32_t frames_delayed = ::media::AudioTimestampHelper::TimeToFrames( |
delay, output_params_.sample_rate()); |
+ |
+ base::AutoLock auto_lock(mixer_lock_); |
mixer_->ConvertWithDelay(frames_delayed, dest); |
return dest->frames(); |
} |
void CastAudioMixer::OnError() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- |
- // TODO(ameyak): Add rate limiting. If errors are seen to occur |
- // above some arbitrary value in a specified amount |
- // of time, signal errors to all currently active |
- // streams and close. |
+ // Called on backend thread. |
+ audio_manager_->GetTaskRunner()->PostTask( |
+ FROM_HERE, |
+ base::BindOnce(&CastAudioMixer::HandleError, base::Unretained(this))); |
+} |
- output_stream_->Stop(); |
- output_stream_->Close(); |
- output_stream_ = real_stream_factory_.Run(output_params_); |
+void CastAudioMixer::HandleError() { |
+ DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_); |
- if (output_stream_->Open()) { |
- output_stream_->Start(this); |
- } else { |
- // Assume error state |
- output_stream_->Close(); |
- output_stream_ = nullptr; |
- error_ = true; |
- for (auto it = proxy_streams_.begin(); it != proxy_streams_.end(); it++) |
- (*it)->OnError(); |
- } |
+ error_ = true; |
+ for (auto it = proxy_streams_.begin(); it != proxy_streams_.end(); ++it) |
+ (*it)->OnError(); |
} |
} // namespace media |