| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "media/remoting/remote_demuxer_stream_adapter.h" | 5 #include "media/remoting/demuxer_stream_adapter.h" |
| 6 | 6 |
| 7 #include "base/base64.h" | 7 #include "base/base64.h" |
| 8 #include "base/bind.h" | 8 #include "base/bind.h" |
| 9 #include "base/callback_helpers.h" | 9 #include "base/callback_helpers.h" |
| 10 #include "media/base/bind_to_current_loop.h" | 10 #include "media/base/bind_to_current_loop.h" |
| 11 #include "media/base/decoder_buffer.h" | 11 #include "media/base/decoder_buffer.h" |
| 12 #include "media/base/timestamp_constants.h" | 12 #include "media/base/timestamp_constants.h" |
| 13 #include "media/remoting/rpc/proto_enum_utils.h" | 13 #include "media/remoting/proto_enum_utils.h" |
| 14 #include "media/remoting/rpc/proto_utils.h" | 14 #include "media/remoting/proto_utils.h" |
| 15 | 15 |
| 16 // Convenience logging macro used throughout this file. | 16 // Convenience logging macro used throughout this file. |
| 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " |
| 18 | 18 |
| 19 namespace media { | 19 namespace media { |
| 20 namespace remoting { | 20 namespace remoting { |
| 21 | 21 |
| 22 // static | 22 // static |
| 23 mojo::DataPipe* CreateDataPipe() { | 23 mojo::DataPipe* DemuxerStreamAdapter::CreateDataPipe() { |
| 24 // Capacity in bytes for Mojo data pipe. | 24 // Capacity in bytes for Mojo data pipe. |
| 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; |
| 26 | 26 |
| 27 MojoCreateDataPipeOptions options; | 27 MojoCreateDataPipeOptions options; |
| 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); |
| 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; |
| 30 options.element_num_bytes = 1; | 30 options.element_num_bytes = 1; |
| 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; |
| 32 return new mojo::DataPipe(options); | 32 return new mojo::DataPipe(options); |
| 33 } | 33 } |
| 34 | 34 |
| 35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | 35 DemuxerStreamAdapter::DemuxerStreamAdapter( |
| 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
| 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
| 38 const std::string& name, | 38 const std::string& name, |
| 39 ::media::DemuxerStream* demuxer_stream, | 39 DemuxerStream* demuxer_stream, |
| 40 const base::WeakPtr<RpcBroker>& rpc_broker, | 40 const base::WeakPtr<RpcBroker>& rpc_broker, |
| 41 int rpc_handle, | 41 int rpc_handle, |
| 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
| 43 mojo::ScopedDataPipeProducerHandle producer_handle, | 43 mojo::ScopedDataPipeProducerHandle producer_handle, |
| 44 const ErrorCallback& error_callback) | 44 const ErrorCallback& error_callback) |
| 45 : main_task_runner_(std::move(main_task_runner)), | 45 : main_task_runner_(std::move(main_task_runner)), |
| 46 media_task_runner_(std::move(media_task_runner)), | 46 media_task_runner_(std::move(media_task_runner)), |
| 47 name_(name), | 47 name_(name), |
| 48 rpc_broker_(rpc_broker), | 48 rpc_broker_(rpc_broker), |
| 49 rpc_handle_(rpc_handle), | 49 rpc_handle_(rpc_handle), |
| 50 demuxer_stream_(demuxer_stream), | 50 demuxer_stream_(demuxer_stream), |
| 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
| 52 error_callback_(error_callback), | 52 error_callback_(error_callback), |
| 53 remote_callback_handle_(kInvalidHandle), | 53 remote_callback_handle_(RpcBroker::kInvalidHandle), |
| 54 read_until_callback_handle_(kInvalidHandle), | 54 read_until_callback_handle_(RpcBroker::kInvalidHandle), |
| 55 read_until_count_(0), | 55 read_until_count_(0), |
| 56 last_count_(0), | 56 last_count_(0), |
| 57 pending_flush_(false), | 57 pending_flush_(false), |
| 58 current_pending_frame_offset_(0), | 58 current_pending_frame_offset_(0), |
| 59 pending_frame_is_eos_(false), | 59 pending_frame_is_eos_(false), |
| 60 media_status_(::media::DemuxerStream::kOk), | 60 media_status_(DemuxerStream::kOk), |
| 61 producer_handle_(std::move(producer_handle)), | 61 producer_handle_(std::move(producer_handle)), |
| 62 bytes_written_to_pipe_(0), | 62 bytes_written_to_pipe_(0), |
| 63 request_buffer_weak_factory_(this), | 63 request_buffer_weak_factory_(this), |
| 64 weak_factory_(this) { | 64 weak_factory_(this) { |
| 65 DCHECK(main_task_runner_); | 65 DCHECK(main_task_runner_); |
| 66 DCHECK(media_task_runner_); | 66 DCHECK(media_task_runner_); |
| 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 68 DCHECK(demuxer_stream); | 68 DCHECK(demuxer_stream); |
| 69 DCHECK(!error_callback.is_null()); | 69 DCHECK(!error_callback.is_null()); |
| 70 const RpcBroker::ReceiveMessageCallback receive_callback = | 70 const RpcBroker::ReceiveMessageCallback receive_callback = |
| 71 media::BindToCurrentLoop( | 71 BindToCurrentLoop(base::Bind(&DemuxerStreamAdapter::OnReceivedRpc, |
| 72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | 72 weak_factory_.GetWeakPtr())); |
| 73 weak_factory_.GetWeakPtr())); | |
| 74 main_task_runner_->PostTask( | 73 main_task_runner_->PostTask( |
| 75 FROM_HERE, | 74 FROM_HERE, base::Bind(&RpcBroker::RegisterMessageReceiverCallback, |
| 76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | 75 rpc_broker_, rpc_handle_, receive_callback)); |
| 77 rpc_broker_, rpc_handle_, receive_callback)); | |
| 78 | 76 |
| 79 stream_sender_.Bind(std::move(stream_sender_info)); | 77 stream_sender_.Bind(std::move(stream_sender_info)); |
| 80 stream_sender_.set_connection_error_handler( | 78 stream_sender_.set_connection_error_handler( |
| 81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | 79 base::Bind(&DemuxerStreamAdapter::OnFatalError, |
| 82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); | 80 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
| 83 } | 81 } |
| 84 | 82 |
| 85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | 83 DemuxerStreamAdapter::~DemuxerStreamAdapter() { |
| 86 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 84 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 87 main_task_runner_->PostTask( | 85 main_task_runner_->PostTask( |
| 88 FROM_HERE, | 86 FROM_HERE, base::Bind(&RpcBroker::UnregisterMessageReceiverCallback, |
| 89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | 87 rpc_broker_, rpc_handle_)); |
| 90 rpc_broker_, rpc_handle_)); | |
| 91 } | 88 } |
| 92 | 89 |
| 93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { | 90 int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() { |
| 94 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 91 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 95 const int64_t current_count = bytes_written_to_pipe_; | 92 const int64_t current_count = bytes_written_to_pipe_; |
| 96 bytes_written_to_pipe_ = 0; | 93 bytes_written_to_pipe_ = 0; |
| 97 return current_count; | 94 return current_count; |
| 98 } | 95 } |
| 99 | 96 |
| 100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | 97 base::Optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) { |
| 101 bool flushing) { | |
| 102 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 98 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 103 DEMUXER_VLOG(2) << "flushing=" << flushing; | 99 DEMUXER_VLOG(2) << "flushing=" << flushing; |
| 104 | 100 |
| 105 // Ignores if |pending_flush_| states is same. | 101 // Ignores if |pending_flush_| states is same. |
| 106 if (pending_flush_ == flushing) | 102 if (pending_flush_ == flushing) |
| 107 return base::nullopt; | 103 return base::nullopt; |
| 108 | 104 |
| 109 // Cleans up pending frame data. | 105 // Cleans up pending frame data. |
| 110 pending_frame_.clear(); | 106 pending_frame_.clear(); |
| 111 current_pending_frame_offset_ = 0; | 107 current_pending_frame_offset_ = 0; |
| 112 pending_frame_is_eos_ = false; | 108 pending_frame_is_eos_ = false; |
| 113 // Invalidates pending Read() tasks. | 109 // Invalidates pending Read() tasks. |
| 114 request_buffer_weak_factory_.InvalidateWeakPtrs(); | 110 request_buffer_weak_factory_.InvalidateWeakPtrs(); |
| 115 | 111 |
| 116 // Cancels in flight data in browser process. | 112 // Cancels in flight data in browser process. |
| 117 pending_flush_ = flushing; | 113 pending_flush_ = flushing; |
| 118 if (flushing) { | 114 if (flushing) { |
| 119 stream_sender_->CancelInFlightData(); | 115 stream_sender_->CancelInFlightData(); |
| 120 } else { | 116 } else { |
| 121 // Sets callback handle invalid to abort ongoing read request. | 117 // Sets callback handle invalid to abort ongoing read request. |
| 122 read_until_callback_handle_ = kInvalidHandle; | 118 read_until_callback_handle_ = RpcBroker::kInvalidHandle; |
| 123 } | 119 } |
| 124 return last_count_; | 120 return last_count_; |
| 125 } | 121 } |
| 126 | 122 |
| 127 void RemoteDemuxerStreamAdapter::OnReceivedRpc( | 123 void DemuxerStreamAdapter::OnReceivedRpc( |
| 128 std::unique_ptr<remoting::pb::RpcMessage> message) { | 124 std::unique_ptr<pb::RpcMessage> message) { |
| 129 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 125 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 130 DCHECK(message); | 126 DCHECK(message); |
| 131 DCHECK(rpc_handle_ == message->handle()); | 127 DCHECK(rpc_handle_ == message->handle()); |
| 132 | 128 |
| 133 switch (message->proc()) { | 129 switch (message->proc()) { |
| 134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: | 130 case pb::RpcMessage::RPC_DS_INITIALIZE: |
| 135 Initialize(message->integer_value()); | 131 Initialize(message->integer_value()); |
| 136 break; | 132 break; |
| 137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL: | 133 case pb::RpcMessage::RPC_DS_READUNTIL: |
| 138 ReadUntil(std::move(message)); | 134 ReadUntil(std::move(message)); |
| 139 break; | 135 break; |
| 140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: | 136 case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: |
| 141 EnableBitstreamConverter(); | 137 EnableBitstreamConverter(); |
| 142 break; | 138 break; |
| 143 | 139 |
| 144 default: | 140 default: |
| 145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); | 141 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); |
| 146 } | 142 } |
| 147 } | 143 } |
| 148 | 144 |
| 149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { | 145 void DemuxerStreamAdapter::Initialize(int remote_callback_handle) { |
| 150 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 146 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 151 DCHECK(!pending_flush_); | 147 DCHECK(!pending_flush_); |
| 152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" | 148 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" |
| 153 << remote_callback_handle; | 149 << remote_callback_handle; |
| 154 | 150 |
| 155 // Checks if initialization had been called or not. | 151 // Checks if initialization had been called or not. |
| 156 if (remote_callback_handle_ != kInvalidHandle) { | 152 if (remote_callback_handle_ != RpcBroker::kInvalidHandle) { |
| 157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | 153 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " |
| 158 << remote_callback_handle_ | 154 << remote_callback_handle_ |
| 159 << ", Given: " << remote_callback_handle; | 155 << ", Given: " << remote_callback_handle; |
| 160 // Shuts down data pipe if available if providing different remote callback | 156 // Shuts down data pipe if available if providing different remote callback |
| 161 // handle for initialization. Otherwise, just silently ignore the duplicated | 157 // handle for initialization. Otherwise, just silently ignore the duplicated |
| 162 // request. | 158 // request. |
| 163 if (remote_callback_handle_ != remote_callback_handle) { | 159 if (remote_callback_handle_ != remote_callback_handle) { |
| 164 OnFatalError(PEERS_OUT_OF_SYNC); | 160 OnFatalError(PEERS_OUT_OF_SYNC); |
| 165 } | 161 } |
| 166 return; | 162 return; |
| 167 } | 163 } |
| 168 remote_callback_handle_ = remote_callback_handle; | 164 remote_callback_handle_ = remote_callback_handle; |
| 169 | 165 |
| 170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | 166 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. |
| 171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 167 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); |
| 172 rpc->set_handle(remote_callback_handle_); | 168 rpc->set_handle(remote_callback_handle_); |
| 173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | 169 rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); |
| 174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | 170 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); |
| 175 init_cb_message->set_type(type_); | 171 init_cb_message->set_type(type_); |
| 176 switch (type_) { | 172 switch (type_) { |
| 177 case ::media::DemuxerStream::Type::AUDIO: { | 173 case DemuxerStream::Type::AUDIO: { |
| 178 audio_config_ = demuxer_stream_->audio_decoder_config(); | 174 audio_config_ = demuxer_stream_->audio_decoder_config(); |
| 179 pb::AudioDecoderConfig* audio_message = | 175 pb::AudioDecoderConfig* audio_message = |
| 180 init_cb_message->mutable_audio_decoder_config(); | 176 init_cb_message->mutable_audio_decoder_config(); |
| 181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | 177 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
| 182 break; | 178 break; |
| 183 } | 179 } |
| 184 case ::media::DemuxerStream::Type::VIDEO: { | 180 case DemuxerStream::Type::VIDEO: { |
| 185 video_config_ = demuxer_stream_->video_decoder_config(); | 181 video_config_ = demuxer_stream_->video_decoder_config(); |
| 186 pb::VideoDecoderConfig* video_message = | 182 pb::VideoDecoderConfig* video_message = |
| 187 init_cb_message->mutable_video_decoder_config(); | 183 init_cb_message->mutable_video_decoder_config(); |
| 188 ConvertVideoDecoderConfigToProto(video_config_, video_message); | 184 ConvertVideoDecoderConfigToProto(video_config_, video_message); |
| 189 break; | 185 break; |
| 190 } | 186 } |
| 191 default: | 187 default: |
| 192 NOTREACHED(); | 188 NOTREACHED(); |
| 193 } | 189 } |
| 194 | 190 |
| 195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() | 191 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() |
| 196 << " with decoder_config={" | 192 << " with decoder_config={" |
| 197 << (type_ == ::media::DemuxerStream::Type::AUDIO | 193 << (type_ == DemuxerStream::Type::AUDIO |
| 198 ? audio_config_.AsHumanReadableString() | 194 ? audio_config_.AsHumanReadableString() |
| 199 : video_config_.AsHumanReadableString()) | 195 : video_config_.AsHumanReadableString()) |
| 200 << '}'; | 196 << '}'; |
| 201 main_task_runner_->PostTask( | 197 main_task_runner_->PostTask( |
| 202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | 198 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, |
| 203 rpc_broker_, base::Passed(&rpc))); | 199 base::Passed(&rpc))); |
| 204 | 200 |
| 205 // Starts Mojo watcher. | 201 // Starts Mojo watcher. |
| 206 if (!write_watcher_.IsWatching()) { | 202 if (!write_watcher_.IsWatching()) { |
| 207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; | 203 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; |
| 208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, | 204 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
| 209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, | 205 base::Bind(&DemuxerStreamAdapter::TryWriteData, |
| 210 weak_factory_.GetWeakPtr())); | 206 weak_factory_.GetWeakPtr())); |
| 211 } | 207 } |
| 212 } | 208 } |
| 213 | 209 |
| 214 void RemoteDemuxerStreamAdapter::ReadUntil( | 210 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { |
| 215 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
| 216 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 211 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 217 DCHECK(message); | 212 DCHECK(message); |
| 218 if (!message->has_demuxerstream_readuntil_rpc()) { | 213 if (!message->has_demuxerstream_readuntil_rpc()) { |
| 219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | 214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
| 220 OnFatalError(RPC_INVALID); | 215 OnFatalError(RPC_INVALID); |
| 221 return; | 216 return; |
| 222 } | 217 } |
| 223 | 218 |
| 224 const pb::DemuxerStreamReadUntil& rpc_message = | 219 const pb::DemuxerStreamReadUntil& rpc_message = |
| 225 message->demuxerstream_readuntil_rpc(); | 220 message->demuxerstream_readuntil_rpc(); |
| (...skipping 15 matching lines...) Expand all Loading... |
| 241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " | 236 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " |
| 242 "current frame count"; | 237 "current frame count"; |
| 243 return; | 238 return; |
| 244 } | 239 } |
| 245 | 240 |
| 246 read_until_count_ = rpc_message.count(); | 241 read_until_count_ = rpc_message.count(); |
| 247 read_until_callback_handle_ = rpc_message.callback_handle(); | 242 read_until_callback_handle_ = rpc_message.callback_handle(); |
| 248 RequestBuffer(); | 243 RequestBuffer(); |
| 249 } | 244 } |
| 250 | 245 |
| 251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { | 246 void DemuxerStreamAdapter::EnableBitstreamConverter() { |
| 252 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 247 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; | 248 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; |
| 254 demuxer_stream_->EnableBitstreamConverter(); | 249 demuxer_stream_->EnableBitstreamConverter(); |
| 255 } | 250 } |
| 256 | 251 |
| 257 void RemoteDemuxerStreamAdapter::RequestBuffer() { | 252 void DemuxerStreamAdapter::RequestBuffer() { |
| 258 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 253 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 259 if (!IsProcessingReadRequest() || pending_flush_) { | 254 if (!IsProcessingReadRequest() || pending_flush_) { |
| 260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 255 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
| 261 return; | 256 return; |
| 262 } | 257 } |
| 263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, | 258 demuxer_stream_->Read(base::Bind(&DemuxerStreamAdapter::OnNewBuffer, |
| 264 request_buffer_weak_factory_.GetWeakPtr())); | 259 request_buffer_weak_factory_.GetWeakPtr())); |
| 265 } | 260 } |
| 266 | 261 |
| 267 void RemoteDemuxerStreamAdapter::OnNewBuffer( | 262 void DemuxerStreamAdapter::OnNewBuffer( |
| 268 ::media::DemuxerStream::Status status, | 263 DemuxerStream::Status status, |
| 269 const scoped_refptr<::media::DecoderBuffer>& input) { | 264 const scoped_refptr<DecoderBuffer>& input) { |
| 270 DEMUXER_VLOG(3) << "status=" << status; | 265 DEMUXER_VLOG(3) << "status=" << status; |
| 271 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 266 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 272 if (!IsProcessingReadRequest() || pending_flush_) { | 267 if (!IsProcessingReadRequest() || pending_flush_) { |
| 273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 268 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
| 274 return; | 269 return; |
| 275 } | 270 } |
| 276 | 271 |
| 277 switch (status) { | 272 switch (status) { |
| 278 case ::media::DemuxerStream::kAborted: | 273 case DemuxerStream::kAborted: |
| 279 DCHECK(!input); | 274 DCHECK(!input); |
| 280 SendReadAck(); | 275 SendReadAck(); |
| 281 return; | 276 return; |
| 282 case ::media::DemuxerStream::kConfigChanged: | 277 case DemuxerStream::kConfigChanged: |
| 283 // TODO(erickung): consider sending updated Audio/Video decoder config to | 278 // TODO(erickung): Notify controller of new decoder config, just in case |
| 284 // RemotingRendererController. | 279 // that will require remoting to be shutdown (due to known |
| 280 // lack-of-support). |
| 285 // Stores available audio/video decoder config and issues | 281 // Stores available audio/video decoder config and issues |
| 286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. | 282 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. |
| 287 DCHECK(!input); | 283 DCHECK(!input); |
| 288 media_status_ = status; | 284 media_status_ = status; |
| 289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) | 285 if (demuxer_stream_->type() == DemuxerStream::VIDEO) |
| 290 video_config_ = demuxer_stream_->video_decoder_config(); | 286 video_config_ = demuxer_stream_->video_decoder_config(); |
| 291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) | 287 if (demuxer_stream_->type() == DemuxerStream::AUDIO) |
| 292 audio_config_ = demuxer_stream_->audio_decoder_config(); | 288 audio_config_ = demuxer_stream_->audio_decoder_config(); |
| 293 SendReadAck(); | 289 SendReadAck(); |
| 294 return; | 290 return; |
| 295 case ::media::DemuxerStream::kOk: { | 291 case DemuxerStream::kOk: { |
| 296 media_status_ = status; | 292 media_status_ = status; |
| 297 DCHECK(pending_frame_.empty()); | 293 DCHECK(pending_frame_.empty()); |
| 298 if (!producer_handle_.is_valid()) | 294 if (!producer_handle_.is_valid()) |
| 299 return; // Do not start sending (due to previous fatal error). | 295 return; // Do not start sending (due to previous fatal error). |
| 300 pending_frame_ = DecoderBufferToByteArray(input); | 296 pending_frame_ = DecoderBufferToByteArray(*input); |
| 301 pending_frame_is_eos_ = input->end_of_stream(); | 297 pending_frame_is_eos_ = input->end_of_stream(); |
| 302 TryWriteData(MOJO_RESULT_OK); | 298 TryWriteData(MOJO_RESULT_OK); |
| 303 } break; | 299 } break; |
| 304 } | 300 } |
| 305 } | 301 } |
| 306 | 302 |
| 307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | 303 void DemuxerStreamAdapter::TryWriteData(MojoResult result) { |
| 308 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 304 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 309 // The Mojo watcher will also call TryWriteData() sometimes as a notification | 305 // The Mojo watcher will also call TryWriteData() sometimes as a notification |
| 310 // that data pipe is ready. But that does not necessarily mean the data for a | 306 // that data pipe is ready. But that does not necessarily mean the data for a |
| 311 // read request is ready to be written into the pipe. | 307 // read request is ready to be written into the pipe. |
| 312 if (!IsProcessingReadRequest() || pending_flush_) { | 308 if (!IsProcessingReadRequest() || pending_flush_) { |
| 313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; | 309 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
| 314 return; | 310 return; |
| 315 } | 311 } |
| 316 | 312 |
| 317 if (pending_frame_.empty()) { | 313 if (pending_frame_.empty()) { |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 358 ResetPendingFrame(); | 354 ResetPendingFrame(); |
| 359 | 355 |
| 360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. | 356 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. |
| 361 if (read_until_count_ == last_count_ || pending_frame_is_eos) { | 357 if (read_until_count_ == last_count_ || pending_frame_is_eos) { |
| 362 SendReadAck(); | 358 SendReadAck(); |
| 363 return; | 359 return; |
| 364 } | 360 } |
| 365 | 361 |
| 366 // Contiune to read decoder buffer until reaching |read_until_count_| or | 362 // Contiune to read decoder buffer until reaching |read_until_count_| or |
| 367 // end of stream. | 363 // end of stream. |
| 368 media_task_runner_->PostTask( | 364 media_task_runner_->PostTask(FROM_HERE, |
| 369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, | 365 base::Bind(&DemuxerStreamAdapter::RequestBuffer, |
| 370 weak_factory_.GetWeakPtr())); | 366 weak_factory_.GetWeakPtr())); |
| 371 } | 367 } |
| 372 | 368 |
| 373 void RemoteDemuxerStreamAdapter::SendReadAck() { | 369 void DemuxerStreamAdapter::SendReadAck() { |
| 374 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 370 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 375 DEMUXER_VLOG(3) << "last_count_=" << last_count_ | 371 DEMUXER_VLOG(3) << "last_count_=" << last_count_ |
| 376 << ", remote_read_callback_handle=" | 372 << ", remote_read_callback_handle=" |
| 377 << read_until_callback_handle_ | 373 << read_until_callback_handle_ |
| 378 << ", media_status=" << media_status_; | 374 << ", media_status=" << media_status_; |
| 379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. | 375 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. |
| 380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 376 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); |
| 381 rpc->set_handle(read_until_callback_handle_); | 377 rpc->set_handle(read_until_callback_handle_); |
| 382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | 378 rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); |
| 383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); | 379 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); |
| 384 message->set_count(last_count_); | 380 message->set_count(last_count_); |
| 385 message->set_status( | 381 message->set_status(ToProtoDemuxerStreamStatus(media_status_).value()); |
| 386 remoting::ToProtoDemuxerStreamStatus(media_status_).value()); | 382 if (media_status_ == DemuxerStream::kConfigChanged) { |
| 387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) { | |
| 388 if (audio_config_.IsValidConfig()) { | 383 if (audio_config_.IsValidConfig()) { |
| 389 pb::AudioDecoderConfig* audio_message = | 384 pb::AudioDecoderConfig* audio_message = |
| 390 message->mutable_audio_decoder_config(); | 385 message->mutable_audio_decoder_config(); |
| 391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | 386 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
| 392 } else if (video_config_.IsValidConfig()) { | 387 } else if (video_config_.IsValidConfig()) { |
| 393 pb::VideoDecoderConfig* video_message = | 388 pb::VideoDecoderConfig* video_message = |
| 394 message->mutable_video_decoder_config(); | 389 message->mutable_video_decoder_config(); |
| 395 ConvertVideoDecoderConfigToProto(video_config_, video_message); | 390 ConvertVideoDecoderConfigToProto(video_config_, video_message); |
| 396 } else { | 391 } else { |
| 397 NOTREACHED(); | 392 NOTREACHED(); |
| 398 } | 393 } |
| 399 } | 394 } |
| 400 | 395 |
| 401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() | 396 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() |
| 402 << " with count=" << message->count() | 397 << " with count=" << message->count() |
| 403 << ", status=" << message->status() << ", decoder_config={" | 398 << ", status=" << message->status() << ", decoder_config={" |
| 404 << (audio_config_.IsValidConfig() | 399 << (audio_config_.IsValidConfig() |
| 405 ? audio_config_.AsHumanReadableString() | 400 ? audio_config_.AsHumanReadableString() |
| 406 : video_config_.IsValidConfig() | 401 : video_config_.IsValidConfig() |
| 407 ? video_config_.AsHumanReadableString() | 402 ? video_config_.AsHumanReadableString() |
| 408 : "DID NOT CHANGE") | 403 : "DID NOT CHANGE") |
| 409 << '}'; | 404 << '}'; |
| 410 main_task_runner_->PostTask( | 405 main_task_runner_->PostTask( |
| 411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | 406 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, |
| 412 rpc_broker_, base::Passed(&rpc))); | 407 base::Passed(&rpc))); |
| 413 // Resets callback handle after completing the reading request. | 408 // Resets callback handle after completing the reading request. |
| 414 read_until_callback_handle_ = kInvalidHandle; | 409 read_until_callback_handle_ = RpcBroker::kInvalidHandle; |
| 415 | 410 |
| 416 // Resets audio/video decoder config since it only sends once. | 411 // Resets audio/video decoder config since it only sends once. |
| 417 if (audio_config_.IsValidConfig()) | 412 if (audio_config_.IsValidConfig()) |
| 418 audio_config_ = ::media::AudioDecoderConfig(); | 413 audio_config_ = AudioDecoderConfig(); |
| 419 if (video_config_.IsValidConfig()) | 414 if (video_config_.IsValidConfig()) |
| 420 video_config_ = ::media::VideoDecoderConfig(); | 415 video_config_ = VideoDecoderConfig(); |
| 421 } | 416 } |
| 422 | 417 |
| 423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | 418 void DemuxerStreamAdapter::ResetPendingFrame() { |
| 424 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 419 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 425 current_pending_frame_offset_ = 0; | 420 current_pending_frame_offset_ = 0; |
| 426 pending_frame_.clear(); | 421 pending_frame_.clear(); |
| 427 pending_frame_is_eos_ = false; | 422 pending_frame_is_eos_ = false; |
| 428 } | 423 } |
| 429 | 424 |
| 430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { | 425 void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
| 431 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 426 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 432 | 427 |
| 433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; | 428 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
| 434 | 429 |
| 435 if (error_callback_.is_null()) | 430 if (error_callback_.is_null()) |
| 436 return; | 431 return; |
| 437 | 432 |
| 438 if (write_watcher_.IsWatching()) { | 433 if (write_watcher_.IsWatching()) { |
| 439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | 434 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
| 440 write_watcher_.Cancel(); | 435 write_watcher_.Cancel(); |
| 441 } | 436 } |
| 442 | 437 |
| 443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); | 438 base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
| 444 } | 439 } |
| 445 | 440 |
| 446 } // namespace remoting | 441 } // namespace remoting |
| 447 } // namespace media | 442 } // namespace media |
| OLD | NEW |