Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1951)

Unified Diff: media/remoting/stream_provider.cc

Issue 2808583002: RELAND: Media Remoting end to end integration tests. (Closed)
Patch Set: Rebased. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « media/remoting/stream_provider.h ('k') | media/test/BUILD.gn » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: media/remoting/stream_provider.cc
diff --git a/media/remoting/stream_provider.cc b/media/remoting/stream_provider.cc
new file mode 100644
index 0000000000000000000000000000000000000000..3bd9959102eb159a96b7c007e4fa9eab080fc3bf
--- /dev/null
+++ b/media/remoting/stream_provider.cc
@@ -0,0 +1,485 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/stream_provider.h"
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/callback_helpers.h"
+#include "base/logging.h"
+#include "media/base/decoder_buffer.h"
+#include "media/base/video_rotation.h"
+#include "media/remoting/proto_enum_utils.h"
+#include "media/remoting/proto_utils.h"
+
+namespace media {
+namespace remoting {
+
+namespace {
+// The number of frames requested in each ReadUntil RPC message.
+constexpr int kNumFramesInEachReadUntil = 10;
+}
+
+// An implementation of media::DemuxerStream on Media Remoting receiver.
+// Receives data from mojo data pipe, and returns one frame or/and status when
+// Read() is called.
+class MediaStream final : public DemuxerStream {
+ public:
+ MediaStream(RpcBroker* rpc_broker,
+ Type type,
+ int remote_handle,
+ const base::Closure& error_callback);
+ ~MediaStream() override;
+
+ // DemuxerStream implementation.
+ void Read(const ReadCB& read_cb) override;
+ AudioDecoderConfig audio_decoder_config() override;
+ VideoDecoderConfig video_decoder_config() override;
+ DemuxerStream::Type type() const override;
+ Liveness liveness() const override;
+ bool SupportsConfigChanges() override;
+ VideoRotation video_rotation() override;
+
+ void Initialize(const base::Closure& init_done_cb);
+ void FlushUntil(int count);
+ void AppendBuffer(scoped_refptr<DecoderBuffer> buffer);
+
+ private:
+ // RPC messages handlers.
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
+ void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
+ void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
+
+ // Issues the ReadUntil RPC message when read is pending and buffer is empty.
+ void SendReadUntil();
+
+ // Run and reset the read callback.
+ void CompleteRead(DemuxerStream::Status status);
+
+ // Update the audio/video decoder config When config changes in the mid
+ // stream, the new config will be stored in
+ // |next_audio/video_decoder_config_|. Old config will be droped when all
+ // associated frames are consumed.
+ void UpdateConfig(const pb::AudioDecoderConfig* audio_message,
+ const pb::VideoDecoderConfig* video_message);
+
+ // Called when any error occurs.
+ void OnError(const std::string& error);
+
+ RpcBroker* const rpc_broker_; // Outlives this class.
+ const Type type_;
+ const int remote_handle_;
+ const int rpc_handle_;
+
+ // Set when Initialize() is called, and will be run only once after
+ // initialization is done.
+ base::Closure init_done_callback_;
+
+ // The read until count in the last ReadUntil RPC message.
+ int last_read_until_count_ = 0;
+
+ // Indicates whether Audio/VideoDecoderConfig changed and the frames with the
+ // old config are not yet consumed. The new config is stored in the end of
+ // |audio/video_decoder_config_|;
+ bool config_changed_ = false;
+
+ // Indicates whether a ReadUntil RPC message was sent without receiving the
+ // ReadUntilCallback message yet.
+ bool read_until_sent_ = false;
+
+ // Set when Read() is called. Run only once when read completes.
+ ReadCB read_complete_callback_;
+
+ base::Closure error_callback_; // Called only once when first error occurs.
+
+ std::deque<scoped_refptr<DecoderBuffer>> buffers_;
+
+ // Current audio/video config.
+ AudioDecoderConfig audio_decoder_config_;
+ VideoDecoderConfig video_decoder_config_;
+
+ // Stores the new auido/video config when config changes.
+ AudioDecoderConfig next_audio_decoder_config_;
+ VideoDecoderConfig next_video_decoder_config_;
+
+ base::WeakPtrFactory<MediaStream> weak_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(MediaStream);
+};
+
+MediaStream::MediaStream(RpcBroker* rpc_broker,
+ Type type,
+ int remote_handle,
+ const base::Closure& error_callback)
+ : rpc_broker_(rpc_broker),
+ type_(type),
+ remote_handle_(remote_handle),
+ rpc_handle_(rpc_broker_->GetUniqueHandle()),
+ error_callback_(error_callback),
+ weak_factory_(this) {
+ DCHECK(remote_handle_ != RpcBroker::kInvalidHandle);
+ rpc_broker_->RegisterMessageReceiverCallback(
+ rpc_handle_,
+ base::Bind(&MediaStream::OnReceivedRpc, weak_factory_.GetWeakPtr()));
+}
+
+MediaStream::~MediaStream() {
+ rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_);
+}
+
+void MediaStream::Initialize(const base::Closure& init_done_cb) {
+ DCHECK(!init_done_cb.is_null());
+ if (!init_done_callback_.is_null()) {
+ OnError("Duplicate initialization");
+ return;
+ }
+ init_done_callback_ = init_done_cb;
+
+ DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with "
+ << "remote_handle=" << remote_handle_
+ << " rpc_handle=" << rpc_handle_;
+ std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
+ rpc->set_integer_value(rpc_handle_);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+}
+
+void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(message->handle() == rpc_handle_);
+
+ switch (message->proc()) {
+ case pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK:
+ OnInitializeCallback(std::move(message));
+ break;
+ case pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK:
+ OnReadUntilCallback(std::move(message));
+ break;
+ default:
+ VLOG(3) << __func__ << "Unknow RPC message.";
+ }
+}
+
+void MediaStream::OnInitializeCallback(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message.";
+ const pb::DemuxerStreamInitializeCallback callback_message =
+ message->demuxerstream_initializecb_rpc();
+ if (callback_message.type() != type_) {
+ OnError("Wrong type");
+ return;
+ }
+ if ((type_ == DemuxerStream::AUDIO &&
+ audio_decoder_config_.IsValidConfig()) ||
+ (type_ == DemuxerStream::VIDEO &&
+ video_decoder_config_.IsValidConfig())) {
+ OnError("Duplicate Iniitialize");
+ return;
+ }
+ if (init_done_callback_.is_null()) {
+ OnError("Iniitialize callback missing");
+ return;
+ }
+
+ if (type_ == DemuxerStream::AUDIO &&
+ callback_message.has_audio_decoder_config()) {
+ const pb::AudioDecoderConfig audio_message =
+ callback_message.audio_decoder_config();
+ UpdateConfig(&audio_message, nullptr);
+ } else if (type_ == DemuxerStream::VIDEO &&
+ callback_message.has_video_decoder_config()) {
+ const pb::VideoDecoderConfig video_message =
+ callback_message.video_decoder_config();
+ UpdateConfig(nullptr, &video_message);
+ } else {
+ OnError("config missing");
+ return;
+ }
+ base::ResetAndReturn(&init_done_callback_).Run();
+}
+
+void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
+ DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message.";
+ if (!read_until_sent_) {
+ OnError("Unexpected ReadUntilCallback");
+ return;
+ }
+ read_until_sent_ = false;
+ const pb::DemuxerStreamReadUntilCallback callback_message =
+ message->demuxerstream_readuntilcb_rpc();
+ last_read_until_count_ = callback_message.count();
+ if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) {
+ config_changed_ = true;
+ if (callback_message.has_audio_decoder_config()) {
+ const pb::AudioDecoderConfig audio_message =
+ callback_message.audio_decoder_config();
+ UpdateConfig(&audio_message, nullptr);
+ }
+ if (callback_message.has_video_decoder_config()) {
+ const pb::VideoDecoderConfig video_message =
+ callback_message.video_decoder_config();
+ UpdateConfig(nullptr, &video_message);
+ }
+ if (buffers_.empty() && !read_complete_callback_.is_null())
+ CompleteRead(DemuxerStream::kConfigChanged);
+ return;
+ }
+ if (buffers_.empty() && !read_complete_callback_.is_null())
+ SendReadUntil();
+}
+
+void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message,
+ const pb::VideoDecoderConfig* video_message) {
+ if (type_ == AUDIO) {
+ DCHECK(audio_message && !video_message);
+ AudioDecoderConfig audio_config;
+ ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config);
+ if (!audio_config.IsValidConfig()) {
+ OnError("Invalid audio config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(audio_decoder_config_.IsValidConfig());
+ DCHECK(!next_audio_decoder_config_.IsValidConfig());
+ next_audio_decoder_config_ = audio_config;
+ } else {
+ DCHECK(!audio_decoder_config_.IsValidConfig());
+ audio_decoder_config_ = audio_config;
+ }
+ } else if (type_ == VIDEO) {
+ DCHECK(video_message && !audio_message);
+ VideoDecoderConfig video_config;
+ ConvertProtoToVideoDecoderConfig(*video_message, &video_config);
+ if (!video_config.IsValidConfig()) {
+ OnError("Invalid video config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(video_decoder_config_.IsValidConfig());
+ DCHECK(!next_video_decoder_config_.IsValidConfig());
+ next_video_decoder_config_ = video_config;
+ } else {
+ DCHECK(!video_decoder_config_.IsValidConfig());
+ video_decoder_config_ = video_config;
+ }
+ } else {
+ NOTREACHED() << ": Only supports video or audio stream.";
+ }
+}
+
+void MediaStream::SendReadUntil() {
+ if (read_until_sent_)
+ return;
+ DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_="
+ << remote_handle_ << " with callback handle=" << rpc_handle_
+ << " count=" << last_read_until_count_;
+
+ std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL);
+ auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
+ last_read_until_count_ += kNumFramesInEachReadUntil;
+ message->set_count(last_read_until_count_);
+ message->set_callback_handle(rpc_handle_);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ read_until_sent_ = true;
+}
+
+void MediaStream::FlushUntil(int count) {
+ while (!buffers_.empty()) {
+ buffers_.pop_front();
+ }
+
+ last_read_until_count_ = count;
+ if (!read_complete_callback_.is_null())
+ CompleteRead(DemuxerStream::kAborted);
+ read_until_sent_ = false;
+}
+
+void MediaStream::Read(const ReadCB& read_cb) {
+ DCHECK(read_complete_callback_.is_null());
+ DCHECK(!read_cb.is_null());
+ read_complete_callback_ = read_cb;
+ if (buffers_.empty() && config_changed_) {
+ CompleteRead(DemuxerStream::kConfigChanged);
+ return;
+ }
+
+ // Wait for more data.
+ if (buffers_.empty()) {
+ SendReadUntil();
+ return;
+ }
+
+ CompleteRead(DemuxerStream::kOk);
+}
+
+void MediaStream::CompleteRead(DemuxerStream::Status status) {
+ DVLOG(3) << __func__ << ": " << status;
+ switch (status) {
+ case DemuxerStream::kConfigChanged:
+ if (type_ == AUDIO) {
+ DCHECK(next_audio_decoder_config_.IsValidConfig());
+ audio_decoder_config_ = next_audio_decoder_config_;
+#if DCHECK_IS_ON()
+ next_audio_decoder_config_ = AudioDecoderConfig();
+#endif // DCHECK_IS_ON()
+ } else {
+ DCHECK(next_video_decoder_config_.IsValidConfig());
+ video_decoder_config_ = next_video_decoder_config_;
+#if DCHECK_IS_ON()
+ next_video_decoder_config_ = VideoDecoderConfig();
+#endif // DCHECK_IS_ON()
+ }
+ config_changed_ = false;
+ base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr);
+ return;
+ case DemuxerStream::kAborted:
+ base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr);
+ return;
+ case DemuxerStream::kOk:
+ DCHECK(!buffers_.empty());
+ scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
+ buffers_.pop_front();
+ base::ResetAndReturn(&read_complete_callback_).Run(status, frame_data);
+ return;
+ }
+}
+
+AudioDecoderConfig MediaStream::audio_decoder_config() {
+ DVLOG(3) << __func__;
+ DCHECK(type_ == DemuxerStream::AUDIO);
+ return audio_decoder_config_;
+}
+
+VideoDecoderConfig MediaStream::video_decoder_config() {
+ DVLOG(3) << __func__;
+ DCHECK(type_ == DemuxerStream::VIDEO);
+ return video_decoder_config_;
+}
+
+DemuxerStream::Type MediaStream::type() const {
+ return type_;
+}
+
+DemuxerStream::Liveness MediaStream::liveness() const {
+ return DemuxerStream::LIVENESS_LIVE;
+}
+
+bool MediaStream::SupportsConfigChanges() {
+ return true;
+}
+
+VideoRotation MediaStream::video_rotation() {
+ return VideoRotation::VIDEO_ROTATION_0;
+}
+
+void MediaStream::AppendBuffer(scoped_refptr<DecoderBuffer> buffer) {
+ DVLOG(3) << __func__;
+ buffers_.push_back(buffer);
+ if (!read_complete_callback_.is_null())
+ CompleteRead(DemuxerStream::kOk);
+}
+
+void MediaStream::OnError(const std::string& error) {
+ VLOG(1) << __func__ << ": " << error;
+ if (error_callback_.is_null())
+ return;
+ base::ResetAndReturn(&error_callback_).Run();
+}
+
+StreamProvider::StreamProvider(RpcBroker* rpc_broker,
+ const base::Closure& error_callback)
+ : rpc_broker_(rpc_broker),
+ error_callback_(error_callback),
+ weak_factory_(this) {}
+
+StreamProvider::~StreamProvider() {}
+
+void StreamProvider::Initialize(int remote_audio_handle,
+ int remote_video_handle,
+ const base::Closure& callback) {
+ DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle
+ << " remote_video_handle=" << remote_video_handle;
+ if (!init_done_callback_.is_null()) {
+ OnError("Duplicate initialization.");
+ return;
+ }
+ if (remote_audio_handle == RpcBroker::kInvalidHandle &&
+ remote_video_handle == RpcBroker::kInvalidHandle) {
+ OnError("Invalid handle.");
+ return;
+ }
+
+ init_done_callback_ = callback;
+ if (remote_audio_handle != RpcBroker::kInvalidHandle) {
+ audio_stream_.reset(new MediaStream(
+ rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle,
+ base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
+ "Media stream error")));
+ audio_stream_->Initialize(base::Bind(
+ &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr()));
+ }
+ if (remote_video_handle != RpcBroker::kInvalidHandle) {
+ video_stream_.reset(new MediaStream(
+ rpc_broker_, DemuxerStream::VIDEO, remote_video_handle,
+ base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
+ "Media stream error")));
+ video_stream_->Initialize(base::Bind(
+ &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr()));
+ }
+}
+
+void StreamProvider::OnError(const std::string& error) {
+ VLOG(1) << __func__ << ": " << error;
+ if (error_callback_.is_null())
+ return;
+ base::ResetAndReturn(&error_callback_).Run();
+}
+
+void StreamProvider::AudioStreamInitialized() {
+ DCHECK(!init_done_callback_.is_null());
+ audio_stream_initialized_ = true;
+ if (video_stream_initialized_ || !video_stream_)
+ base::ResetAndReturn(&init_done_callback_).Run();
+}
+
+void StreamProvider::VideoStreamInitialized() {
+ DCHECK(!init_done_callback_.is_null());
+ video_stream_initialized_ = true;
+ if (audio_stream_initialized_ || !audio_stream_)
+ base::ResetAndReturn(&init_done_callback_).Run();
+}
+
+std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
+ std::vector<DemuxerStream*> streams;
+ if (audio_stream_)
+ streams.push_back(audio_stream_.get());
+ if (video_stream_)
+ streams.push_back(video_stream_.get());
+ return streams;
+}
+
+void StreamProvider::AppendBuffer(DemuxerStream::Type type,
+ scoped_refptr<DecoderBuffer> buffer) {
+ if (type == DemuxerStream::AUDIO)
+ audio_stream_->AppendBuffer(buffer);
+ else if (type == DemuxerStream::VIDEO)
+ video_stream_->AppendBuffer(buffer);
+ else
+ NOTREACHED() << ": Only supports video or audio stream.";
+}
+
+void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) {
+ DVLOG(3) << __func__ << ": type=" << type << " count=" << count;
+ if (type == DemuxerStream::AUDIO)
+ audio_stream_->FlushUntil(count);
+ else if (type == DemuxerStream::VIDEO)
+ video_stream_->FlushUntil(count);
+ else
+ NOTREACHED() << ": Only supports video or audio stream.";
+}
+
+} // namespace remoting
+} // namespace media
« no previous file with comments | « media/remoting/stream_provider.h ('k') | media/test/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698