Chromium Code Reviews| Index: chrome/browser/media/cast_remoting_sender.cc |
| diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc |
| index f24aed064a12e9c121dc69c55091b7062ab511c9..bad9305417a5909cad46cb06f24867c259291d56 100644 |
| --- a/chrome/browser/media/cast_remoting_sender.cc |
| +++ b/chrome/browser/media/cast_remoting_sender.cc |
| @@ -4,6 +4,7 @@ |
| #include "chrome/browser/media/cast_remoting_sender.h" |
| +#include <algorithm> |
| #include <map> |
| #include "base/bind.h" |
| @@ -12,15 +13,18 @@ |
| #include "base/lazy_instance.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/numerics/safe_conversions.h" |
| +#include "base/stl_util.h" |
| #include "base/trace_event/trace_event.h" |
| #include "content/public/browser/browser_thread.h" |
| #include "media/base/bind_to_current_loop.h" |
| #include "media/cast/constants.h" |
| +using content::BrowserThread; |
| + |
| namespace { |
| // Global map for looking-up CastRemotingSender instances by their |
| -// |remoting_stream_id_|. |
| +// |rtp_stream_id|. |
| using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>; |
| base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
| LAZY_INSTANCE_INITIALIZER; |
| @@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
| constexpr base::TimeDelta kMinSchedulingDelay = |
| base::TimeDelta::FromMilliseconds(1); |
| constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800); |
| -constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400); |
| constexpr base::TimeDelta kReceiverProcessTime = |
| base::TimeDelta::FromMilliseconds(250); |
| @@ -72,39 +75,95 @@ CastRemotingSender::CastRemotingSender( |
| scoped_refptr<media::cast::CastEnvironment> cast_environment, |
| media::cast::CastTransport* transport, |
| const media::cast::CastTransportRtpConfig& config) |
| - : remoting_stream_id_(config.rtp_stream_id), |
| + : rtp_stream_id_(config.rtp_stream_id), |
| cast_environment_(std::move(cast_environment)), |
| transport_(transport), |
| ssrc_(config.ssrc), |
| is_audio_(config.rtp_payload_type == |
| media::cast::RtpPayloadType::REMOTE_AUDIO), |
| + binding_(this), |
| max_ack_delay_(kMaxAckDelay), |
| last_sent_frame_id_(media::cast::FrameId::first() - 1), |
| latest_acked_frame_id_(media::cast::FrameId::first() - 1), |
| duplicate_ack_counter_(0), |
| - last_frame_was_canceled_(false), |
| + input_queue_discards_remaining_(0), |
| + flow_restart_pending_(true), |
| weak_factory_(this) { |
| + // Confirm this constructor is running on the IO BrowserThread and the |
| + // CastEnvironment::MAIN thread is the same thread. |
| + DCHECK_CURRENTLY_ON(BrowserThread::IO); |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| - CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_]; |
| + CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_]; |
| DCHECK(!pointer_in_map); |
| pointer_in_map = this; |
| + |
| transport_->InitializeStream( |
| config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr())); |
| } |
| CastRemotingSender::~CastRemotingSender() { |
| - DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + DCHECK_CURRENTLY_ON(BrowserThread::IO); |
| + g_sender_map.Pointer()->erase(rtp_stream_id_); |
| +} |
| + |
| +// static |
| +void CastRemotingSender::FindAndBind( |
| + int32_t rtp_stream_id, |
| + mojo::ScopedDataPipeConsumerHandle pipe, |
| + media::mojom::RemotingDataStreamSenderRequest request, |
| + const base::Closure& error_callback) { |
| + // CastRemotingSender lives entirely on the IO thread, so trampoline if |
| + // necessary. |
| + if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) { |
| + BrowserThread::PostTask( |
| + BrowserThread::IO, FROM_HERE, |
| + base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id, |
| + base::Passed(&pipe), base::Passed(&request), |
| + // Using media::BindToCurrentLoop() so the |error_callback| |
| + // is trampolined back to the original thread. |
| + media::BindToCurrentLoop(error_callback))); |
| + return; |
| + } |
| + |
| + DCHECK(!error_callback.is_null()); |
| + |
| + // Look-up the CastRemotingSender instance by its |rtp_stream_id|. |
| + const auto it = g_sender_map.Pointer()->find(rtp_stream_id); |
| + if (it == g_sender_map.Pointer()->end()) { |
| + DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: " |
| + << rtp_stream_id; |
| + error_callback.Run(); |
| + return; |
| + } |
| + CastRemotingSender* const sender = it->second; |
| - g_sender_map.Pointer()->erase(remoting_stream_id_); |
| + // Confirm that the CastRemotingSender isn't already bound to a message pipe. |
| + if (sender->binding_.is_bound()) { |
| + DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id=" |
| + << rtp_stream_id << ")!"; |
| + error_callback.Run(); |
| + return; |
| + } |
| + |
| + DCHECK(sender->error_callback_.is_null()); |
| + sender->error_callback_ = error_callback; |
| + |
| + sender->pipe_ = std::move(pipe); |
| + sender->pipe_watcher_.Start( |
| + sender->pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&CastRemotingSender::ProcessInputQueue, |
| + base::Unretained(sender))); |
| + sender->binding_.Bind(std::move(request)); |
| + sender->binding_.set_connection_error_handler(sender->error_callback_); |
| } |
| void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) { |
| DCHECK_GT(round_trip_time, base::TimeDelta()); |
| current_round_trip_time_ = round_trip_time; |
| - max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime; |
| - max_ack_delay_ = |
| - std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay); |
| + max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) + |
| + kReceiverProcessTime; |
| + max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay); |
| } |
| void CastRemotingSender::ResendCheck() { |
| @@ -243,16 +302,110 @@ void CastRemotingSender::OnReceivedCastMessage( |
| current_round_trip_time_.InMicroseconds()); |
| } while (latest_acked_frame_id_ < cast_feedback.ack_frame_id); |
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
| + |
| + // One or more frames were canceled. This may allow pending input operations |
| + // to complete. |
| + ProcessInputQueue(MOJO_RESULT_OK); |
| } |
| } |
| +void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size, |
| + uint32_t total_payload_size) { |
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + input_queue_.push( |
| + base::Bind(&CastRemotingSender::TryConsumeDataChunk, |
| + base::Unretained(this), offset, size, total_payload_size)); |
| + ProcessInputQueue(MOJO_RESULT_OK); |
| +} |
| + |
| void CastRemotingSender::SendFrame() { |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + input_queue_.push( |
| + base::Bind(&CastRemotingSender::TrySendFrame, base::Unretained(this))); |
| + ProcessInputQueue(MOJO_RESULT_OK); |
| +} |
| + |
| +void CastRemotingSender::ProcessInputQueue(MojoResult result) { |
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + while (!input_queue_.empty()) { |
| + if (!input_queue_.front().Run(input_queue_discards_remaining_ > 0)) |
| + break; // Operation must be retried later. Stop processing queue. |
| + input_queue_.pop(); |
| + if (input_queue_discards_remaining_ > 0) |
| + --input_queue_discards_remaining_; |
| + } |
| +} |
| + |
| +bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size, |
| + uint32_t total_payload_size, |
| + bool discard_data) { |
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + |
| + do { |
| + if (!pipe_.is_valid()) { |
| + VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid."; |
| + break; |
| + } |
| + |
| + if (offset + size > total_payload_size) { |
| + LOG(ERROR) |
| + << SENDER_SSRC << "BUG: offset + size > total_payload_size (" |
| + << offset << " + " << size << " > " << total_payload_size << ')'; |
| + break; |
| + } |
| + |
| + // If the data is to be discarded, do a data pipe read with the DISCARD flag |
| + // set. |
| + if (discard_data) { |
| + const MojoResult result = mojo::ReadDataRaw( |
| + pipe_.get(), nullptr, &size, |
| + MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
| + if (result == MOJO_RESULT_OK) |
| + return true; // Successfully discarded data. |
| + if (result == MOJO_RESULT_OUT_OF_RANGE) |
| + return false; // Retry later. |
| + LOG(ERROR) << SENDER_SSRC |
| + << "Unexpcted result when discarding from data pipe (" |
|
dcheng
2016/09/23 04:51:07
Nit: Unexpected
miu
2016/09/23 05:52:00
Done.
|
| + << result << ')'; |
| + break; |
| + } |
| + // If |total_payload_size| has changed, resize the data string. If it has |
| + // not changed, the following statement will be a no-op. |
| + next_frame_data_.resize(total_payload_size); |
| + |
| + const MojoResult result = mojo::ReadDataRaw( |
| + pipe_.get(), base::string_as_array(&next_frame_data_) + offset, &size, |
| + MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
| + if (result == MOJO_RESULT_OK) |
| + return true; // Successfully consumed data. |
| + if (result == MOJO_RESULT_OUT_OF_RANGE) |
| + return false; // Retry later. |
| + LOG(ERROR) |
| + << SENDER_SSRC << "Read from data pipe failed (" << result << ')'; |
| + } while (false); |
| + |
| + // If this point is reached, there was a fatal error. Shut things down and run |
| + // the error callback. |
| + pipe_watcher_.Cancel(); |
| + pipe_.reset(); |
| + binding_.Close(); |
| + error_callback_.Run(); |
| + return true; |
| +} |
| + |
| +bool CastRemotingSender::TrySendFrame(bool discard_data) { |
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| + |
| + // If the frame's data is to be discarded, just return immediately. |
| + if (discard_data) |
| + return true; |
| + |
| + // If there would be too many frames in-flight, do not proceed. |
| if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) { |
| - // TODO(xjz): Delay the sending of this frame and stop reading the next |
| - // frame data. |
| - return; |
| + VLOG(1) << SENDER_SSRC |
| + << "Cannot send frame now because too many frames are in flight."; |
| + return false; |
| } |
| VLOG(2) << SENDER_SSRC |
| @@ -275,10 +428,13 @@ void CastRemotingSender::SendFrame() { |
| media::cast::EncodedFrame remoting_frame; |
| remoting_frame.frame_id = frame_id; |
| - remoting_frame.dependency = |
| - (is_first_frame_to_be_sent || last_frame_was_canceled_) |
| - ? media::cast::EncodedFrame::KEY |
| - : media::cast::EncodedFrame::DEPENDENT; |
| + if (flow_restart_pending_) { |
| + remoting_frame.dependency = media::cast::EncodedFrame::KEY; |
| + flow_restart_pending_ = false; |
| + } else { |
| + DCHECK(!is_first_frame_to_be_sent); |
| + remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT; |
| + } |
| remoting_frame.referenced_frame_id = |
| remoting_frame.dependency == media::cast::EncodedFrame::KEY |
| ? frame_id |
| @@ -317,16 +473,23 @@ void CastRemotingSender::SendFrame() { |
| cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event)); |
| RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp); |
| - last_frame_was_canceled_ = false; |
| transport_->InsertFrame(ssrc_, remoting_frame); |
| + |
| + return true; |
| } |
| -void CastRemotingSender::CancelFramesInFlight() { |
| +void CastRemotingSender::CancelInFlightData() { |
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
| base::STLClearObject(&next_frame_data_); |
| + // TODO(miu): The following code is something we want to do as an |
| + // optimization. However, as-is, it's not quite correct. We can only cancel |
| + // frames where no packets have actually hit the network yet. Said another |
| + // way, we can only cancel frames the receiver has definitely not seen any |
| + // part of (including kickstarting!). http://crbug.com/647423 |
| +#if 0 |
| if (latest_acked_frame_id_ < last_sent_frame_id_) { |
| std::vector<media::cast::FrameId> frames_to_cancel; |
| do { |
| @@ -335,8 +498,14 @@ void CastRemotingSender::CancelFramesInFlight() { |
| } while (latest_acked_frame_id_ < last_sent_frame_id_); |
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
| } |
| +#endif |
| + |
| + // Flag that all pending input operations should discard data. |
| + input_queue_discards_remaining_ = input_queue_.size(); |
| - last_frame_was_canceled_ = true; |
| + flow_restart_pending_ = true; |
| + VLOG(1) << SENDER_SSRC |
| + << "Now restarting because in-flight data was just canceled."; |
| } |
| } // namespace cast |