Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1508)

Unified Diff: media/remoting/remote_demuxer_stream_adapter.cc

Issue 2643253003: Media Remoting Clean-up: Less-redundant naming, style consistency, etc. (Closed)
Patch Set: REBASE Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/remoting/remote_demuxer_stream_adapter.cc
diff --git a/media/remoting/remote_demuxer_stream_adapter.cc b/media/remoting/remote_demuxer_stream_adapter.cc
deleted file mode 100644
index 9929c4a8af1adcb91e01b9902191760e94b918dc..0000000000000000000000000000000000000000
--- a/media/remoting/remote_demuxer_stream_adapter.cc
+++ /dev/null
@@ -1,447 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "media/remoting/remote_demuxer_stream_adapter.h"
-
-#include "base/base64.h"
-#include "base/bind.h"
-#include "base/callback_helpers.h"
-#include "media/base/bind_to_current_loop.h"
-#include "media/base/decoder_buffer.h"
-#include "media/base/timestamp_constants.h"
-#include "media/remoting/rpc/proto_enum_utils.h"
-#include "media/remoting/rpc/proto_utils.h"
-
-// Convenience logging macro used throughout this file.
-#define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
-
-namespace media {
-namespace remoting {
-
-// static
-mojo::DataPipe* CreateDataPipe() {
- // Capacity in bytes for Mojo data pipe.
- constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024;
-
- MojoCreateDataPipeOptions options;
- options.struct_size = sizeof(MojoCreateDataPipeOptions);
- options.flags = MOJO_WRITE_DATA_FLAG_NONE;
- options.element_num_bytes = 1;
- options.capacity_num_bytes = kMojoDataPipeCapacityInBytes;
- return new mojo::DataPipe(options);
-}
-
-RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter(
- scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
- scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
- const std::string& name,
- ::media::DemuxerStream* demuxer_stream,
- const base::WeakPtr<RpcBroker>& rpc_broker,
- int rpc_handle,
- mojom::RemotingDataStreamSenderPtrInfo stream_sender_info,
- mojo::ScopedDataPipeProducerHandle producer_handle,
- const ErrorCallback& error_callback)
- : main_task_runner_(std::move(main_task_runner)),
- media_task_runner_(std::move(media_task_runner)),
- name_(name),
- rpc_broker_(rpc_broker),
- rpc_handle_(rpc_handle),
- demuxer_stream_(demuxer_stream),
- type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
- error_callback_(error_callback),
- remote_callback_handle_(kInvalidHandle),
- read_until_callback_handle_(kInvalidHandle),
- read_until_count_(0),
- last_count_(0),
- pending_flush_(false),
- current_pending_frame_offset_(0),
- pending_frame_is_eos_(false),
- media_status_(::media::DemuxerStream::kOk),
- producer_handle_(std::move(producer_handle)),
- bytes_written_to_pipe_(0),
- request_buffer_weak_factory_(this),
- weak_factory_(this) {
- DCHECK(main_task_runner_);
- DCHECK(media_task_runner_);
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DCHECK(demuxer_stream);
- DCHECK(!error_callback.is_null());
- const RpcBroker::ReceiveMessageCallback receive_callback =
- media::BindToCurrentLoop(
- base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc,
- weak_factory_.GetWeakPtr()));
- main_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback,
- rpc_broker_, rpc_handle_, receive_callback));
-
- stream_sender_.Bind(std::move(stream_sender_info));
- stream_sender_.set_connection_error_handler(
- base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError,
- weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
-}
-
-RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- main_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback,
- rpc_broker_, rpc_handle_));
-}
-
-int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- const int64_t current_count = bytes_written_to_pipe_;
- bytes_written_to_pipe_ = 0;
- return current_count;
-}
-
-base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush(
- bool flushing) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DEMUXER_VLOG(2) << "flushing=" << flushing;
-
- // Ignores if |pending_flush_| states is same.
- if (pending_flush_ == flushing)
- return base::nullopt;
-
- // Cleans up pending frame data.
- pending_frame_.clear();
- current_pending_frame_offset_ = 0;
- pending_frame_is_eos_ = false;
- // Invalidates pending Read() tasks.
- request_buffer_weak_factory_.InvalidateWeakPtrs();
-
- // Cancels in flight data in browser process.
- pending_flush_ = flushing;
- if (flushing) {
- stream_sender_->CancelInFlightData();
- } else {
- // Sets callback handle invalid to abort ongoing read request.
- read_until_callback_handle_ = kInvalidHandle;
- }
- return last_count_;
-}
-
-void RemoteDemuxerStreamAdapter::OnReceivedRpc(
- std::unique_ptr<remoting::pb::RpcMessage> message) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DCHECK(message);
- DCHECK(rpc_handle_ == message->handle());
-
- switch (message->proc()) {
- case remoting::pb::RpcMessage::RPC_DS_INITIALIZE:
- Initialize(message->integer_value());
- break;
- case remoting::pb::RpcMessage::RPC_DS_READUNTIL:
- ReadUntil(std::move(message));
- break;
- case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
- EnableBitstreamConverter();
- break;
-
- default:
- DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
- }
-}
-
-void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DCHECK(!pending_flush_);
- DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
- << remote_callback_handle;
-
- // Checks if initialization had been called or not.
- if (remote_callback_handle_ != kInvalidHandle) {
- DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
- << remote_callback_handle_
- << ", Given: " << remote_callback_handle;
- // Shuts down data pipe if available if providing different remote callback
- // handle for initialization. Otherwise, just silently ignore the duplicated
- // request.
- if (remote_callback_handle_ != remote_callback_handle) {
- OnFatalError(PEERS_OUT_OF_SYNC);
- }
- return;
- }
- remote_callback_handle_ = remote_callback_handle;
-
- // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
- std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage());
- rpc->set_handle(remote_callback_handle_);
- rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
- auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
- init_cb_message->set_type(type_);
- switch (type_) {
- case ::media::DemuxerStream::Type::AUDIO: {
- audio_config_ = demuxer_stream_->audio_decoder_config();
- pb::AudioDecoderConfig* audio_message =
- init_cb_message->mutable_audio_decoder_config();
- ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
- break;
- }
- case ::media::DemuxerStream::Type::VIDEO: {
- video_config_ = demuxer_stream_->video_decoder_config();
- pb::VideoDecoderConfig* video_message =
- init_cb_message->mutable_video_decoder_config();
- ConvertVideoDecoderConfigToProto(video_config_, video_message);
- break;
- }
- default:
- NOTREACHED();
- }
-
- DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
- << " with decoder_config={"
- << (type_ == ::media::DemuxerStream::Type::AUDIO
- ? audio_config_.AsHumanReadableString()
- : video_config_.AsHumanReadableString())
- << '}';
- main_task_runner_->PostTask(
- FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote,
- rpc_broker_, base::Passed(&rpc)));
-
- // Starts Mojo watcher.
- if (!write_watcher_.IsWatching()) {
- DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
- write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
- base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData,
- weak_factory_.GetWeakPtr()));
- }
-}
-
-void RemoteDemuxerStreamAdapter::ReadUntil(
- std::unique_ptr<remoting::pb::RpcMessage> message) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DCHECK(message);
- if (!message->has_demuxerstream_readuntil_rpc()) {
- DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
- OnFatalError(RPC_INVALID);
- return;
- }
-
- const pb::DemuxerStreamReadUntil& rpc_message =
- message->demuxerstream_readuntil_rpc();
- DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
- << rpc_message.callback_handle()
- << ", count=" << rpc_message.count();
-
- if (pending_flush_) {
- DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state";
- return;
- }
-
- if (IsProcessingReadRequest()) {
- DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state.";
- return;
- }
-
- if (rpc_message.count() <= last_count_) {
- DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
- "current frame count";
- return;
- }
-
- read_until_count_ = rpc_message.count();
- read_until_callback_handle_ = rpc_message.callback_handle();
- RequestBuffer();
-}
-
-void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
- demuxer_stream_->EnableBitstreamConverter();
-}
-
-void RemoteDemuxerStreamAdapter::RequestBuffer() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- if (!IsProcessingReadRequest() || pending_flush_) {
- DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
- return;
- }
- demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer,
- request_buffer_weak_factory_.GetWeakPtr()));
-}
-
-void RemoteDemuxerStreamAdapter::OnNewBuffer(
- ::media::DemuxerStream::Status status,
- const scoped_refptr<::media::DecoderBuffer>& input) {
- DEMUXER_VLOG(3) << "status=" << status;
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- if (!IsProcessingReadRequest() || pending_flush_) {
- DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
- return;
- }
-
- switch (status) {
- case ::media::DemuxerStream::kAborted:
- DCHECK(!input);
- SendReadAck();
- return;
- case ::media::DemuxerStream::kConfigChanged:
- // TODO(erickung): consider sending updated Audio/Video decoder config to
- // RemotingRendererController.
- // Stores available audio/video decoder config and issues
- // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
- DCHECK(!input);
- media_status_ = status;
- if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO)
- video_config_ = demuxer_stream_->video_decoder_config();
- if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO)
- audio_config_ = demuxer_stream_->audio_decoder_config();
- SendReadAck();
- return;
- case ::media::DemuxerStream::kOk: {
- media_status_ = status;
- DCHECK(pending_frame_.empty());
- if (!producer_handle_.is_valid())
- return; // Do not start sending (due to previous fatal error).
- pending_frame_ = DecoderBufferToByteArray(input);
- pending_frame_is_eos_ = input->end_of_stream();
- TryWriteData(MOJO_RESULT_OK);
- } break;
- }
-}
-
-void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- // The Mojo watcher will also call TryWriteData() sometimes as a notification
- // that data pipe is ready. But that does not necessarily mean the data for a
- // read request is ready to be written into the pipe.
- if (!IsProcessingReadRequest() || pending_flush_) {
- DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state";
- return;
- }
-
- if (pending_frame_.empty()) {
- DEMUXER_VLOG(3) << "No data available, waiting for demuxer";
- return;
- }
-
- if (!stream_sender_ || !producer_handle_.is_valid()) {
- DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
- return;
- }
-
- uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
- MojoResult mojo_result =
- WriteDataRaw(producer_handle_.get(),
- pending_frame_.data() + current_pending_frame_offset_,
- &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
- if (mojo_result != MOJO_RESULT_OK) {
- if (mojo_result != MOJO_RESULT_SHOULD_WAIT) {
- DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
- << mojo_result;
- OnFatalError(MOJO_PIPE_ERROR);
- }
- return;
- }
-
- stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
- pending_frame_.size());
- current_pending_frame_offset_ += num_bytes;
- bytes_written_to_pipe_ += num_bytes;
-
- // Checks if all buffer was written to browser process.
- if (current_pending_frame_offset_ != pending_frame_.size()) {
- // Returns and wait for mojo watcher to notify to write more data.
- return;
- }
-
- // Signal mojo remoting service that all frame buffer is written to data pipe.
- stream_sender_->SendFrame();
-
- // Resets frame buffer variables.
- bool pending_frame_is_eos = pending_frame_is_eos_;
- ++last_count_;
- ResetPendingFrame();
-
- // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
- if (read_until_count_ == last_count_ || pending_frame_is_eos) {
- SendReadAck();
- return;
- }
-
- // Contiune to read decoder buffer until reaching |read_until_count_| or
- // end of stream.
- media_task_runner_->PostTask(
- FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer,
- weak_factory_.GetWeakPtr()));
-}
-
-void RemoteDemuxerStreamAdapter::SendReadAck() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- DEMUXER_VLOG(3) << "last_count_=" << last_count_
- << ", remote_read_callback_handle="
- << read_until_callback_handle_
- << ", media_status=" << media_status_;
- // Issues RPC_DS_READUNTIL_CALLBACK RPC message.
- std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage());
- rpc->set_handle(read_until_callback_handle_);
- rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
- auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
- message->set_count(last_count_);
- message->set_status(
- remoting::ToProtoDemuxerStreamStatus(media_status_).value());
- if (media_status_ == ::media::DemuxerStream::kConfigChanged) {
- if (audio_config_.IsValidConfig()) {
- pb::AudioDecoderConfig* audio_message =
- message->mutable_audio_decoder_config();
- ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
- } else if (video_config_.IsValidConfig()) {
- pb::VideoDecoderConfig* video_message =
- message->mutable_video_decoder_config();
- ConvertVideoDecoderConfigToProto(video_config_, video_message);
- } else {
- NOTREACHED();
- }
- }
-
- DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
- << " with count=" << message->count()
- << ", status=" << message->status() << ", decoder_config={"
- << (audio_config_.IsValidConfig()
- ? audio_config_.AsHumanReadableString()
- : video_config_.IsValidConfig()
- ? video_config_.AsHumanReadableString()
- : "DID NOT CHANGE")
- << '}';
- main_task_runner_->PostTask(
- FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote,
- rpc_broker_, base::Passed(&rpc)));
- // Resets callback handle after completing the reading request.
- read_until_callback_handle_ = kInvalidHandle;
-
- // Resets audio/video decoder config since it only sends once.
- if (audio_config_.IsValidConfig())
- audio_config_ = ::media::AudioDecoderConfig();
- if (video_config_.IsValidConfig())
- video_config_ = ::media::VideoDecoderConfig();
-}
-
-void RemoteDemuxerStreamAdapter::ResetPendingFrame() {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
- current_pending_frame_offset_ = 0;
- pending_frame_.clear();
- pending_frame_is_eos_ = false;
-}
-
-void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
-
- DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
-
- if (error_callback_.is_null())
- return;
-
- if (write_watcher_.IsWatching()) {
- DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
- write_watcher_.Cancel();
- }
-
- base::ResetAndReturn(&error_callback_).Run(stop_trigger);
-}
-
-} // namespace remoting
-} // namespace media
« no previous file with comments | « media/remoting/remote_demuxer_stream_adapter.h ('k') | media/remoting/remote_demuxer_stream_adapter_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698