| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "media/remoting/remote_demuxer_stream_adapter.h" | |
| 6 | |
| 7 #include "base/base64.h" | |
| 8 #include "base/bind.h" | |
| 9 #include "base/callback_helpers.h" | |
| 10 #include "media/base/bind_to_current_loop.h" | |
| 11 #include "media/base/decoder_buffer.h" | |
| 12 #include "media/base/timestamp_constants.h" | |
| 13 #include "media/remoting/rpc/proto_enum_utils.h" | |
| 14 #include "media/remoting/rpc/proto_utils.h" | |
| 15 | |
| 16 // Convenience logging macro used throughout this file. | |
| 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | |
| 18 | |
| 19 namespace media { | |
| 20 namespace remoting { | |
| 21 | |
| 22 // static | |
| 23 mojo::DataPipe* CreateDataPipe() { | |
| 24 // Capacity in bytes for Mojo data pipe. | |
| 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | |
| 26 | |
| 27 MojoCreateDataPipeOptions options; | |
| 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); | |
| 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | |
| 30 options.element_num_bytes = 1; | |
| 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | |
| 32 return new mojo::DataPipe(options); | |
| 33 } | |
| 34 | |
| 35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | |
| 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | |
| 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | |
| 38 const std::string& name, | |
| 39 ::media::DemuxerStream* demuxer_stream, | |
| 40 const base::WeakPtr<RpcBroker>& rpc_broker, | |
| 41 int rpc_handle, | |
| 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | |
| 43 mojo::ScopedDataPipeProducerHandle producer_handle, | |
| 44 const ErrorCallback& error_callback) | |
| 45 : main_task_runner_(std::move(main_task_runner)), | |
| 46 media_task_runner_(std::move(media_task_runner)), | |
| 47 name_(name), | |
| 48 rpc_broker_(rpc_broker), | |
| 49 rpc_handle_(rpc_handle), | |
| 50 demuxer_stream_(demuxer_stream), | |
| 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | |
| 52 error_callback_(error_callback), | |
| 53 remote_callback_handle_(kInvalidHandle), | |
| 54 read_until_callback_handle_(kInvalidHandle), | |
| 55 read_until_count_(0), | |
| 56 last_count_(0), | |
| 57 pending_flush_(false), | |
| 58 current_pending_frame_offset_(0), | |
| 59 pending_frame_is_eos_(false), | |
| 60 media_status_(::media::DemuxerStream::kOk), | |
| 61 producer_handle_(std::move(producer_handle)), | |
| 62 bytes_written_to_pipe_(0), | |
| 63 request_buffer_weak_factory_(this), | |
| 64 weak_factory_(this) { | |
| 65 DCHECK(main_task_runner_); | |
| 66 DCHECK(media_task_runner_); | |
| 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 68 DCHECK(demuxer_stream); | |
| 69 DCHECK(!error_callback.is_null()); | |
| 70 const RpcBroker::ReceiveMessageCallback receive_callback = | |
| 71 media::BindToCurrentLoop( | |
| 72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | |
| 73 weak_factory_.GetWeakPtr())); | |
| 74 main_task_runner_->PostTask( | |
| 75 FROM_HERE, | |
| 76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | |
| 77 rpc_broker_, rpc_handle_, receive_callback)); | |
| 78 | |
| 79 stream_sender_.Bind(std::move(stream_sender_info)); | |
| 80 stream_sender_.set_connection_error_handler( | |
| 81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | |
| 82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); | |
| 83 } | |
| 84 | |
| 85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | |
| 86 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 87 main_task_runner_->PostTask( | |
| 88 FROM_HERE, | |
| 89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | |
| 90 rpc_broker_, rpc_handle_)); | |
| 91 } | |
| 92 | |
| 93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { | |
| 94 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 95 const int64_t current_count = bytes_written_to_pipe_; | |
| 96 bytes_written_to_pipe_ = 0; | |
| 97 return current_count; | |
| 98 } | |
| 99 | |
| 100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | |
| 101 bool flushing) { | |
| 102 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 103 DEMUXER_VLOG(2) << "flushing=" << flushing; | |
| 104 | |
| 105 // Ignores if |pending_flush_| states is same. | |
| 106 if (pending_flush_ == flushing) | |
| 107 return base::nullopt; | |
| 108 | |
| 109 // Cleans up pending frame data. | |
| 110 pending_frame_.clear(); | |
| 111 current_pending_frame_offset_ = 0; | |
| 112 pending_frame_is_eos_ = false; | |
| 113 // Invalidates pending Read() tasks. | |
| 114 request_buffer_weak_factory_.InvalidateWeakPtrs(); | |
| 115 | |
| 116 // Cancels in flight data in browser process. | |
| 117 pending_flush_ = flushing; | |
| 118 if (flushing) { | |
| 119 stream_sender_->CancelInFlightData(); | |
| 120 } else { | |
| 121 // Sets callback handle invalid to abort ongoing read request. | |
| 122 read_until_callback_handle_ = kInvalidHandle; | |
| 123 } | |
| 124 return last_count_; | |
| 125 } | |
| 126 | |
| 127 void RemoteDemuxerStreamAdapter::OnReceivedRpc( | |
| 128 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
| 129 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 130 DCHECK(message); | |
| 131 DCHECK(rpc_handle_ == message->handle()); | |
| 132 | |
| 133 switch (message->proc()) { | |
| 134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: | |
| 135 Initialize(message->integer_value()); | |
| 136 break; | |
| 137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL: | |
| 138 ReadUntil(std::move(message)); | |
| 139 break; | |
| 140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: | |
| 141 EnableBitstreamConverter(); | |
| 142 break; | |
| 143 | |
| 144 default: | |
| 145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); | |
| 146 } | |
| 147 } | |
| 148 | |
| 149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { | |
| 150 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 151 DCHECK(!pending_flush_); | |
| 152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" | |
| 153 << remote_callback_handle; | |
| 154 | |
| 155 // Checks if initialization had been called or not. | |
| 156 if (remote_callback_handle_ != kInvalidHandle) { | |
| 157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | |
| 158 << remote_callback_handle_ | |
| 159 << ", Given: " << remote_callback_handle; | |
| 160 // Shuts down data pipe if available if providing different remote callback | |
| 161 // handle for initialization. Otherwise, just silently ignore the duplicated | |
| 162 // request. | |
| 163 if (remote_callback_handle_ != remote_callback_handle) { | |
| 164 OnFatalError(PEERS_OUT_OF_SYNC); | |
| 165 } | |
| 166 return; | |
| 167 } | |
| 168 remote_callback_handle_ = remote_callback_handle; | |
| 169 | |
| 170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | |
| 171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | |
| 172 rpc->set_handle(remote_callback_handle_); | |
| 173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | |
| 174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | |
| 175 init_cb_message->set_type(type_); | |
| 176 switch (type_) { | |
| 177 case ::media::DemuxerStream::Type::AUDIO: { | |
| 178 audio_config_ = demuxer_stream_->audio_decoder_config(); | |
| 179 pb::AudioDecoderConfig* audio_message = | |
| 180 init_cb_message->mutable_audio_decoder_config(); | |
| 181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | |
| 182 break; | |
| 183 } | |
| 184 case ::media::DemuxerStream::Type::VIDEO: { | |
| 185 video_config_ = demuxer_stream_->video_decoder_config(); | |
| 186 pb::VideoDecoderConfig* video_message = | |
| 187 init_cb_message->mutable_video_decoder_config(); | |
| 188 ConvertVideoDecoderConfigToProto(video_config_, video_message); | |
| 189 break; | |
| 190 } | |
| 191 default: | |
| 192 NOTREACHED(); | |
| 193 } | |
| 194 | |
| 195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() | |
| 196 << " with decoder_config={" | |
| 197 << (type_ == ::media::DemuxerStream::Type::AUDIO | |
| 198 ? audio_config_.AsHumanReadableString() | |
| 199 : video_config_.AsHumanReadableString()) | |
| 200 << '}'; | |
| 201 main_task_runner_->PostTask( | |
| 202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | |
| 203 rpc_broker_, base::Passed(&rpc))); | |
| 204 | |
| 205 // Starts Mojo watcher. | |
| 206 if (!write_watcher_.IsWatching()) { | |
| 207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; | |
| 208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, | |
| 209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, | |
| 210 weak_factory_.GetWeakPtr())); | |
| 211 } | |
| 212 } | |
| 213 | |
| 214 void RemoteDemuxerStreamAdapter::ReadUntil( | |
| 215 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
| 216 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 217 DCHECK(message); | |
| 218 if (!message->has_demuxerstream_readuntil_rpc()) { | |
| 219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | |
| 220 OnFatalError(RPC_INVALID); | |
| 221 return; | |
| 222 } | |
| 223 | |
| 224 const pb::DemuxerStreamReadUntil& rpc_message = | |
| 225 message->demuxerstream_readuntil_rpc(); | |
| 226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" | |
| 227 << rpc_message.callback_handle() | |
| 228 << ", count=" << rpc_message.count(); | |
| 229 | |
| 230 if (pending_flush_) { | |
| 231 DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state"; | |
| 232 return; | |
| 233 } | |
| 234 | |
| 235 if (IsProcessingReadRequest()) { | |
| 236 DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state."; | |
| 237 return; | |
| 238 } | |
| 239 | |
| 240 if (rpc_message.count() <= last_count_) { | |
| 241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " | |
| 242 "current frame count"; | |
| 243 return; | |
| 244 } | |
| 245 | |
| 246 read_until_count_ = rpc_message.count(); | |
| 247 read_until_callback_handle_ = rpc_message.callback_handle(); | |
| 248 RequestBuffer(); | |
| 249 } | |
| 250 | |
| 251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { | |
| 252 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; | |
| 254 demuxer_stream_->EnableBitstreamConverter(); | |
| 255 } | |
| 256 | |
| 257 void RemoteDemuxerStreamAdapter::RequestBuffer() { | |
| 258 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 259 if (!IsProcessingReadRequest() || pending_flush_) { | |
| 260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | |
| 261 return; | |
| 262 } | |
| 263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, | |
| 264 request_buffer_weak_factory_.GetWeakPtr())); | |
| 265 } | |
| 266 | |
| 267 void RemoteDemuxerStreamAdapter::OnNewBuffer( | |
| 268 ::media::DemuxerStream::Status status, | |
| 269 const scoped_refptr<::media::DecoderBuffer>& input) { | |
| 270 DEMUXER_VLOG(3) << "status=" << status; | |
| 271 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 272 if (!IsProcessingReadRequest() || pending_flush_) { | |
| 273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | |
| 274 return; | |
| 275 } | |
| 276 | |
| 277 switch (status) { | |
| 278 case ::media::DemuxerStream::kAborted: | |
| 279 DCHECK(!input); | |
| 280 SendReadAck(); | |
| 281 return; | |
| 282 case ::media::DemuxerStream::kConfigChanged: | |
| 283 // TODO(erickung): consider sending updated Audio/Video decoder config to | |
| 284 // RemotingRendererController. | |
| 285 // Stores available audio/video decoder config and issues | |
| 286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. | |
| 287 DCHECK(!input); | |
| 288 media_status_ = status; | |
| 289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) | |
| 290 video_config_ = demuxer_stream_->video_decoder_config(); | |
| 291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) | |
| 292 audio_config_ = demuxer_stream_->audio_decoder_config(); | |
| 293 SendReadAck(); | |
| 294 return; | |
| 295 case ::media::DemuxerStream::kOk: { | |
| 296 media_status_ = status; | |
| 297 DCHECK(pending_frame_.empty()); | |
| 298 if (!producer_handle_.is_valid()) | |
| 299 return; // Do not start sending (due to previous fatal error). | |
| 300 pending_frame_ = DecoderBufferToByteArray(input); | |
| 301 pending_frame_is_eos_ = input->end_of_stream(); | |
| 302 TryWriteData(MOJO_RESULT_OK); | |
| 303 } break; | |
| 304 } | |
| 305 } | |
| 306 | |
| 307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | |
| 308 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 309 // The Mojo watcher will also call TryWriteData() sometimes as a notification | |
| 310 // that data pipe is ready. But that does not necessarily mean the data for a | |
| 311 // read request is ready to be written into the pipe. | |
| 312 if (!IsProcessingReadRequest() || pending_flush_) { | |
| 313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; | |
| 314 return; | |
| 315 } | |
| 316 | |
| 317 if (pending_frame_.empty()) { | |
| 318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; | |
| 319 return; | |
| 320 } | |
| 321 | |
| 322 if (!stream_sender_ || !producer_handle_.is_valid()) { | |
| 323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; | |
| 324 return; | |
| 325 } | |
| 326 | |
| 327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; | |
| 328 MojoResult mojo_result = | |
| 329 WriteDataRaw(producer_handle_.get(), | |
| 330 pending_frame_.data() + current_pending_frame_offset_, | |
| 331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); | |
| 332 if (mojo_result != MOJO_RESULT_OK) { | |
| 333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { | |
| 334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" | |
| 335 << mojo_result; | |
| 336 OnFatalError(MOJO_PIPE_ERROR); | |
| 337 } | |
| 338 return; | |
| 339 } | |
| 340 | |
| 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, | |
| 342 pending_frame_.size()); | |
| 343 current_pending_frame_offset_ += num_bytes; | |
| 344 bytes_written_to_pipe_ += num_bytes; | |
| 345 | |
| 346 // Checks if all buffer was written to browser process. | |
| 347 if (current_pending_frame_offset_ != pending_frame_.size()) { | |
| 348 // Returns and wait for mojo watcher to notify to write more data. | |
| 349 return; | |
| 350 } | |
| 351 | |
| 352 // Signal mojo remoting service that all frame buffer is written to data pipe. | |
| 353 stream_sender_->SendFrame(); | |
| 354 | |
| 355 // Resets frame buffer variables. | |
| 356 bool pending_frame_is_eos = pending_frame_is_eos_; | |
| 357 ++last_count_; | |
| 358 ResetPendingFrame(); | |
| 359 | |
| 360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. | |
| 361 if (read_until_count_ == last_count_ || pending_frame_is_eos) { | |
| 362 SendReadAck(); | |
| 363 return; | |
| 364 } | |
| 365 | |
| 366 // Contiune to read decoder buffer until reaching |read_until_count_| or | |
| 367 // end of stream. | |
| 368 media_task_runner_->PostTask( | |
| 369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, | |
| 370 weak_factory_.GetWeakPtr())); | |
| 371 } | |
| 372 | |
| 373 void RemoteDemuxerStreamAdapter::SendReadAck() { | |
| 374 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 375 DEMUXER_VLOG(3) << "last_count_=" << last_count_ | |
| 376 << ", remote_read_callback_handle=" | |
| 377 << read_until_callback_handle_ | |
| 378 << ", media_status=" << media_status_; | |
| 379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. | |
| 380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | |
| 381 rpc->set_handle(read_until_callback_handle_); | |
| 382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
| 383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); | |
| 384 message->set_count(last_count_); | |
| 385 message->set_status( | |
| 386 remoting::ToProtoDemuxerStreamStatus(media_status_).value()); | |
| 387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) { | |
| 388 if (audio_config_.IsValidConfig()) { | |
| 389 pb::AudioDecoderConfig* audio_message = | |
| 390 message->mutable_audio_decoder_config(); | |
| 391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | |
| 392 } else if (video_config_.IsValidConfig()) { | |
| 393 pb::VideoDecoderConfig* video_message = | |
| 394 message->mutable_video_decoder_config(); | |
| 395 ConvertVideoDecoderConfigToProto(video_config_, video_message); | |
| 396 } else { | |
| 397 NOTREACHED(); | |
| 398 } | |
| 399 } | |
| 400 | |
| 401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() | |
| 402 << " with count=" << message->count() | |
| 403 << ", status=" << message->status() << ", decoder_config={" | |
| 404 << (audio_config_.IsValidConfig() | |
| 405 ? audio_config_.AsHumanReadableString() | |
| 406 : video_config_.IsValidConfig() | |
| 407 ? video_config_.AsHumanReadableString() | |
| 408 : "DID NOT CHANGE") | |
| 409 << '}'; | |
| 410 main_task_runner_->PostTask( | |
| 411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | |
| 412 rpc_broker_, base::Passed(&rpc))); | |
| 413 // Resets callback handle after completing the reading request. | |
| 414 read_until_callback_handle_ = kInvalidHandle; | |
| 415 | |
| 416 // Resets audio/video decoder config since it only sends once. | |
| 417 if (audio_config_.IsValidConfig()) | |
| 418 audio_config_ = ::media::AudioDecoderConfig(); | |
| 419 if (video_config_.IsValidConfig()) | |
| 420 video_config_ = ::media::VideoDecoderConfig(); | |
| 421 } | |
| 422 | |
| 423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | |
| 424 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 425 current_pending_frame_offset_ = 0; | |
| 426 pending_frame_.clear(); | |
| 427 pending_frame_is_eos_ = false; | |
| 428 } | |
| 429 | |
| 430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { | |
| 431 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
| 432 | |
| 433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; | |
| 434 | |
| 435 if (error_callback_.is_null()) | |
| 436 return; | |
| 437 | |
| 438 if (write_watcher_.IsWatching()) { | |
| 439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | |
| 440 write_watcher_.Cancel(); | |
| 441 } | |
| 442 | |
| 443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); | |
| 444 } | |
| 445 | |
| 446 } // namespace remoting | |
| 447 } // namespace media | |
| OLD | NEW |