Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1705)

Unified Diff: chrome/browser/media/cast_remoting_sender.cc

Issue 2310753002: Media Remoting: Data/Control plumbing between renderer and Media Router. (Closed)
Patch Set: Just a REBASE on ToT before commit. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/browser/media/cast_remoting_sender.cc
diff --git a/chrome/browser/media/cast_remoting_sender.cc b/chrome/browser/media/cast_remoting_sender.cc
index f24aed064a12e9c121dc69c55091b7062ab511c9..4d6ae380b4754bf53f7301f46ad845ac14efa5b9 100644
--- a/chrome/browser/media/cast_remoting_sender.cc
+++ b/chrome/browser/media/cast_remoting_sender.cc
@@ -4,6 +4,7 @@
#include "chrome/browser/media/cast_remoting_sender.h"
+#include <algorithm>
#include <map>
#include "base/bind.h"
@@ -12,15 +13,18 @@
#include "base/lazy_instance.h"
#include "base/memory/ptr_util.h"
#include "base/numerics/safe_conversions.h"
+#include "base/stl_util.h"
#include "base/trace_event/trace_event.h"
#include "content/public/browser/browser_thread.h"
#include "media/base/bind_to_current_loop.h"
#include "media/cast/constants.h"
+using content::BrowserThread;
+
namespace {
// Global map for looking-up CastRemotingSender instances by their
-// |remoting_stream_id_|.
+// |rtp_stream_id|.
using CastRemotingSenderMap = std::map<int32_t, cast::CastRemotingSender*>;
base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map =
LAZY_INSTANCE_INITIALIZER;
@@ -28,7 +32,6 @@ base::LazyInstance<CastRemotingSenderMap>::Leaky g_sender_map =
constexpr base::TimeDelta kMinSchedulingDelay =
base::TimeDelta::FromMilliseconds(1);
constexpr base::TimeDelta kMaxAckDelay = base::TimeDelta::FromMilliseconds(800);
-constexpr base::TimeDelta kMinAckDelay = base::TimeDelta::FromMilliseconds(400);
constexpr base::TimeDelta kReceiverProcessTime =
base::TimeDelta::FromMilliseconds(250);
@@ -72,39 +75,95 @@ CastRemotingSender::CastRemotingSender(
scoped_refptr<media::cast::CastEnvironment> cast_environment,
media::cast::CastTransport* transport,
const media::cast::CastTransportRtpConfig& config)
- : remoting_stream_id_(config.rtp_stream_id),
+ : rtp_stream_id_(config.rtp_stream_id),
cast_environment_(std::move(cast_environment)),
transport_(transport),
ssrc_(config.ssrc),
is_audio_(config.rtp_payload_type ==
media::cast::RtpPayloadType::REMOTE_AUDIO),
+ binding_(this),
max_ack_delay_(kMaxAckDelay),
last_sent_frame_id_(media::cast::FrameId::first() - 1),
latest_acked_frame_id_(media::cast::FrameId::first() - 1),
duplicate_ack_counter_(0),
- last_frame_was_canceled_(false),
+ input_queue_discards_remaining_(0),
+ flow_restart_pending_(true),
weak_factory_(this) {
+ // Confirm this constructor is running on the IO BrowserThread and the
+ // CastEnvironment::MAIN thread is the same thread.
+ DCHECK_CURRENTLY_ON(BrowserThread::IO);
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
- CastRemotingSender*& pointer_in_map = g_sender_map.Get()[remoting_stream_id_];
+ CastRemotingSender*& pointer_in_map = g_sender_map.Get()[rtp_stream_id_];
DCHECK(!pointer_in_map);
pointer_in_map = this;
+
transport_->InitializeStream(
config, base::MakeUnique<RemotingRtcpClient>(weak_factory_.GetWeakPtr()));
}
CastRemotingSender::~CastRemotingSender() {
- DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+ DCHECK_CURRENTLY_ON(BrowserThread::IO);
+ g_sender_map.Pointer()->erase(rtp_stream_id_);
+}
+
+// static
+void CastRemotingSender::FindAndBind(
+ int32_t rtp_stream_id,
+ mojo::ScopedDataPipeConsumerHandle pipe,
+ media::mojom::RemotingDataStreamSenderRequest request,
+ const base::Closure& error_callback) {
+ // CastRemotingSender lives entirely on the IO thread, so trampoline if
+ // necessary.
+ if (!BrowserThread::CurrentlyOn(BrowserThread::IO)) {
+ BrowserThread::PostTask(
+ BrowserThread::IO, FROM_HERE,
+ base::Bind(&CastRemotingSender::FindAndBind, rtp_stream_id,
+ base::Passed(&pipe), base::Passed(&request),
+ // Using media::BindToCurrentLoop() so the |error_callback|
+ // is trampolined back to the original thread.
+ media::BindToCurrentLoop(error_callback)));
+ return;
+ }
+
+ DCHECK(!error_callback.is_null());
+
+ // Look-up the CastRemotingSender instance by its |rtp_stream_id|.
+ const auto it = g_sender_map.Pointer()->find(rtp_stream_id);
+ if (it == g_sender_map.Pointer()->end()) {
+ DLOG(ERROR) << "Cannot find CastRemotingSender instance by ID: "
+ << rtp_stream_id;
+ error_callback.Run();
+ return;
+ }
+ CastRemotingSender* const sender = it->second;
- g_sender_map.Pointer()->erase(remoting_stream_id_);
+ // Confirm that the CastRemotingSender isn't already bound to a message pipe.
+ if (sender->binding_.is_bound()) {
+ DLOG(ERROR) << "Attempt to bind to CastRemotingSender a second time (id="
+ << rtp_stream_id << ")!";
+ error_callback.Run();
+ return;
+ }
+
+ DCHECK(sender->error_callback_.is_null());
+ sender->error_callback_ = error_callback;
+
+ sender->pipe_ = std::move(pipe);
+ sender->pipe_watcher_.Start(
+ sender->pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&CastRemotingSender::ProcessInputQueue,
+ base::Unretained(sender)));
+ sender->binding_.Bind(std::move(request));
+ sender->binding_.set_connection_error_handler(sender->error_callback_);
}
void CastRemotingSender::OnReceivedRtt(base::TimeDelta round_trip_time) {
DCHECK_GT(round_trip_time, base::TimeDelta());
current_round_trip_time_ = round_trip_time;
- max_ack_delay_ = 2 * current_round_trip_time_ + kReceiverProcessTime;
- max_ack_delay_ =
- std::max(std::min(max_ack_delay_, kMaxAckDelay), kMinAckDelay);
+ max_ack_delay_ = 2 * std::max(current_round_trip_time_, base::TimeDelta()) +
+ kReceiverProcessTime;
+ max_ack_delay_ = std::min(max_ack_delay_, kMaxAckDelay);
}
void CastRemotingSender::ResendCheck() {
@@ -243,16 +302,110 @@ void CastRemotingSender::OnReceivedCastMessage(
current_round_trip_time_.InMicroseconds());
} while (latest_acked_frame_id_ < cast_feedback.ack_frame_id);
transport_->CancelSendingFrames(ssrc_, frames_to_cancel);
+
+ // One or more frames were canceled. This may allow pending input operations
+ // to complete.
+ ProcessInputQueue(MOJO_RESULT_OK);
}
}
+void CastRemotingSender::ConsumeDataChunk(uint32_t offset, uint32_t size,
+ uint32_t total_payload_size) {
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+ input_queue_.push(
+ base::Bind(&CastRemotingSender::TryConsumeDataChunk,
+ base::Unretained(this), offset, size, total_payload_size));
+ ProcessInputQueue(MOJO_RESULT_OK);
+}
+
void CastRemotingSender::SendFrame() {
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+ input_queue_.push(
+ base::Bind(&CastRemotingSender::TrySendFrame, base::Unretained(this)));
+ ProcessInputQueue(MOJO_RESULT_OK);
+}
+
+void CastRemotingSender::ProcessInputQueue(MojoResult result) {
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+ while (!input_queue_.empty()) {
+ if (!input_queue_.front().Run(input_queue_discards_remaining_ > 0))
+ break; // Operation must be retried later. Stop processing queue.
+ input_queue_.pop();
+ if (input_queue_discards_remaining_ > 0)
+ --input_queue_discards_remaining_;
+ }
+}
+
+bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size,
+ uint32_t total_payload_size,
+ bool discard_data) {
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+
+ do {
+ if (!pipe_.is_valid()) {
+ VLOG(1) << SENDER_SSRC << "Data pipe handle no longer valid.";
+ break;
+ }
+
+ if (offset + size > total_payload_size) {
+ LOG(ERROR)
+ << SENDER_SSRC << "BUG: offset + size > total_payload_size ("
+ << offset << " + " << size << " > " << total_payload_size << ')';
+ break;
+ }
+
+ // If the data is to be discarded, do a data pipe read with the DISCARD flag
+ // set.
+ if (discard_data) {
+ const MojoResult result = mojo::ReadDataRaw(
+ pipe_.get(), nullptr, &size,
+ MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE);
+ if (result == MOJO_RESULT_OK)
+ return true; // Successfully discarded data.
+ if (result == MOJO_RESULT_OUT_OF_RANGE)
+ return false; // Retry later.
+ LOG(ERROR) << SENDER_SSRC
+ << "Unexpected result when discarding from data pipe ("
+ << result << ')';
+ break;
+ }
+ // If |total_payload_size| has changed, resize the data string. If it has
+ // not changed, the following statement will be a no-op.
+ next_frame_data_.resize(total_payload_size);
+
+ const MojoResult result = mojo::ReadDataRaw(
+ pipe_.get(), base::string_as_array(&next_frame_data_) + offset, &size,
+ MOJO_READ_DATA_FLAG_ALL_OR_NONE);
+ if (result == MOJO_RESULT_OK)
+ return true; // Successfully consumed data.
+ if (result == MOJO_RESULT_OUT_OF_RANGE)
+ return false; // Retry later.
+ LOG(ERROR)
+ << SENDER_SSRC << "Read from data pipe failed (" << result << ')';
+ } while (false);
+
+ // If this point is reached, there was a fatal error. Shut things down and run
+ // the error callback.
+ pipe_watcher_.Cancel();
+ pipe_.reset();
+ binding_.Close();
+ error_callback_.Run();
+ return true;
+}
+
+bool CastRemotingSender::TrySendFrame(bool discard_data) {
+ DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
+
+ // If the frame's data is to be discarded, just return immediately.
+ if (discard_data)
+ return true;
+
+ // If there would be too many frames in-flight, do not proceed.
if (NumberOfFramesInFlight() >= media::cast::kMaxUnackedFrames) {
- // TODO(xjz): Delay the sending of this frame and stop reading the next
- // frame data.
- return;
+ VLOG(1) << SENDER_SSRC
+ << "Cannot send frame now because too many frames are in flight.";
+ return false;
}
VLOG(2) << SENDER_SSRC
@@ -275,10 +428,13 @@ void CastRemotingSender::SendFrame() {
media::cast::EncodedFrame remoting_frame;
remoting_frame.frame_id = frame_id;
- remoting_frame.dependency =
- (is_first_frame_to_be_sent || last_frame_was_canceled_)
- ? media::cast::EncodedFrame::KEY
- : media::cast::EncodedFrame::DEPENDENT;
+ if (flow_restart_pending_) {
+ remoting_frame.dependency = media::cast::EncodedFrame::KEY;
+ flow_restart_pending_ = false;
+ } else {
+ DCHECK(!is_first_frame_to_be_sent);
+ remoting_frame.dependency = media::cast::EncodedFrame::DEPENDENT;
+ }
remoting_frame.referenced_frame_id =
remoting_frame.dependency == media::cast::EncodedFrame::KEY
? frame_id
@@ -317,16 +473,23 @@ void CastRemotingSender::SendFrame() {
cast_environment_->logger()->DispatchFrameEvent(std::move(remoting_event));
RecordLatestFrameTimestamps(frame_id, remoting_frame.rtp_timestamp);
- last_frame_was_canceled_ = false;
transport_->InsertFrame(ssrc_, remoting_frame);
+
+ return true;
}
-void CastRemotingSender::CancelFramesInFlight() {
+void CastRemotingSender::CancelInFlightData() {
DCHECK(cast_environment_->CurrentlyOn(media::cast::CastEnvironment::MAIN));
base::STLClearObject(&next_frame_data_);
+ // TODO(miu): The following code is something we want to do as an
+ // optimization. However, as-is, it's not quite correct. We can only cancel
+ // frames where no packets have actually hit the network yet. Said another
+ // way, we can only cancel frames the receiver has definitely not seen any
+ // part of (including kickstarting!). http://crbug.com/647423
+#if 0
if (latest_acked_frame_id_ < last_sent_frame_id_) {
std::vector<media::cast::FrameId> frames_to_cancel;
do {
@@ -335,8 +498,14 @@ void CastRemotingSender::CancelFramesInFlight() {
} while (latest_acked_frame_id_ < last_sent_frame_id_);
transport_->CancelSendingFrames(ssrc_, frames_to_cancel);
}
+#endif
+
+ // Flag that all pending input operations should discard data.
+ input_queue_discards_remaining_ = input_queue_.size();
- last_frame_was_canceled_ = true;
+ flow_restart_pending_ = true;
+ VLOG(1) << SENDER_SSRC
+ << "Now restarting because in-flight data was just canceled.";
}
} // namespace cast

Powered by Google App Engine
This is Rietveld 408576698