Index: chrome/browser/media/cast_remoting_sender.cc |
diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc |
index f24aed064a12e9c121dc69c55091b7062ab511c9..2e759888a875a5e61ec26a01a36c654c3c10daf9 100644 |
--- a/chrome/browser/media/cast_remoting_sender.cc |
+++ b/chrome/browser/media/cast_remoting_sender.cc |
@@ -4,6 +4,7 @@ |
#include "chrome/browser/media/cast_remoting_sender.h" |
+#include <algorithm> |
#include <map> |
#include "base/bind.h" |
@@ -12,15 +13,18 @@ |
#include "base/lazy_instance.h" |
#include "base/memory/ptr_util.h" |
#include "base/numerics/safe_conversions.h" |
+#include "base/stl_util.h" |
#include "base/trace_event/trace_event.h" |
#include "content/public/browser/browser_thread.h" |
#include "media/base/bind_to_current_loop.h" |
#include "media/cast/constants.h" |
+using content::BrowserThread; |
+ |
namespace { |
// Global map for looking-up CastRemotingSender instances by their |
-// |remoting_stream_id_|. |
+// |rtp_stream_id|. |
using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>; |
base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
LAZY_INSTANCE_INITIALIZER; |
@@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map = |
constexpr base::TimeDelta kMinSchedulingDelay = |
base::TimeDelta::FromMilliseconds(1); |
constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800); |
-constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400); |
constexpr base::TimeDelta kReceiverProcessTime = |
base::TimeDelta::FromMilliseconds(250); |
@@ -72,39 +75,92 @@ CastRemotingSender::CastRemotingSender( |
scoped_refptr<media::cast::CastEnvironment> cast_environment, |
media::cast::CastTransport* transport, |
const media::cast::CastTransportRtpConfig& config) |
- : remoting_stream_id_(config.rtp_stream_id), |
+ : rtp_stream_id_(config.rtp_stream_id), |
cast_environment_(std::move(cast_environment)), |
transport_(transport), |
ssrc_(config.ssrc), |
is_audio_(config.rtp_payload_type == |
media::cast::RtpPayloadType::REMOTE_AUDIO), |
+ binding_(this), |
max_ack_delay_(kMaxAckDelay), |
last_sent_frame_id_(media::cast::FrameId::first() - 1), |
latest_acked_frame_id_(media::cast::FrameId::first() - 1), |
duplicate_ack_counter_(0), |
- last_frame_was_canceled_(false), |
+ flow_control_state_(STARTING), |
weak_factory_(this) { |
+ // Confirm this constructor is running on the IO BrowserThread and the |
+ // CastEnvironment::MAIN thread is the same thread. |
+ DCHECK_CURRENTLY_ON(BrowserThread::IO); |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
- CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_]; |
+ CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_]; |
DCHECK(!pointer_in_map); |
pointer_in_map = this; |
+ |
transport_->InitializeStream( |
config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr())); |
} |
CastRemotingSender::~CastRemotingSender() { |
- DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ DCHECK_CURRENTLY_ON(BrowserThread::IO); |
+ g_sender_map.Pointer()->erase(rtp_stream_id_); |
+} |
+ |
+// static |
+void CastRemotingSender::FindAndBind( |
+ int32_t rtp_stream_id, |
+ mojo::ScopedDataPipeConsumerHandle pipe, |
+ media::mojom::RemotingDataStreamSenderRequest request, |
+ const base::Closure& error_callback) { |
+ // CastRemotingSender lives entirely on the IO thread, so trampoline if |
+ // necessary. |
+ if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) { |
+ BrowserThread::PostTask( |
+ BrowserThread::IO, FROM_HERE, |
+ base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id, |
+ base::Passed(&pipe), base::Passed(&request), |
+ // Using media::BindToCurrentLoop() so the |error_callback| |
+ // is trampolined back to the original thread. |
+ media::BindToCurrentLoop(error_callback))); |
+ return; |
+ } |
+ |
+ DCHECK(!error_callback.is_null()); |
+ |
+ // Look-up the CastRemotingSender instance by its |rtp_stream_id|. |
+ const auto it = g_sender_map.Pointer()->find(rtp_stream_id); |
+ if (it == g_sender_map.Pointer()->end()) { |
+ DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: " |
+ << rtp_stream_id; |
+ error_callback.Run(); |
+ return; |
+ } |
+ CastRemotingSender* const sender = it->second; |
+ |
+ // Confirm that the CastRemotingSender isn't already bound to a message pipe. |
+ if (sender->binding_.is_bound()) { |
+ DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id=" |
+ << rtp_stream_id << ")!"; |
+ error_callback.Run(); |
+ return; |
+ } |
- g_sender_map.Pointer()->erase(remoting_stream_id_); |
+ DCHECK(sender->error_callback_.is_null()); |
+ sender->error_callback_ = error_callback; |
+ |
+ sender->pipe_ = std::move(pipe); |
+ sender->binding_.Bind(std::move(request)); |
+ sender->binding_.set_connection_error_handler( |
+ base::Bind(&base::Closure::Run, |
+ base::Unretained(&sender->error_callback_))); |
dcheng
2016/09/20 08:20:12
Does set_connection_error(sender->error_callback_)
miu
2016/09/21 03:15:50
Whoops! Left-overs, ya know? Done.
|
} |
void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) { |
DCHECK_GT(round_trip_time, base::TimeDelta()); |
current_round_trip_time_ = round_trip_time; |
- max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime; |
- max_ack_delay_ = |
- std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay); |
+ max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) + |
+ kReceiverProcessTime; |
+ max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay); |
} |
void CastRemotingSender::ResendCheck() { |
@@ -243,15 +299,78 @@ void CastRemotingSender::OnReceivedCastMessage( |
current_round_trip_time_.InMicroseconds()); |
} while (latest_acked_frame_id_ < cast_feedback.ack_frame_id); |
transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
+ |
+ if (flow_control_state_ == CONSUME_PAUSED) { |
+ // One or more frames were canceled, so resume reading from the Mojo data |
+ // pipes and sending frames. |
+ binding_.ResumeIncomingMethodCallProcessing(); |
+ flow_control_state_ = CONSUMING; |
+ VLOG(1) << SENDER_SSRC |
+ << "Now CONSUMING because one or more frames were ACK'ed."; |
+ |
+ // The last call to SendFrame() placed the sender into the CONSUME_PAUSED |
+ // state without sending the frame. So, send that frame now. |
+ SendFrame(); |
+ } |
} |
} |
+void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size, |
+ uint32_t total_payload_size) { |
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ |
+ // Method calls from Mojo should not be happening when in the paused state. |
+ DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
dcheng
2016/09/20 08:20:12
why is that? What stops these messages from being
miu
2016/09/21 03:15:50
Short answer: We call binding_.PauseIncomingMethod
dcheng
2016/09/22 02:29:15
OK,thanks, I just wanted to make sure we weren't r
|
+ |
+ // If |total_payload_size| has changed, resize the data string. If it has not |
+ // changed, the following statement will be a no-op. |
+ next_frame_data_.resize(total_payload_size); |
+ |
+ do { |
+ if (!pipe_.is_valid()) { |
+ VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid."; |
+ break; |
+ } |
+ |
+ if (offset + size > total_payload_size) { |
+ LOG(DFATAL) |
dcheng
2016/09/20 08:20:12
We probably shouldn't dcheck here, this is coming
miu
2016/09/21 03:15:50
Relaxed to ERROR level.
|
+ << "BUG: offset + size > total_payload_size (" |
+ << offset << " + " << size << " > " << total_payload_size << ')'; |
+ break; |
+ } |
+ |
+ uint32_t num_bytes = size; |
+ const MojoResult result = mojo::ReadDataRaw( |
+ pipe_.get(), base::string_as_array(&next_frame_data_) + offset, |
+ &num_bytes, MOJO_READ_DATA_FLAG_ALL_OR_NONE); |
+ if (result != MOJO_RESULT_OK) { |
dcheng
2016/09/20 08:20:12
Is it possible for MOJO_RESULT_OUT_OF_RANGE to be
miu
2016/09/21 03:15:50
This should never happen. The render process is al
dcheng
2016/09/22 02:29:15
Right, it's not clear to me if there's synchroniza
miu
2016/09/22 08:39:14
Good catch on this, and rockot@ confirmed.
I re-w
|
+ LOG(DFATAL) << "BUG: Read from data pipe failed (" << result << ')'; |
+ break; |
+ } |
+ |
+ return; // Success. |
+ } while (false); |
+ |
+ // If this point is reached, there was a fatal error. Shut things down and run |
+ // the error callback. |
+ pipe_.reset(); |
+ binding_.Close(); |
+ error_callback_.Run(); |
+} |
+ |
void CastRemotingSender::SendFrame() { |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ // Method calls from Mojo should not be happening when in the paused state. |
+ DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
+ |
+ // If there would be too many frames in-flight, suspend consuming more input |
+ // from the Mojo data pipes until one or more frames are ACKed. |
if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) { |
- // TODO(xjz): Delay the sending of this frame and stop reading the next |
- // frame data. |
+ binding_.PauseIncomingMethodCallProcessing(); |
+ flow_control_state_ = CONSUME_PAUSED; |
+ VLOG(1) << SENDER_SSRC |
+ << "Now CONSUME_PAUSED because too many frames are in flight."; |
return; |
} |
@@ -275,10 +394,13 @@ void CastRemotingSender::SendFrame() { |
media::cast::EncodedFrame remoting_frame; |
remoting_frame.frame_id = frame_id; |
- remoting_frame.dependency = |
- (is_first_frame_to_be_sent || last_frame_was_canceled_) |
- ? media::cast::EncodedFrame::KEY |
- : media::cast::EncodedFrame::DEPENDENT; |
+ if (flow_control_state_ == STARTING) { |
+ remoting_frame.dependency = media::cast::EncodedFrame::KEY; |
+ flow_control_state_ = CONSUMING; |
+ } else { |
+ DCHECK(!is_first_frame_to_be_sent); |
+ remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT; |
+ } |
remoting_frame.referenced_frame_id = |
remoting_frame.dependency == media::cast::EncodedFrame::KEY |
? frame_id |
@@ -317,16 +439,24 @@ void CastRemotingSender::SendFrame() { |
cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event)); |
RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp); |
- last_frame_was_canceled_ = false; |
transport_->InsertFrame(ssrc_, remoting_frame); |
} |
-void CastRemotingSender::CancelFramesInFlight() { |
+void CastRemotingSender::CancelInFlightData() { |
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN)); |
+ // Method calls from Mojo should not be happening when in the paused state. |
+ DCHECK_NE(flow_control_state_, CONSUME_PAUSED); |
+ |
base::STLClearObject(&next_frame_data_); |
+ // TODO(miu): The following code is something we want to do as an |
+ // optimization. However, as-is, it's not quite correct. We can only cancel |
+ // frames where no packets have actually hit the network yet. Said another |
+ // way, we can only cancel frames the receiver has definitely not seen any |
+ // part of (including kickstarting!). http://crbug.com/647423 |
+#if 0 |
if (latest_acked_frame_id_ < last_sent_frame_id_) { |
std::vector<media::cast::FrameId> frames_to_cancel; |
do { |
@@ -335,8 +465,11 @@ void CastRemotingSender::CancelFramesInFlight() { |
} while (latest_acked_frame_id_ < last_sent_frame_id_); |
transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
} |
+#endif |
- last_frame_was_canceled_ = true; |
+ flow_control_state_ = STARTING; |
+ VLOG(1) << SENDER_SSRC |
+ << "Now re-STARTING because in-flight data was just canceled."; |
} |
} // namespace cast |