| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "media/remoting/remote_demuxer_stream_adapter.h" | 5 #include "media/remoting/remote_demuxer_stream_adapter.h" |
| 6 | 6 |
| 7 #include "base/base64.h" | 7 #include "base/base64.h" |
| 8 #include "base/bind.h" | 8 #include "base/bind.h" |
| 9 #include "base/callback_helpers.h" |
| 9 #include "media/base/bind_to_current_loop.h" | 10 #include "media/base/bind_to_current_loop.h" |
| 10 #include "media/base/decoder_buffer.h" | 11 #include "media/base/decoder_buffer.h" |
| 11 #include "media/base/timestamp_constants.h" | 12 #include "media/base/timestamp_constants.h" |
| 12 #include "media/remoting/rpc/proto_enum_utils.h" | 13 #include "media/remoting/rpc/proto_enum_utils.h" |
| 13 #include "media/remoting/rpc/proto_utils.h" | 14 #include "media/remoting/rpc/proto_utils.h" |
| 14 | 15 |
| 15 // Convenience logging macro used throughout this file. | 16 // Convenience logging macro used throughout this file. |
| 16 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " |
| 17 | 18 |
| 18 namespace media { | 19 namespace media { |
| 19 namespace remoting { | 20 namespace remoting { |
| 20 | 21 |
| 21 namespace { | |
| 22 constexpr char kErrorLostMojoConnection[] = "Mojo connection error"; | |
| 23 constexpr char kErrorDataPipeWrite[] = "Mojo data pipe write error"; | |
| 24 constexpr char kErrorDuplicateInitialize[] = "Duplicate attempt to initialize"; | |
| 25 constexpr char kErrorMissingMessageFields[] = "Missing required message fields"; | |
| 26 } // namepsace | |
| 27 | |
| 28 // static | 22 // static |
| 29 mojo::DataPipe* CreateDataPipe() { | 23 mojo::DataPipe* CreateDataPipe() { |
| 30 // Capacity in bytes for Mojo data pipe. | 24 // Capacity in bytes for Mojo data pipe. |
| 31 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; |
| 32 | 26 |
| 33 MojoCreateDataPipeOptions options; | 27 MojoCreateDataPipeOptions options; |
| 34 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); |
| 35 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; |
| 36 options.element_num_bytes = 1; | 30 options.element_num_bytes = 1; |
| 37 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; |
| 38 return new mojo::DataPipe(options); | 32 return new mojo::DataPipe(options); |
| 39 } | 33 } |
| 40 | 34 |
| 41 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | 35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
| 42 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
| 43 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
| 44 const std::string& name, | 38 const std::string& name, |
| 45 ::media::DemuxerStream* demuxer_stream, | 39 ::media::DemuxerStream* demuxer_stream, |
| 46 const base::WeakPtr<RpcBroker>& rpc_broker, | 40 const base::WeakPtr<RpcBroker>& rpc_broker, |
| 47 int rpc_handle, | 41 int rpc_handle, |
| 48 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
| 49 mojo::ScopedDataPipeProducerHandle producer_handle) | 43 mojo::ScopedDataPipeProducerHandle producer_handle, |
| 44 const ErrorCallback& error_callback) |
| 50 : main_task_runner_(std::move(main_task_runner)), | 45 : main_task_runner_(std::move(main_task_runner)), |
| 51 media_task_runner_(std::move(media_task_runner)), | 46 media_task_runner_(std::move(media_task_runner)), |
| 52 name_(name), | 47 name_(name), |
| 53 rpc_broker_(rpc_broker), | 48 rpc_broker_(rpc_broker), |
| 54 rpc_handle_(rpc_handle), | 49 rpc_handle_(rpc_handle), |
| 55 demuxer_stream_(demuxer_stream), | 50 demuxer_stream_(demuxer_stream), |
| 56 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
| 52 error_callback_(error_callback), |
| 57 remote_callback_handle_(kInvalidHandle), | 53 remote_callback_handle_(kInvalidHandle), |
| 58 read_until_callback_handle_(kInvalidHandle), | 54 read_until_callback_handle_(kInvalidHandle), |
| 59 read_until_count_(0), | 55 read_until_count_(0), |
| 60 last_count_(0), | 56 last_count_(0), |
| 61 pending_flush_(false), | 57 pending_flush_(false), |
| 62 current_pending_frame_offset_(0), | 58 current_pending_frame_offset_(0), |
| 63 pending_frame_is_eos_(false), | 59 pending_frame_is_eos_(false), |
| 64 media_status_(::media::DemuxerStream::kOk), | 60 media_status_(::media::DemuxerStream::kOk), |
| 65 producer_handle_(std::move(producer_handle)), | 61 producer_handle_(std::move(producer_handle)), |
| 62 bytes_written_to_pipe_(0), |
| 66 request_buffer_weak_factory_(this), | 63 request_buffer_weak_factory_(this), |
| 67 weak_factory_(this) { | 64 weak_factory_(this) { |
| 68 DCHECK(main_task_runner_); | 65 DCHECK(main_task_runner_); |
| 69 DCHECK(media_task_runner_); | 66 DCHECK(media_task_runner_); |
| 70 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 71 DCHECK(demuxer_stream); | 68 DCHECK(demuxer_stream); |
| 69 DCHECK(!error_callback.is_null()); |
| 72 const RpcBroker::ReceiveMessageCallback receive_callback = | 70 const RpcBroker::ReceiveMessageCallback receive_callback = |
| 73 media::BindToCurrentLoop( | 71 media::BindToCurrentLoop( |
| 74 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | 72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, |
| 75 weak_factory_.GetWeakPtr())); | 73 weak_factory_.GetWeakPtr())); |
| 76 main_task_runner_->PostTask( | 74 main_task_runner_->PostTask( |
| 77 FROM_HERE, | 75 FROM_HERE, |
| 78 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | 76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, |
| 79 rpc_broker_, rpc_handle_, receive_callback)); | 77 rpc_broker_, rpc_handle_, receive_callback)); |
| 80 | 78 |
| 81 stream_sender_.Bind(std::move(stream_sender_info)); | 79 stream_sender_.Bind(std::move(stream_sender_info)); |
| 82 stream_sender_.set_connection_error_handler( | 80 stream_sender_.set_connection_error_handler( |
| 83 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | 81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, |
| 84 weak_factory_.GetWeakPtr(), kErrorLostMojoConnection)); | 82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
| 85 } | 83 } |
| 86 | 84 |
| 87 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | 85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { |
| 88 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 86 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 89 main_task_runner_->PostTask( | 87 main_task_runner_->PostTask( |
| 90 FROM_HERE, | 88 FROM_HERE, |
| 91 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | 89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, |
| 92 rpc_broker_, rpc_handle_)); | 90 rpc_broker_, rpc_handle_)); |
| 93 } | 91 } |
| 94 | 92 |
| 93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { |
| 94 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 95 const int64_t current_count = bytes_written_to_pipe_; |
| 96 bytes_written_to_pipe_ = 0; |
| 97 return current_count; |
| 98 } |
| 99 |
| 95 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | 100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( |
| 96 bool flushing) { | 101 bool flushing) { |
| 97 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 102 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 98 DEMUXER_VLOG(2) << "flushing=" << flushing; | 103 DEMUXER_VLOG(2) << "flushing=" << flushing; |
| 99 | 104 |
| 100 // Ignores if |pending_flush_| states is same. | 105 // Ignores if |pending_flush_| states is same. |
| 101 if (pending_flush_ == flushing) | 106 if (pending_flush_ == flushing) |
| 102 return base::nullopt; | 107 return base::nullopt; |
| 103 | 108 |
| 104 // Cleans up pending frame data. | 109 // Cleans up pending frame data. |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 149 | 154 |
| 150 // Checks if initialization had been called or not. | 155 // Checks if initialization had been called or not. |
| 151 if (remote_callback_handle_ != kInvalidHandle) { | 156 if (remote_callback_handle_ != kInvalidHandle) { |
| 152 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | 157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " |
| 153 << remote_callback_handle_ | 158 << remote_callback_handle_ |
| 154 << ", Given: " << remote_callback_handle; | 159 << ", Given: " << remote_callback_handle; |
| 155 // Shuts down data pipe if available if providing different remote callback | 160 // Shuts down data pipe if available if providing different remote callback |
| 156 // handle for initialization. Otherwise, just silently ignore the duplicated | 161 // handle for initialization. Otherwise, just silently ignore the duplicated |
| 157 // request. | 162 // request. |
| 158 if (remote_callback_handle_ != remote_callback_handle) { | 163 if (remote_callback_handle_ != remote_callback_handle) { |
| 159 OnFatalError(kErrorDuplicateInitialize); | 164 OnFatalError(PEERS_OUT_OF_SYNC); |
| 160 } | 165 } |
| 161 return; | 166 return; |
| 162 } | 167 } |
| 163 remote_callback_handle_ = remote_callback_handle; | 168 remote_callback_handle_ = remote_callback_handle; |
| 164 | 169 |
| 165 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | 170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. |
| 166 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); |
| 167 rpc->set_handle(remote_callback_handle_); | 172 rpc->set_handle(remote_callback_handle_); |
| 168 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | 173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); |
| 169 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | 174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 205 weak_factory_.GetWeakPtr())); | 210 weak_factory_.GetWeakPtr())); |
| 206 } | 211 } |
| 207 } | 212 } |
| 208 | 213 |
| 209 void RemoteDemuxerStreamAdapter::ReadUntil( | 214 void RemoteDemuxerStreamAdapter::ReadUntil( |
| 210 std::unique_ptr<remoting::pb::RpcMessage> message) { | 215 std::unique_ptr<remoting::pb::RpcMessage> message) { |
| 211 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 216 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 212 DCHECK(message); | 217 DCHECK(message); |
| 213 if (!message->has_demuxerstream_readuntil_rpc()) { | 218 if (!message->has_demuxerstream_readuntil_rpc()) { |
| 214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | 219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
| 215 OnFatalError(kErrorMissingMessageFields); | 220 OnFatalError(RPC_INVALID); |
| 216 return; | 221 return; |
| 217 } | 222 } |
| 218 | 223 |
| 219 const pb::DemuxerStreamReadUntil& rpc_message = | 224 const pb::DemuxerStreamReadUntil& rpc_message = |
| 220 message->demuxerstream_readuntil_rpc(); | 225 message->demuxerstream_readuntil_rpc(); |
| 221 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" | 226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" |
| 222 << rpc_message.callback_handle() | 227 << rpc_message.callback_handle() |
| 223 << ", count=" << rpc_message.count(); | 228 << ", count=" << rpc_message.count(); |
| 224 | 229 |
| 225 if (pending_flush_) { | 230 if (pending_flush_) { |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 298 } break; | 303 } break; |
| 299 } | 304 } |
| 300 } | 305 } |
| 301 | 306 |
| 302 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | 307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
| 303 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 308 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 304 // The Mojo watcher will also call TryWriteData() sometimes as a notification | 309 // The Mojo watcher will also call TryWriteData() sometimes as a notification |
| 305 // that data pipe is ready. But that does not necessarily mean the data for a | 310 // that data pipe is ready. But that does not necessarily mean the data for a |
| 306 // read request is ready to be written into the pipe. | 311 // read request is ready to be written into the pipe. |
| 307 if (!IsProcessingReadRequest() || pending_flush_) { | 312 if (!IsProcessingReadRequest() || pending_flush_) { |
| 308 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
| 309 return; | 314 return; |
| 310 } | 315 } |
| 311 | 316 |
| 312 if (pending_frame_.empty()) { | 317 if (pending_frame_.empty()) { |
| 313 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; | 318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; |
| 314 return; | 319 return; |
| 315 } | 320 } |
| 316 | 321 |
| 317 if (!stream_sender_ || !producer_handle_.is_valid()) { | 322 if (!stream_sender_ || !producer_handle_.is_valid()) { |
| 318 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; | 323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; |
| 319 return; | 324 return; |
| 320 } | 325 } |
| 321 | 326 |
| 322 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; | 327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; |
| 323 MojoResult mojo_result = | 328 MojoResult mojo_result = |
| 324 WriteDataRaw(producer_handle_.get(), | 329 WriteDataRaw(producer_handle_.get(), |
| 325 pending_frame_.data() + current_pending_frame_offset_, | 330 pending_frame_.data() + current_pending_frame_offset_, |
| 326 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); | 331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
| 327 if (mojo_result != MOJO_RESULT_OK) { | 332 if (mojo_result != MOJO_RESULT_OK) { |
| 328 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { | 333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { |
| 329 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" | 334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" |
| 330 << mojo_result; | 335 << mojo_result; |
| 331 OnFatalError(kErrorDataPipeWrite); | 336 OnFatalError(MOJO_PIPE_ERROR); |
| 332 } | 337 } |
| 333 return; | 338 return; |
| 334 } | 339 } |
| 335 | 340 |
| 336 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, | 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, |
| 337 pending_frame_.size()); | 342 pending_frame_.size()); |
| 338 current_pending_frame_offset_ += num_bytes; | 343 current_pending_frame_offset_ += num_bytes; |
| 344 bytes_written_to_pipe_ += num_bytes; |
| 339 | 345 |
| 340 // Checks if all buffer was written to browser process. | 346 // Checks if all buffer was written to browser process. |
| 341 if (current_pending_frame_offset_ != pending_frame_.size()) { | 347 if (current_pending_frame_offset_ != pending_frame_.size()) { |
| 342 // Returns and wait for mojo watcher to notify to write more data. | 348 // Returns and wait for mojo watcher to notify to write more data. |
| 343 return; | 349 return; |
| 344 } | 350 } |
| 345 | 351 |
| 346 // Signal mojo remoting service that all frame buffer is written to data pipe. | 352 // Signal mojo remoting service that all frame buffer is written to data pipe. |
| 347 stream_sender_->SendFrame(); | 353 stream_sender_->SendFrame(); |
| 348 | 354 |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 414 video_config_ = ::media::VideoDecoderConfig(); | 420 video_config_ = ::media::VideoDecoderConfig(); |
| 415 } | 421 } |
| 416 | 422 |
| 417 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | 423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { |
| 418 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 424 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 419 current_pending_frame_offset_ = 0; | 425 current_pending_frame_offset_ = 0; |
| 420 pending_frame_.clear(); | 426 pending_frame_.clear(); |
| 421 pending_frame_is_eos_ = false; | 427 pending_frame_is_eos_ = false; |
| 422 } | 428 } |
| 423 | 429 |
| 424 void RemoteDemuxerStreamAdapter::OnFatalError(const char* reason) { | 430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
| 425 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 431 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 426 | 432 |
| 427 DEMUXER_VLOG(1) << reason; | 433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
| 428 | 434 |
| 429 // Resets mojo data pipe producer handle. | 435 if (error_callback_.is_null()) |
| 430 producer_handle_.reset(); | 436 return; |
| 431 | |
| 432 // Resetting |stream_sender_| will close Mojo message pipe, which will cause | |
| 433 // the entire remoting session to be shut down. | |
| 434 if (stream_sender_) { | |
| 435 DEMUXER_VLOG(2) << "Reset data stream sender"; | |
| 436 stream_sender_.reset(); | |
| 437 } | |
| 438 | 437 |
| 439 if (write_watcher_.IsWatching()) { | 438 if (write_watcher_.IsWatching()) { |
| 440 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | 439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
| 441 write_watcher_.Cancel(); | 440 write_watcher_.Cancel(); |
| 442 } | 441 } |
| 442 |
| 443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
| 443 } | 444 } |
| 444 | 445 |
| 445 } // namespace remoting | 446 } // namespace remoting |
| 446 } // namespace media | 447 } // namespace media |
| OLD | NEW |