| Index: chrome/browser/media/cast_remoting_sender.cc
|
| diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc
|
| index f24aed064a12e9c121dc69c55091b7062ab511c9..4d6ae380b4754bf53f7301f46ad845ac14efa5b9 100644
|
| --- a/chrome/browser/media/cast_remoting_sender.cc
|
| +++ b/chrome/browser/media/cast_remoting_sender.cc
|
| @@ -4,6 +4,7 @@
|
|
|
| #include "chrome/browser/media/cast_remoting_sender.h"
|
|
|
| +#include <algorithm>
|
| #include <map>
|
|
|
| #include "base/bind.h"
|
| @@ -12,15 +13,18 @@
|
| #include "base/lazy_instance.h"
|
| #include "base/memory/ptr_util.h"
|
| #include "base/numerics/safe_conversions.h"
|
| +#include "base/stl_util.h"
|
| #include "base/trace_event/trace_event.h"
|
| #include "content/public/browser/browser_thread.h"
|
| #include "media/base/bind_to_current_loop.h"
|
| #include "media/cast/constants.h"
|
|
|
| +using content::BrowserThread;
|
| +
|
| namespace {
|
|
|
| // Global map for looking-up CastRemotingSender instances by their
|
| -// |remoting_stream_id_|.
|
| +// |rtp_stream_id|.
|
| using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>;
|
| base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map =
|
| LAZY_INSTANCE_INITIALIZER;
|
| @@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map =
|
| constexpr base::TimeDelta kMinSchedulingDelay =
|
| base::TimeDelta::FromMilliseconds(1);
|
| constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800);
|
| -constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400);
|
| constexpr base::TimeDelta kReceiverProcessTime =
|
| base::TimeDelta::FromMilliseconds(250);
|
|
|
| @@ -72,39 +75,95 @@ CastRemotingSender::CastRemotingSender(
|
| scoped_refptr<media::cast::CastEnvironment> cast_environment,
|
| media::cast::CastTransport* transport,
|
| const media::cast::CastTransportRtpConfig& config)
|
| - : remoting_stream_id_(config.rtp_stream_id),
|
| + : rtp_stream_id_(config.rtp_stream_id),
|
| cast_environment_(std::move(cast_environment)),
|
| transport_(transport),
|
| ssrc_(config.ssrc),
|
| is_audio_(config.rtp_payload_type ==
|
| media::cast::RtpPayloadType::REMOTE_AUDIO),
|
| + binding_(this),
|
| max_ack_delay_(kMaxAckDelay),
|
| last_sent_frame_id_(media::cast::FrameId::first() - 1),
|
| latest_acked_frame_id_(media::cast::FrameId::first() - 1),
|
| duplicate_ack_counter_(0),
|
| - last_frame_was_canceled_(false),
|
| + input_queue_discards_remaining_(0),
|
| + flow_restart_pending_(true),
|
| weak_factory_(this) {
|
| + // Confirm this constructor is running on the IO BrowserThread and the
|
| + // CastEnvironment::MAIN thread is the same thread.
|
| + DCHECK_CURRENTLY_ON(BrowserThread::IO);
|
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
|
|
| - CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_];
|
| + CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_];
|
| DCHECK(!pointer_in_map);
|
| pointer_in_map = this;
|
| +
|
| transport_->InitializeStream(
|
| config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr()));
|
| }
|
|
|
| CastRemotingSender::~CastRemotingSender() {
|
| - DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| + DCHECK_CURRENTLY_ON(BrowserThread::IO);
|
| + g_sender_map.Pointer()->erase(rtp_stream_id_);
|
| +}
|
| +
|
| +// static
|
| +void CastRemotingSender::FindAndBind(
|
| + int32_t rtp_stream_id,
|
| + mojo::ScopedDataPipeConsumerHandle pipe,
|
| + media::mojom::RemotingDataStreamSenderRequest request,
|
| + const base::Closure& error_callback) {
|
| + // CastRemotingSender lives entirely on the IO thread, so trampoline if
|
| + // necessary.
|
| + if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) {
|
| + BrowserThread::PostTask(
|
| + BrowserThread::IO, FROM_HERE,
|
| + base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id,
|
| + base::Passed(&pipe), base::Passed(&request),
|
| + // Using media::BindToCurrentLoop() so the |error_callback|
|
| + // is trampolined back to the original thread.
|
| + media::BindToCurrentLoop(error_callback)));
|
| + return;
|
| + }
|
| +
|
| + DCHECK(!error_callback.is_null());
|
| +
|
| + // Look-up the CastRemotingSender instance by its |rtp_stream_id|.
|
| + const auto it = g_sender_map.Pointer()->find(rtp_stream_id);
|
| + if (it == g_sender_map.Pointer()->end()) {
|
| + DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: "
|
| + << rtp_stream_id;
|
| + error_callback.Run();
|
| + return;
|
| + }
|
| + CastRemotingSender* const sender = it->second;
|
|
|
| - g_sender_map.Pointer()->erase(remoting_stream_id_);
|
| + // Confirm that the CastRemotingSender isn't already bound to a message pipe.
|
| + if (sender->binding_.is_bound()) {
|
| + DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id="
|
| + << rtp_stream_id << ")!";
|
| + error_callback.Run();
|
| + return;
|
| + }
|
| +
|
| + DCHECK(sender->error_callback_.is_null());
|
| + sender->error_callback_ = error_callback;
|
| +
|
| + sender->pipe_ = std::move(pipe);
|
| + sender->pipe_watcher_.Start(
|
| + sender->pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&CastRemotingSender::ProcessInputQueue,
|
| + base::Unretained(sender)));
|
| + sender->binding_.Bind(std::move(request));
|
| + sender->binding_.set_connection_error_handler(sender->error_callback_);
|
| }
|
|
|
| void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) {
|
| DCHECK_GT(round_trip_time, base::TimeDelta());
|
| current_round_trip_time_ = round_trip_time;
|
| - max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime;
|
| - max_ack_delay_ =
|
| - std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay);
|
| + max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) +
|
| + kReceiverProcessTime;
|
| + max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay);
|
| }
|
|
|
| void CastRemotingSender::ResendCheck() {
|
| @@ -243,16 +302,110 @@ void CastRemotingSender::OnReceivedCastMessage(
|
| current_round_trip_time_.InMicroseconds());
|
| } while (latest_acked_frame_id_ < cast_feedback.ack_frame_id);
|
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel);
|
| +
|
| + // One or more frames were canceled. This may allow pending input operations
|
| + // to complete.
|
| + ProcessInputQueue(MOJO_RESULT_OK);
|
| }
|
| }
|
|
|
| +void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size,
|
| + uint32_t total_payload_size) {
|
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| + input_queue_.push(
|
| + base::Bind(&CastRemotingSender::TryConsumeDataChunk,
|
| + base::Unretained(this), offset, size, total_payload_size));
|
| + ProcessInputQueue(MOJO_RESULT_OK);
|
| +}
|
| +
|
| void CastRemotingSender::SendFrame() {
|
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| + input_queue_.push(
|
| + base::Bind(&CastRemotingSender::TrySendFrame, base::Unretained(this)));
|
| + ProcessInputQueue(MOJO_RESULT_OK);
|
| +}
|
| +
|
| +void CastRemotingSender::ProcessInputQueue(MojoResult result) {
|
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| + while (!input_queue_.empty()) {
|
| + if (!input_queue_.front().Run(input_queue_discards_remaining_ > 0))
|
| + break; // Operation must be retried later. Stop processing queue.
|
| + input_queue_.pop();
|
| + if (input_queue_discards_remaining_ > 0)
|
| + --input_queue_discards_remaining_;
|
| + }
|
| +}
|
| +
|
| +bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size,
|
| + uint32_t total_payload_size,
|
| + bool discard_data) {
|
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| +
|
| + do {
|
| + if (!pipe_.is_valid()) {
|
| + VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid.";
|
| + break;
|
| + }
|
| +
|
| + if (offset + size > total_payload_size) {
|
| + LOG(ERROR)
|
| + << SENDER_SSRC << "BUG: offset + size > total_payload_size ("
|
| + << offset << " + " << size << " > " << total_payload_size << ')';
|
| + break;
|
| + }
|
| +
|
| + // If the data is to be discarded, do a data pipe read with the DISCARD flag
|
| + // set.
|
| + if (discard_data) {
|
| + const MojoResult result = mojo::ReadDataRaw(
|
| + pipe_.get(), nullptr, &size,
|
| + MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE);
|
| + if (result == MOJO_RESULT_OK)
|
| + return true; // Successfully discarded data.
|
| + if (result == MOJO_RESULT_OUT_OF_RANGE)
|
| + return false; // Retry later.
|
| + LOG(ERROR) << SENDER_SSRC
|
| + << "Unexpected result when discarding from data pipe ("
|
| + << result << ')';
|
| + break;
|
| + }
|
|
|
| + // If |total_payload_size| has changed, resize the data string. If it has
|
| + // not changed, the following statement will be a no-op.
|
| + next_frame_data_.resize(total_payload_size);
|
| +
|
| + const MojoResult result = mojo::ReadDataRaw(
|
| + pipe_.get(), base::string_as_array(&next_frame_data_) + offset, &size,
|
| + MOJO_READ_DATA_FLAG_ALL_OR_NONE);
|
| + if (result == MOJO_RESULT_OK)
|
| + return true; // Successfully consumed data.
|
| + if (result == MOJO_RESULT_OUT_OF_RANGE)
|
| + return false; // Retry later.
|
| + LOG(ERROR)
|
| + << SENDER_SSRC << "Read from data pipe failed (" << result << ')';
|
| + } while (false);
|
| +
|
| + // If this point is reached, there was a fatal error. Shut things down and run
|
| + // the error callback.
|
| + pipe_watcher_.Cancel();
|
| + pipe_.reset();
|
| + binding_.Close();
|
| + error_callback_.Run();
|
| + return true;
|
| +}
|
| +
|
| +bool CastRemotingSender::TrySendFrame(bool discard_data) {
|
| + DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
| +
|
| + // If the frame's data is to be discarded, just return immediately.
|
| + if (discard_data)
|
| + return true;
|
| +
|
| + // If there would be too many frames in-flight, do not proceed.
|
| if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) {
|
| - // TODO(xjz): Delay the sending of this frame and stop reading the next
|
| - // frame data.
|
| - return;
|
| + VLOG(1) << SENDER_SSRC
|
| + << "Cannot send frame now because too many frames are in flight.";
|
| + return false;
|
| }
|
|
|
| VLOG(2) << SENDER_SSRC
|
| @@ -275,10 +428,13 @@ void CastRemotingSender::SendFrame() {
|
|
|
| media::cast::EncodedFrame remoting_frame;
|
| remoting_frame.frame_id = frame_id;
|
| - remoting_frame.dependency =
|
| - (is_first_frame_to_be_sent || last_frame_was_canceled_)
|
| - ? media::cast::EncodedFrame::KEY
|
| - : media::cast::EncodedFrame::DEPENDENT;
|
| + if (flow_restart_pending_) {
|
| + remoting_frame.dependency = media::cast::EncodedFrame::KEY;
|
| + flow_restart_pending_ = false;
|
| + } else {
|
| + DCHECK(!is_first_frame_to_be_sent);
|
| + remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT;
|
| + }
|
| remoting_frame.referenced_frame_id =
|
| remoting_frame.dependency == media::cast::EncodedFrame::KEY
|
| ? frame_id
|
| @@ -317,16 +473,23 @@ void CastRemotingSender::SendFrame() {
|
| cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event));
|
|
|
| RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp);
|
| - last_frame_was_canceled_ = false;
|
|
|
| transport_->InsertFrame(ssrc_, remoting_frame);
|
| +
|
| + return true;
|
| }
|
|
|
| -void CastRemotingSender::CancelFramesInFlight() {
|
| +void CastRemotingSender::CancelInFlightData() {
|
| DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
|
|
|
| base::STLClearObject(&next_frame_data_);
|
|
|
| + // TODO(miu): The following code is something we want to do as an
|
| + // optimization. However, as-is, it's not quite correct. We can only cancel
|
| + // frames where no packets have actually hit the network yet. Said another
|
| + // way, we can only cancel frames the receiver has definitely not seen any
|
| + // part of (including kickstarting!). http://crbug.com/647423
|
| +#if 0
|
| if (latest_acked_frame_id_ < last_sent_frame_id_) {
|
| std::vector<media::cast::FrameId> frames_to_cancel;
|
| do {
|
| @@ -335,8 +498,14 @@ void CastRemotingSender::CancelFramesInFlight() {
|
| } while (latest_acked_frame_id_ < last_sent_frame_id_);
|
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel);
|
| }
|
| +#endif
|
| +
|
| + // Flag that all pending input operations should discard data.
|
| + input_queue_discards_remaining_ = input_queue_.size();
|
|
|
| - last_frame_was_canceled_ = true;
|
| + flow_restart_pending_ = true;
|
| + VLOG(1) << SENDER_SSRC
|
| + << "Now restarting because in-flight data was just canceled.";
|
| }
|
|
|
| } // namespace cast
|
|
|