Chromium Code Reviews| Index: chrome/browser/media/cast_remoting_sender.cc |
| diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc |
| index f24aed064a12e9c121dc69c55091b7062ab511c9..2e759888a875a5e61ec26a01a36c654c3c10daf9 100644 |
| --- a/chrome/browser/media/cast_remoting_sender.cc |
| +++ b/chrome/browser/media/cast_remoting_sender.cc |
| @@ -4,6 +4,7 @@ |
| #include "chrome/browser/media/cast_remoting_sender.h" |
| +#include <algorithm> |
| #include <map> |
| #include "base/bind.h" |
| @@ -12,15 +13,18 @@ |
| #include "base/lazy_instance.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/numerics/safe_conversions.h" |
| +#include "base/stl_util.h" |
| #include "base/trace_event/trace_event.h" |
| #include "content/public/browser/browser_thread.h" |
| #include "media/base/bind_to_current_loop.h" |
| #include "media/cast/constants.h" |
| +using content::BrowserThread; |
| + |
| namespace { |
| // Global map for looking-up CastRemotingSender instances by their |
| -// |remoting_stream_id_|. |
| +// |rtp_stream_id|. |
| using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>; |
| base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
| LAZY_INSTANCE_INITIALIZER; |
| @@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
| constexpr base::TimeDelta kMinSchedulingDelay = |
| base::TimeDelta::FromMilliseconds(1); |
| constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800); |
| -constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400); |
| constexpr base::TimeDelta kReceiverProcessTime = |
| base::TimeDelta::FromMilliseconds(250); |
| @@ -72,39 +75,92 @@ CastRemotingSender::CastRemotingSender( |
| scoped_refptr<media::cast::CastEnvironment> cast_environment, |
| media::cast::CastTransport* transport, |
| const media::cast::CastTransportRtpConfig& config) |
| - : remoting_stream_id_(config.rtp_stream_id), |
| + : rtp_stream_id_(config.rtp_stream_id), |
| cast_environment_(std::move(cast_environment)), |
| transport_(transport), |
| ssrc_(config.ssrc), |
| is_audio_(config.rtp_payload_type == |
| media::cast::RtpPayloadType::REMOTE_AUDIO), |
| + binding_(this), |
| max_ack_delay_(kMaxAckDelay), |
| last_sent_frame_id_(media::cast::FrameId::first() - 1), |
| latest_acked_frame_id_(media::cast::FrameId::first() - 1), |
| duplicate_ack_counter_(0), |
| - last_frame_was_canceled_(false), |
| + flow_control_state_(STARTING), |
| weak_factory_(this) { |
| + // Confirm this constructor is running on the IO BrowserThread and the |
| + // CastEnvironment::MAIN thread is the same thread. |
| + DCHECK_CURRENTLY_ON(BrowserThread::IO); |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| - CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_]; |
| + CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_]; |
| DCHECK(!pointer_in_map); |
| pointer_in_map = this; |
| + |
| transport_->InitializeStream( |
| config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr())); |
| } |
| CastRemotingSender::~CastRemotingSender() { |
| - DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + DCHECK_CURRENTLY_ON(BrowserThread::IO); |
| + g_sender_map.Pointer()->erase(rtp_stream_id_); |
| +} |
| + |
| +// static |
| +void CastRemotingSender::FindAndBind( |
| + int32_t rtp_stream_id, |
| + mojo::ScopedDataPipeConsumerHandle pipe, |
| + media::mojom::RemotingDataStreamSenderRequest request, |
| + const base::Closure& error_callback) { |
| + // CastRemotingSender lives entirely on the IO thread, so trampoline if |
| + // necessary. |
| + if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) { |
| + BrowserThread::PostTask( |
| + BrowserThread::IO, FROM_HERE, |
| + base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id, |
| + base::Passed(&pipe), base::Passed(&request), |
| + // Using media::BindToCurrentLoop() so the |error_callback| |
| + // is trampolined back to the original thread. |
| + media::BindToCurrentLoop(error_callback))); |
| + return; |
| + } |
| + |
| + DCHECK(!error_callback.is_null()); |
| + |
| + // Look-up the CastRemotingSender instance by its |rtp_stream_id|. |
| + const auto it = g_sender_map.Pointer()->find(rtp_stream_id); |
| + if (it == g_sender_map.Pointer()->end()) { |
| + DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: " |
| + << rtp_stream_id; |
| + error_callback.Run(); |
| + return; |
| + } |
| + CastRemotingSender* const sender = it->second; |
| + |
| + // Confirm that the CastRemotingSender isn't already bound to a message pipe. |
| + if (sender->binding_.is_bound()) { |
| + DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id=" |
| + << rtp_stream_id << ")!"; |
| + error_callback.Run(); |
| + return; |
| + } |
| - g_sender_map.Pointer()->erase(remoting_stream_id_); |
| + DCHECK(sender->error_callback_.is_null()); |
| + sender->error_callback_ = error_callback; |
| + |
| + sender->pipe_ = std::move(pipe); |
| + sender->binding_.Bind(std::move(request)); |
| + sender->binding_.set_connection_error_handler( |
| + base::Bind(&base::Closure::Run, |
| + base::Unretained(&sender->error_callback_))); |
|
dcheng
2016/09/20 08:20:12
Does set_connection_error(sender->error_callback_)
miu
2016/09/21 03:15:50
Whoops! Left-overs, ya know? Done.
|
| } |
| void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) { |
| DCHECK_GT(round_trip_time, base::TimeDelta()); |
| current_round_trip_time_ = round_trip_time; |
| - max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime; |
| - max_ack_delay_ = |
| - std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay); |
| + max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) + |
| + kReceiverProcessTime; |
| + max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay); |
| } |
| void CastRemotingSender::ResendCheck() { |
| @@ -243,15 +299,78 @@ void CastRemotingSender::OnReceivedCastMessage( |
| current_round_trip_time_.InMicroseconds()); |
| } while (latest_acked_frame_id_ < cast_feedback.ack_frame_id); |
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
| + |
| + if (flow_control_state_ == CONSUME_PAUSED) { |
| + // One or more frames were canceled, so resume reading from the Mojo data |
| + // pipes and sending frames. |
| + binding_.ResumeIncomingMethodCallProcessing(); |
| + flow_control_state_ = CONSUMING; |
| + VLOG(1) << SENDER_SSRC |
| + << "Now CONSUMING because one or more frames were ACK'ed."; |
| + |
| + // The last call to SendFrame() placed the sender into the CONSUME_PAUSED |
| + // state without sending the frame. So, send that frame now. |
| + SendFrame(); |
| + } |
| } |
| } |
| +void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size, |
| + uint32_t total_payload_size) { |
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + |
| + // Method calls from Mojo should not be happening when in the paused state. |
| + DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
|
dcheng
2016/09/20 08:20:12
why is that? What stops these messages from being
miu
2016/09/21 03:15:50
Short answer: We call binding_.PauseIncomingMethod
dcheng
2016/09/22 02:29:15
OK,thanks, I just wanted to make sure we weren't r
|
| + |
| + // If |total_payload_size| has changed, resize the data string. If it has not |
| + // changed, the following statement will be a no-op. |
| + next_frame_data_.resize(total_payload_size); |
| + |
| + do { |
| + if (!pipe_.is_valid()) { |
| + VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid."; |
| + break; |
| + } |
| + |
| + if (offset + size > total_payload_size) { |
| + LOG(DFATAL) |
|
dcheng
2016/09/20 08:20:12
We probably shouldn't dcheck here, this is coming
miu
2016/09/21 03:15:50
Relaxed to ERROR level.
|
| + << "BUG: offset + size > total_payload_size (" |
| + << offset << " + " << size << " > " << total_payload_size << ')'; |
| + break; |
| + } |
| + |
| + uint32_t num_bytes = size; |
| + const MojoResult result = mojo::ReadDataRaw( |
| + pipe_.get(), base::string_as_array(&next_frame_data_) + offset, |
| + &num_bytes, MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
| + if (result != MOJO_RESULT_OK) { |
|
dcheng
2016/09/20 08:20:12
Is it possible for MOJO_RESULT_OUT_OF_RANGE to be
miu
2016/09/21 03:15:50
This should never happen. The render process is al
dcheng
2016/09/22 02:29:15
Right, it's not clear to me if there's synchroniza
miu
2016/09/22 08:39:14
Good catch on this, and rockot@ confirmed.
I re-w
|
| + LOG(DFATAL) << "BUG: Read from data pipe failed (" << result << ')'; |
| + break; |
| + } |
| + |
| + return; // Success. |
| + } while (false); |
| + |
| + // If this point is reached, there was a fatal error. Shut things down and run |
| + // the error callback. |
| + pipe_.reset(); |
| + binding_.Close(); |
| + error_callback_.Run(); |
| +} |
| + |
| void CastRemotingSender::SendFrame() { |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + // Method calls from Mojo should not be happening when in the paused state. |
| + DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
| + |
| + // If there would be too many frames in-flight, suspend consuming more input |
| + // from the Mojo data pipes until one or more frames are ACKed. |
| if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) { |
| - // TODO(xjz): Delay the sending of this frame and stop reading the next |
| - // frame data. |
| + binding_.PauseIncomingMethodCallProcessing(); |
| + flow_control_state_ = CONSUME_PAUSED; |
| + VLOG(1) << SENDER_SSRC |
| + << "Now CONSUME_PAUSED because too many frames are in flight."; |
| return; |
| } |
| @@ -275,10 +394,13 @@ void CastRemotingSender::SendFrame() { |
| media::cast::EncodedFrame remoting_frame; |
| remoting_frame.frame_id = frame_id; |
| - remoting_frame.dependency = |
| - (is_first_frame_to_be_sent || last_frame_was_canceled_) |
| - ? media::cast::EncodedFrame::KEY |
| - : media::cast::EncodedFrame::DEPENDENT; |
| + if (flow_control_state_ == STARTING) { |
| + remoting_frame.dependency = media::cast::EncodedFrame::KEY; |
| + flow_control_state_ = CONSUMING; |
| + } else { |
| + DCHECK(!is_first_frame_to_be_sent); |
| + remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT; |
| + } |
| remoting_frame.referenced_frame_id = |
| remoting_frame.dependency == media::cast::EncodedFrame::KEY |
| ? frame_id |
| @@ -317,16 +439,24 @@ void CastRemotingSender::SendFrame() { |
| cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event)); |
| RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp); |
| - last_frame_was_canceled_ = false; |
| transport_->InsertFrame(ssrc_, remoting_frame); |
| } |
| -void CastRemotingSender::CancelFramesInFlight() { |
| +void CastRemotingSender::CancelInFlightData() { |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + // Method calls from Mojo should not be happening when in the paused state. |
| + DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
| + |
| base::STLClearObject(&next_frame_data_); |
| + // TODO(miu): The following code is something we want to do as an |
| + // optimization. However, as-is, it's not quite correct. We can only cancel |
| + // frames where no packets have actually hit the network yet. Said another |
| + // way, we can only cancel frames the receiver has definitely not seen any |
| + // part of (including kickstarting!). http://crbug.com/647423 |
| +#if 0 |
| if (latest_acked_frame_id_ < last_sent_frame_id_) { |
| std::vector<media::cast::FrameId> frames_to_cancel; |
| do { |
| @@ -335,8 +465,11 @@ void CastRemotingSender::CancelFramesInFlight() { |
| } while (latest_acked_frame_id_ < last_sent_frame_id_); |
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
| } |
| +#endif |
| - last_frame_was_canceled_ = true; |
| + flow_control_state_ = STARTING; |
| + VLOG(1) << SENDER_SSRC |
| + << "Now re-STARTING because in-flight data was just canceled."; |
| } |
| } // namespace cast |