Index: media/remoting/remote_demuxer_stream_adapter.cc |
diff --git a/media/remoting/remote_demuxer_stream_adapter.cc b/media/remoting/remote_demuxer_stream_adapter.cc |
deleted file mode 100644 |
index 9929c4a8af1adcb91e01b9902191760e94b918dc..0000000000000000000000000000000000000000 |
--- a/media/remoting/remote_demuxer_stream_adapter.cc |
+++ /dev/null |
@@ -1,447 +0,0 @@ |
-// Copyright 2016 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-#include "media/remoting/remote_demuxer_stream_adapter.h" |
- |
-#include "base/base64.h" |
-#include "base/bind.h" |
-#include "base/callback_helpers.h" |
-#include "media/base/bind_to_current_loop.h" |
-#include "media/base/decoder_buffer.h" |
-#include "media/base/timestamp_constants.h" |
-#include "media/remoting/rpc/proto_enum_utils.h" |
-#include "media/remoting/rpc/proto_utils.h" |
- |
-// Convenience logging macro used throughout this file. |
-#define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " |
- |
-namespace media { |
-namespace remoting { |
- |
-// static |
-mojo::DataPipe* CreateDataPipe() { |
- // Capacity in bytes for Mojo data pipe. |
- constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; |
- |
- MojoCreateDataPipeOptions options; |
- options.struct_size = sizeof(MojoCreateDataPipeOptions); |
- options.flags = MOJO_WRITE_DATA_FLAG_NONE; |
- options.element_num_bytes = 1; |
- options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; |
- return new mojo::DataPipe(options); |
-} |
- |
-RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
- scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
- scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
- const std::string& name, |
- ::media::DemuxerStream* demuxer_stream, |
- const base::WeakPtr<RpcBroker>& rpc_broker, |
- int rpc_handle, |
- mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
- mojo::ScopedDataPipeProducerHandle producer_handle, |
- const ErrorCallback& error_callback) |
- : main_task_runner_(std::move(main_task_runner)), |
- media_task_runner_(std::move(media_task_runner)), |
- name_(name), |
- rpc_broker_(rpc_broker), |
- rpc_handle_(rpc_handle), |
- demuxer_stream_(demuxer_stream), |
- type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
- error_callback_(error_callback), |
- remote_callback_handle_(kInvalidHandle), |
- read_until_callback_handle_(kInvalidHandle), |
- read_until_count_(0), |
- last_count_(0), |
- pending_flush_(false), |
- current_pending_frame_offset_(0), |
- pending_frame_is_eos_(false), |
- media_status_(::media::DemuxerStream::kOk), |
- producer_handle_(std::move(producer_handle)), |
- bytes_written_to_pipe_(0), |
- request_buffer_weak_factory_(this), |
- weak_factory_(this) { |
- DCHECK(main_task_runner_); |
- DCHECK(media_task_runner_); |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DCHECK(demuxer_stream); |
- DCHECK(!error_callback.is_null()); |
- const RpcBroker::ReceiveMessageCallback receive_callback = |
- media::BindToCurrentLoop( |
- base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, |
- weak_factory_.GetWeakPtr())); |
- main_task_runner_->PostTask( |
- FROM_HERE, |
- base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, |
- rpc_broker_, rpc_handle_, receive_callback)); |
- |
- stream_sender_.Bind(std::move(stream_sender_info)); |
- stream_sender_.set_connection_error_handler( |
- base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, |
- weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
-} |
- |
-RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- main_task_runner_->PostTask( |
- FROM_HERE, |
- base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, |
- rpc_broker_, rpc_handle_)); |
-} |
- |
-int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- const int64_t current_count = bytes_written_to_pipe_; |
- bytes_written_to_pipe_ = 0; |
- return current_count; |
-} |
- |
-base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( |
- bool flushing) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DEMUXER_VLOG(2) << "flushing=" << flushing; |
- |
- // Ignores if |pending_flush_| states is same. |
- if (pending_flush_ == flushing) |
- return base::nullopt; |
- |
- // Cleans up pending frame data. |
- pending_frame_.clear(); |
- current_pending_frame_offset_ = 0; |
- pending_frame_is_eos_ = false; |
- // Invalidates pending Read() tasks. |
- request_buffer_weak_factory_.InvalidateWeakPtrs(); |
- |
- // Cancels in flight data in browser process. |
- pending_flush_ = flushing; |
- if (flushing) { |
- stream_sender_->CancelInFlightData(); |
- } else { |
- // Sets callback handle invalid to abort ongoing read request. |
- read_until_callback_handle_ = kInvalidHandle; |
- } |
- return last_count_; |
-} |
- |
-void RemoteDemuxerStreamAdapter::OnReceivedRpc( |
- std::unique_ptr<remoting::pb::RpcMessage> message) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DCHECK(message); |
- DCHECK(rpc_handle_ == message->handle()); |
- |
- switch (message->proc()) { |
- case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: |
- Initialize(message->integer_value()); |
- break; |
- case remoting::pb::RpcMessage::RPC_DS_READUNTIL: |
- ReadUntil(std::move(message)); |
- break; |
- case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: |
- EnableBitstreamConverter(); |
- break; |
- |
- default: |
- DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); |
- } |
-} |
- |
-void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DCHECK(!pending_flush_); |
- DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" |
- << remote_callback_handle; |
- |
- // Checks if initialization had been called or not. |
- if (remote_callback_handle_ != kInvalidHandle) { |
- DEMUXER_VLOG(1) << "Duplicated initialization. Have: " |
- << remote_callback_handle_ |
- << ", Given: " << remote_callback_handle; |
- // Shuts down data pipe if available if providing different remote callback |
- // handle for initialization. Otherwise, just silently ignore the duplicated |
- // request. |
- if (remote_callback_handle_ != remote_callback_handle) { |
- OnFatalError(PEERS_OUT_OF_SYNC); |
- } |
- return; |
- } |
- remote_callback_handle_ = remote_callback_handle; |
- |
- // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. |
- std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); |
- rpc->set_handle(remote_callback_handle_); |
- rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); |
- auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); |
- init_cb_message->set_type(type_); |
- switch (type_) { |
- case ::media::DemuxerStream::Type::AUDIO: { |
- audio_config_ = demuxer_stream_->audio_decoder_config(); |
- pb::AudioDecoderConfig* audio_message = |
- init_cb_message->mutable_audio_decoder_config(); |
- ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
- break; |
- } |
- case ::media::DemuxerStream::Type::VIDEO: { |
- video_config_ = demuxer_stream_->video_decoder_config(); |
- pb::VideoDecoderConfig* video_message = |
- init_cb_message->mutable_video_decoder_config(); |
- ConvertVideoDecoderConfigToProto(video_config_, video_message); |
- break; |
- } |
- default: |
- NOTREACHED(); |
- } |
- |
- DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() |
- << " with decoder_config={" |
- << (type_ == ::media::DemuxerStream::Type::AUDIO |
- ? audio_config_.AsHumanReadableString() |
- : video_config_.AsHumanReadableString()) |
- << '}'; |
- main_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, |
- rpc_broker_, base::Passed(&rpc))); |
- |
- // Starts Mojo watcher. |
- if (!write_watcher_.IsWatching()) { |
- DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; |
- write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
- base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, |
- weak_factory_.GetWeakPtr())); |
- } |
-} |
- |
-void RemoteDemuxerStreamAdapter::ReadUntil( |
- std::unique_ptr<remoting::pb::RpcMessage> message) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DCHECK(message); |
- if (!message->has_demuxerstream_readuntil_rpc()) { |
- DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
- OnFatalError(RPC_INVALID); |
- return; |
- } |
- |
- const pb::DemuxerStreamReadUntil& rpc_message = |
- message->demuxerstream_readuntil_rpc(); |
- DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" |
- << rpc_message.callback_handle() |
- << ", count=" << rpc_message.count(); |
- |
- if (pending_flush_) { |
- DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state"; |
- return; |
- } |
- |
- if (IsProcessingReadRequest()) { |
- DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state."; |
- return; |
- } |
- |
- if (rpc_message.count() <= last_count_) { |
- DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " |
- "current frame count"; |
- return; |
- } |
- |
- read_until_count_ = rpc_message.count(); |
- read_until_callback_handle_ = rpc_message.callback_handle(); |
- RequestBuffer(); |
-} |
- |
-void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; |
- demuxer_stream_->EnableBitstreamConverter(); |
-} |
- |
-void RemoteDemuxerStreamAdapter::RequestBuffer() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- if (!IsProcessingReadRequest() || pending_flush_) { |
- DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
- return; |
- } |
- demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, |
- request_buffer_weak_factory_.GetWeakPtr())); |
-} |
- |
-void RemoteDemuxerStreamAdapter::OnNewBuffer( |
- ::media::DemuxerStream::Status status, |
- const scoped_refptr<::media::DecoderBuffer>& input) { |
- DEMUXER_VLOG(3) << "status=" << status; |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- if (!IsProcessingReadRequest() || pending_flush_) { |
- DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
- return; |
- } |
- |
- switch (status) { |
- case ::media::DemuxerStream::kAborted: |
- DCHECK(!input); |
- SendReadAck(); |
- return; |
- case ::media::DemuxerStream::kConfigChanged: |
- // TODO(erickung): consider sending updated Audio/Video decoder config to |
- // RemotingRendererController. |
- // Stores available audio/video decoder config and issues |
- // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. |
- DCHECK(!input); |
- media_status_ = status; |
- if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) |
- video_config_ = demuxer_stream_->video_decoder_config(); |
- if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) |
- audio_config_ = demuxer_stream_->audio_decoder_config(); |
- SendReadAck(); |
- return; |
- case ::media::DemuxerStream::kOk: { |
- media_status_ = status; |
- DCHECK(pending_frame_.empty()); |
- if (!producer_handle_.is_valid()) |
- return; // Do not start sending (due to previous fatal error). |
- pending_frame_ = DecoderBufferToByteArray(input); |
- pending_frame_is_eos_ = input->end_of_stream(); |
- TryWriteData(MOJO_RESULT_OK); |
- } break; |
- } |
-} |
- |
-void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- // The Mojo watcher will also call TryWriteData() sometimes as a notification |
- // that data pipe is ready. But that does not necessarily mean the data for a |
- // read request is ready to be written into the pipe. |
- if (!IsProcessingReadRequest() || pending_flush_) { |
- DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
- return; |
- } |
- |
- if (pending_frame_.empty()) { |
- DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; |
- return; |
- } |
- |
- if (!stream_sender_ || !producer_handle_.is_valid()) { |
- DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; |
- return; |
- } |
- |
- uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; |
- MojoResult mojo_result = |
- WriteDataRaw(producer_handle_.get(), |
- pending_frame_.data() + current_pending_frame_offset_, |
- &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
- if (mojo_result != MOJO_RESULT_OK) { |
- if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { |
- DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" |
- << mojo_result; |
- OnFatalError(MOJO_PIPE_ERROR); |
- } |
- return; |
- } |
- |
- stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, |
- pending_frame_.size()); |
- current_pending_frame_offset_ += num_bytes; |
- bytes_written_to_pipe_ += num_bytes; |
- |
- // Checks if all buffer was written to browser process. |
- if (current_pending_frame_offset_ != pending_frame_.size()) { |
- // Returns and wait for mojo watcher to notify to write more data. |
- return; |
- } |
- |
- // Signal mojo remoting service that all frame buffer is written to data pipe. |
- stream_sender_->SendFrame(); |
- |
- // Resets frame buffer variables. |
- bool pending_frame_is_eos = pending_frame_is_eos_; |
- ++last_count_; |
- ResetPendingFrame(); |
- |
- // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. |
- if (read_until_count_ == last_count_ || pending_frame_is_eos) { |
- SendReadAck(); |
- return; |
- } |
- |
- // Contiune to read decoder buffer until reaching |read_until_count_| or |
- // end of stream. |
- media_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, |
- weak_factory_.GetWeakPtr())); |
-} |
- |
-void RemoteDemuxerStreamAdapter::SendReadAck() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DEMUXER_VLOG(3) << "last_count_=" << last_count_ |
- << ", remote_read_callback_handle=" |
- << read_until_callback_handle_ |
- << ", media_status=" << media_status_; |
- // Issues RPC_DS_READUNTIL_CALLBACK RPC message. |
- std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); |
- rpc->set_handle(read_until_callback_handle_); |
- rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); |
- auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); |
- message->set_count(last_count_); |
- message->set_status( |
- remoting::ToProtoDemuxerStreamStatus(media_status_).value()); |
- if (media_status_ == ::media::DemuxerStream::kConfigChanged) { |
- if (audio_config_.IsValidConfig()) { |
- pb::AudioDecoderConfig* audio_message = |
- message->mutable_audio_decoder_config(); |
- ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
- } else if (video_config_.IsValidConfig()) { |
- pb::VideoDecoderConfig* video_message = |
- message->mutable_video_decoder_config(); |
- ConvertVideoDecoderConfigToProto(video_config_, video_message); |
- } else { |
- NOTREACHED(); |
- } |
- } |
- |
- DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() |
- << " with count=" << message->count() |
- << ", status=" << message->status() << ", decoder_config={" |
- << (audio_config_.IsValidConfig() |
- ? audio_config_.AsHumanReadableString() |
- : video_config_.IsValidConfig() |
- ? video_config_.AsHumanReadableString() |
- : "DID NOT CHANGE") |
- << '}'; |
- main_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, |
- rpc_broker_, base::Passed(&rpc))); |
- // Resets callback handle after completing the reading request. |
- read_until_callback_handle_ = kInvalidHandle; |
- |
- // Resets audio/video decoder config since it only sends once. |
- if (audio_config_.IsValidConfig()) |
- audio_config_ = ::media::AudioDecoderConfig(); |
- if (video_config_.IsValidConfig()) |
- video_config_ = ::media::VideoDecoderConfig(); |
-} |
- |
-void RemoteDemuxerStreamAdapter::ResetPendingFrame() { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- current_pending_frame_offset_ = 0; |
- pending_frame_.clear(); |
- pending_frame_is_eos_ = false; |
-} |
- |
-void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
- DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- |
- DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
- |
- if (error_callback_.is_null()) |
- return; |
- |
- if (write_watcher_.IsWatching()) { |
- DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
- write_watcher_.Cancel(); |
- } |
- |
- base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
-} |
- |
-} // namespace remoting |
-} // namespace media |