Index: chrome/browser/media/cast_remoting_sender.cc |
diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc |
index f24aed064a12e9c121dc69c55091b7062ab511c9..4d6ae380b4754bf53f7301f46ad845ac14efa5b9 100644 |
--- a/chrome/browser/media/cast_remoting_sender.cc |
+++ b/chrome/browser/media/cast_remoting_sender.cc |
@@ -4,6 +4,7 @@ |
#include "chrome/browser/media/cast_remoting_sender.h" |
+#include <algorithm> |
#include <map> |
#include "base/bind.h" |
@@ -12,15 +13,18 @@ |
#include "base/lazy_instance.h" |
#include "base/memory/ptr_util.h" |
#include "base/numerics/safe_conversions.h" |
+#include "base/stl_util.h" |
#include "base/trace_event/trace_event.h" |
#include "content/public/browser/browser_thread.h" |
#include "media/base/bind_to_current_loop.h" |
#include "media/cast/constants.h" |
+using content::BrowserThread; |
+ |
namespace { |
// Global map for looking-up CastRemotingSender instances by their |
-// |remoting_stream_id_|. |
+// |rtp_stream_id|. |
using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>; |
base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
LAZY_INSTANCE_INITIALIZER; |
@@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
constexpr base::TimeDelta kMinSchedulingDelay = |
base::TimeDelta::FromMilliseconds(1); |
constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800); |
-constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400); |
constexpr base::TimeDelta kReceiverProcessTime = |
base::TimeDelta::FromMilliseconds(250); |
@@ -72,39 +75,95 @@ CastRemotingSender::CastRemotingSender( |
scoped_refptr<media::cast::CastEnvironment> cast_environment, |
media::cast::CastTransport* transport, |
const media::cast::CastTransportRtpConfig& config) |
- : remoting_stream_id_(config.rtp_stream_id), |
+ : rtp_stream_id_(config.rtp_stream_id), |
cast_environment_(std::move(cast_environment)), |
transport_(transport), |
ssrc_(config.ssrc), |
is_audio_(config.rtp_payload_type == |
media::cast::RtpPayloadType::REMOTE_AUDIO), |
+ binding_(this), |
max_ack_delay_(kMaxAckDelay), |
last_sent_frame_id_(media::cast::FrameId::first() - 1), |
latest_acked_frame_id_(media::cast::FrameId::first() - 1), |
duplicate_ack_counter_(0), |
- last_frame_was_canceled_(false), |
+ input_queue_discards_remaining_(0), |
+ flow_restart_pending_(true), |
weak_factory_(this) { |
+ // Confirm this constructor is running on the IO BrowserThread and the |
+ // CastEnvironment::MAIN thread is the same thread. |
+ DCHECK_CURRENTLY_ON(BrowserThread::IO); |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
- CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_]; |
+ CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_]; |
DCHECK(!pointer_in_map); |
pointer_in_map = this; |
+ |
transport_->InitializeStream( |
config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr())); |
} |
CastRemotingSender::~CastRemotingSender() { |
- DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ DCHECK_CURRENTLY_ON(BrowserThread::IO); |
+ g_sender_map.Pointer()->erase(rtp_stream_id_); |
+} |
+ |
+// static |
+void CastRemotingSender::FindAndBind( |
+ int32_t rtp_stream_id, |
+ mojo::ScopedDataPipeConsumerHandle pipe, |
+ media::mojom::RemotingDataStreamSenderRequest request, |
+ const base::Closure& error_callback) { |
+ // CastRemotingSender lives entirely on the IO thread, so trampoline if |
+ // necessary. |
+ if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) { |
+ BrowserThread::PostTask( |
+ BrowserThread::IO, FROM_HERE, |
+ base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id, |
+ base::Passed(&pipe), base::Passed(&request), |
+ // Using media::BindToCurrentLoop() so the |error_callback| |
+ // is trampolined back to the original thread. |
+ media::BindToCurrentLoop(error_callback))); |
+ return; |
+ } |
+ |
+ DCHECK(!error_callback.is_null()); |
+ |
+ // Look-up the CastRemotingSender instance by its |rtp_stream_id|. |
+ const auto it = g_sender_map.Pointer()->find(rtp_stream_id); |
+ if (it == g_sender_map.Pointer()->end()) { |
+ DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: " |
+ << rtp_stream_id; |
+ error_callback.Run(); |
+ return; |
+ } |
+ CastRemotingSender* const sender = it->second; |
- g_sender_map.Pointer()->erase(remoting_stream_id_); |
+ // Confirm that the CastRemotingSender isn't already bound to a message pipe. |
+ if (sender->binding_.is_bound()) { |
+ DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id=" |
+ << rtp_stream_id << ")!"; |
+ error_callback.Run(); |
+ return; |
+ } |
+ |
+ DCHECK(sender->error_callback_.is_null()); |
+ sender->error_callback_ = error_callback; |
+ |
+ sender->pipe_ = std::move(pipe); |
+ sender->pipe_watcher_.Start( |
+ sender->pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&CastRemotingSender::ProcessInputQueue, |
+ base::Unretained(sender))); |
+ sender->binding_.Bind(std::move(request)); |
+ sender->binding_.set_connection_error_handler(sender->error_callback_); |
} |
void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) { |
DCHECK_GT(round_trip_time, base::TimeDelta()); |
current_round_trip_time_ = round_trip_time; |
- max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime; |
- max_ack_delay_ = |
- std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay); |
+ max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) + |
+ kReceiverProcessTime; |
+ max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay); |
} |
void CastRemotingSender::ResendCheck() { |
@@ -243,16 +302,110 @@ void CastRemotingSender::OnReceivedCastMessage( |
current_round_trip_time_.InMicroseconds()); |
} while (latest_acked_frame_id_ < cast_feedback.ack_frame_id); |
transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
+ |
+ // One or more frames were canceled. This may allow pending input operations |
+ // to complete. |
+ ProcessInputQueue(MOJO_RESULT_OK); |
} |
} |
+void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size, |
+ uint32_t total_payload_size) { |
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ input_queue_.push( |
+ base::Bind(&CastRemotingSender::TryConsumeDataChunk, |
+ base::Unretained(this), offset, size, total_payload_size)); |
+ ProcessInputQueue(MOJO_RESULT_OK); |
+} |
+ |
void CastRemotingSender::SendFrame() { |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ input_queue_.push( |
+ base::Bind(&CastRemotingSender::TrySendFrame, base::Unretained(this))); |
+ ProcessInputQueue(MOJO_RESULT_OK); |
+} |
+ |
+void CastRemotingSender::ProcessInputQueue(MojoResult result) { |
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ while (!input_queue_.empty()) { |
+ if (!input_queue_.front().Run(input_queue_discards_remaining_ > 0)) |
+ break; // Operation must be retried later. Stop processing queue. |
+ input_queue_.pop(); |
+ if (input_queue_discards_remaining_ > 0) |
+ --input_queue_discards_remaining_; |
+ } |
+} |
+ |
+bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size, |
+ uint32_t total_payload_size, |
+ bool discard_data) { |
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ |
+ do { |
+ if (!pipe_.is_valid()) { |
+ VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid."; |
+ break; |
+ } |
+ |
+ if (offset + size > total_payload_size) { |
+ LOG(ERROR) |
+ << SENDER_SSRC << "BUG: offset + size > total_payload_size (" |
+ << offset << " + " << size << " > " << total_payload_size << ')'; |
+ break; |
+ } |
+ |
+ // If the data is to be discarded, do a data pipe read with the DISCARD flag |
+ // set. |
+ if (discard_data) { |
+ const MojoResult result = mojo::ReadDataRaw( |
+ pipe_.get(), nullptr, &size, |
+ MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
+ if (result == MOJO_RESULT_OK) |
+ return true; // Successfully discarded data. |
+ if (result == MOJO_RESULT_OUT_OF_RANGE) |
+ return false; // Retry later. |
+ LOG(ERROR) << SENDER_SSRC |
+ << "Unexpected result when discarding from data pipe (" |
+ << result << ')'; |
+ break; |
+ } |
+ // If |total_payload_size| has changed, resize the data string. If it has |
+ // not changed, the following statement will be a no-op. |
+ next_frame_data_.resize(total_payload_size); |
+ |
+ const MojoResult result = mojo::ReadDataRaw( |
+ pipe_.get(), base::string_as_array(&next_frame_data_) + offset, &size, |
+ MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
+ if (result == MOJO_RESULT_OK) |
+ return true; // Successfully consumed data. |
+ if (result == MOJO_RESULT_OUT_OF_RANGE) |
+ return false; // Retry later. |
+ LOG(ERROR) |
+ << SENDER_SSRC << "Read from data pipe failed (" << result << ')'; |
+ } while (false); |
+ |
+ // If this point is reached, there was a fatal error. Shut things down and run |
+ // the error callback. |
+ pipe_watcher_.Cancel(); |
+ pipe_.reset(); |
+ binding_.Close(); |
+ error_callback_.Run(); |
+ return true; |
+} |
+ |
+bool CastRemotingSender::TrySendFrame(bool discard_data) { |
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ |
+ // If the frame's data is to be discarded, just return immediately. |
+ if (discard_data) |
+ return true; |
+ |
+ // If there would be too many frames in-flight, do not proceed. |
if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) { |
- // TODO(xjz): Delay the sending of this frame and stop reading the next |
- // frame data. |
- return; |
+ VLOG(1) << SENDER_SSRC |
+ << "Cannot send frame now because too many frames are in flight."; |
+ return false; |
} |
VLOG(2) << SENDER_SSRC |
@@ -275,10 +428,13 @@ void CastRemotingSender::SendFrame() { |
media::cast::EncodedFrame remoting_frame; |
remoting_frame.frame_id = frame_id; |
- remoting_frame.dependency = |
- (is_first_frame_to_be_sent || last_frame_was_canceled_) |
- ? media::cast::EncodedFrame::KEY |
- : media::cast::EncodedFrame::DEPENDENT; |
+ if (flow_restart_pending_) { |
+ remoting_frame.dependency = media::cast::EncodedFrame::KEY; |
+ flow_restart_pending_ = false; |
+ } else { |
+ DCHECK(!is_first_frame_to_be_sent); |
+ remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT; |
+ } |
remoting_frame.referenced_frame_id = |
remoting_frame.dependency == media::cast::EncodedFrame::KEY |
? frame_id |
@@ -317,16 +473,23 @@ void CastRemotingSender::SendFrame() { |
cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event)); |
RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp); |
- last_frame_was_canceled_ = false; |
transport_->InsertFrame(ssrc_, remoting_frame); |
+ |
+ return true; |
} |
-void CastRemotingSender::CancelFramesInFlight() { |
+void CastRemotingSender::CancelInFlightData() { |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
base::STLClearObject(&next_frame_data_); |
+ // TODO(miu): The following code is something we want to do as an |
+ // optimization. However, as-is, it's not quite correct. We can only cancel |
+ // frames where no packets have actually hit the network yet. Said another |
+ // way, we can only cancel frames the receiver has definitely not seen any |
+ // part of (including kickstarting!). http://crbug.com/647423 |
+#if 0 |
if (latest_acked_frame_id_ < last_sent_frame_id_) { |
std::vector<media::cast::FrameId> frames_to_cancel; |
do { |
@@ -335,8 +498,14 @@ void CastRemotingSender::CancelFramesInFlight() { |
} while (latest_acked_frame_id_ < last_sent_frame_id_); |
transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
} |
+#endif |
+ |
+ // Flag that all pending input operations should discard data. |
+ input_queue_discards_remaining_ = input_queue_.size(); |
- last_frame_was_canceled_ = true; |
+ flow_restart_pending_ = true; |
+ VLOG(1) << SENDER_SSRC |
+ << "Now restarting because in-flight data was just canceled."; |
} |
} // namespace cast |