OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "media/remoting/remote_demuxer_stream_adapter.h" | 5 #include "media/remoting/remote_demuxer_stream_adapter.h" |
6 | 6 |
7 #include "base/base64.h" | 7 #include "base/base64.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
| 9 #include "base/callback_helpers.h" |
9 #include "media/base/bind_to_current_loop.h" | 10 #include "media/base/bind_to_current_loop.h" |
10 #include "media/base/decoder_buffer.h" | 11 #include "media/base/decoder_buffer.h" |
11 #include "media/base/timestamp_constants.h" | 12 #include "media/base/timestamp_constants.h" |
12 #include "media/remoting/rpc/proto_enum_utils.h" | 13 #include "media/remoting/rpc/proto_enum_utils.h" |
13 #include "media/remoting/rpc/proto_utils.h" | 14 #include "media/remoting/rpc/proto_utils.h" |
14 | 15 |
15 // Convenience logging macro used throughout this file. | 16 // Convenience logging macro used throughout this file. |
16 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " |
17 | 18 |
18 namespace media { | 19 namespace media { |
19 namespace remoting { | 20 namespace remoting { |
20 | 21 |
21 namespace { | |
22 constexpr char kErrorLostMojoConnection[] = "Mojo connection error"; | |
23 constexpr char kErrorDataPipeWrite[] = "Mojo data pipe write error"; | |
24 constexpr char kErrorDuplicateInitialize[] = "Duplicate attempt to initialize"; | |
25 constexpr char kErrorMissingMessageFields[] = "Missing required message fields"; | |
26 } // namepsace | |
27 | |
28 // static | 22 // static |
29 mojo::DataPipe* CreateDataPipe() { | 23 mojo::DataPipe* CreateDataPipe() { |
30 // Capacity in bytes for Mojo data pipe. | 24 // Capacity in bytes for Mojo data pipe. |
31 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; |
32 | 26 |
33 MojoCreateDataPipeOptions options; | 27 MojoCreateDataPipeOptions options; |
34 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); |
35 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; |
36 options.element_num_bytes = 1; | 30 options.element_num_bytes = 1; |
37 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; |
38 return new mojo::DataPipe(options); | 32 return new mojo::DataPipe(options); |
39 } | 33 } |
40 | 34 |
41 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | 35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
42 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
43 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
44 const std::string& name, | 38 const std::string& name, |
45 ::media::DemuxerStream* demuxer_stream, | 39 ::media::DemuxerStream* demuxer_stream, |
46 const base::WeakPtr<RpcBroker>& rpc_broker, | 40 const base::WeakPtr<RpcBroker>& rpc_broker, |
47 int rpc_handle, | 41 int rpc_handle, |
48 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
49 mojo::ScopedDataPipeProducerHandle producer_handle) | 43 mojo::ScopedDataPipeProducerHandle producer_handle, |
| 44 const ErrorCallback& error_callback) |
50 : main_task_runner_(std::move(main_task_runner)), | 45 : main_task_runner_(std::move(main_task_runner)), |
51 media_task_runner_(std::move(media_task_runner)), | 46 media_task_runner_(std::move(media_task_runner)), |
52 name_(name), | 47 name_(name), |
53 rpc_broker_(rpc_broker), | 48 rpc_broker_(rpc_broker), |
54 rpc_handle_(rpc_handle), | 49 rpc_handle_(rpc_handle), |
55 demuxer_stream_(demuxer_stream), | 50 demuxer_stream_(demuxer_stream), |
56 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
| 52 error_callback_(error_callback), |
57 remote_callback_handle_(kInvalidHandle), | 53 remote_callback_handle_(kInvalidHandle), |
58 read_until_callback_handle_(kInvalidHandle), | 54 read_until_callback_handle_(kInvalidHandle), |
59 read_until_count_(0), | 55 read_until_count_(0), |
60 last_count_(0), | 56 last_count_(0), |
61 pending_flush_(false), | 57 pending_flush_(false), |
62 current_pending_frame_offset_(0), | 58 current_pending_frame_offset_(0), |
63 pending_frame_is_eos_(false), | 59 pending_frame_is_eos_(false), |
64 media_status_(::media::DemuxerStream::kOk), | 60 media_status_(::media::DemuxerStream::kOk), |
65 producer_handle_(std::move(producer_handle)), | 61 producer_handle_(std::move(producer_handle)), |
| 62 bytes_written_to_pipe_(0), |
66 request_buffer_weak_factory_(this), | 63 request_buffer_weak_factory_(this), |
67 weak_factory_(this) { | 64 weak_factory_(this) { |
68 DCHECK(main_task_runner_); | 65 DCHECK(main_task_runner_); |
69 DCHECK(media_task_runner_); | 66 DCHECK(media_task_runner_); |
70 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
71 DCHECK(demuxer_stream); | 68 DCHECK(demuxer_stream); |
| 69 DCHECK(!error_callback.is_null()); |
72 const RpcBroker::ReceiveMessageCallback receive_callback = | 70 const RpcBroker::ReceiveMessageCallback receive_callback = |
73 media::BindToCurrentLoop( | 71 media::BindToCurrentLoop( |
74 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | 72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, |
75 weak_factory_.GetWeakPtr())); | 73 weak_factory_.GetWeakPtr())); |
76 main_task_runner_->PostTask( | 74 main_task_runner_->PostTask( |
77 FROM_HERE, | 75 FROM_HERE, |
78 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | 76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, |
79 rpc_broker_, rpc_handle_, receive_callback)); | 77 rpc_broker_, rpc_handle_, receive_callback)); |
80 | 78 |
81 stream_sender_.Bind(std::move(stream_sender_info)); | 79 stream_sender_.Bind(std::move(stream_sender_info)); |
82 stream_sender_.set_connection_error_handler( | 80 stream_sender_.set_connection_error_handler( |
83 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | 81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, |
84 weak_factory_.GetWeakPtr(), kErrorLostMojoConnection)); | 82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
85 } | 83 } |
86 | 84 |
87 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | 85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { |
88 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 86 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
89 main_task_runner_->PostTask( | 87 main_task_runner_->PostTask( |
90 FROM_HERE, | 88 FROM_HERE, |
91 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | 89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, |
92 rpc_broker_, rpc_handle_)); | 90 rpc_broker_, rpc_handle_)); |
93 } | 91 } |
94 | 92 |
| 93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { |
| 94 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| 95 const int64_t current_count = bytes_written_to_pipe_; |
| 96 bytes_written_to_pipe_ = 0; |
| 97 return current_count; |
| 98 } |
| 99 |
95 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | 100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( |
96 bool flushing) { | 101 bool flushing) { |
97 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 102 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
98 DEMUXER_VLOG(2) << "flushing=" << flushing; | 103 DEMUXER_VLOG(2) << "flushing=" << flushing; |
99 | 104 |
100 // Ignores if |pending_flush_| states is same. | 105 // Ignores if |pending_flush_| states is same. |
101 if (pending_flush_ == flushing) | 106 if (pending_flush_ == flushing) |
102 return base::nullopt; | 107 return base::nullopt; |
103 | 108 |
104 // Cleans up pending frame data. | 109 // Cleans up pending frame data. |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
149 | 154 |
150 // Checks if initialization had been called or not. | 155 // Checks if initialization had been called or not. |
151 if (remote_callback_handle_ != kInvalidHandle) { | 156 if (remote_callback_handle_ != kInvalidHandle) { |
152 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | 157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " |
153 << remote_callback_handle_ | 158 << remote_callback_handle_ |
154 << ", Given: " << remote_callback_handle; | 159 << ", Given: " << remote_callback_handle; |
155 // Shuts down data pipe if available if providing different remote callback | 160 // Shuts down data pipe if available if providing different remote callback |
156 // handle for initialization. Otherwise, just silently ignore the duplicated | 161 // handle for initialization. Otherwise, just silently ignore the duplicated |
157 // request. | 162 // request. |
158 if (remote_callback_handle_ != remote_callback_handle) { | 163 if (remote_callback_handle_ != remote_callback_handle) { |
159 OnFatalError(kErrorDuplicateInitialize); | 164 OnFatalError(PEERS_OUT_OF_SYNC); |
160 } | 165 } |
161 return; | 166 return; |
162 } | 167 } |
163 remote_callback_handle_ = remote_callback_handle; | 168 remote_callback_handle_ = remote_callback_handle; |
164 | 169 |
165 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | 170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. |
166 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); |
167 rpc->set_handle(remote_callback_handle_); | 172 rpc->set_handle(remote_callback_handle_); |
168 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | 173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); |
169 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | 174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
205 weak_factory_.GetWeakPtr())); | 210 weak_factory_.GetWeakPtr())); |
206 } | 211 } |
207 } | 212 } |
208 | 213 |
209 void RemoteDemuxerStreamAdapter::ReadUntil( | 214 void RemoteDemuxerStreamAdapter::ReadUntil( |
210 std::unique_ptr<remoting::pb::RpcMessage> message) { | 215 std::unique_ptr<remoting::pb::RpcMessage> message) { |
211 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 216 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
212 DCHECK(message); | 217 DCHECK(message); |
213 if (!message->has_demuxerstream_readuntil_rpc()) { | 218 if (!message->has_demuxerstream_readuntil_rpc()) { |
214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | 219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
215 OnFatalError(kErrorMissingMessageFields); | 220 OnFatalError(RPC_INVALID); |
216 return; | 221 return; |
217 } | 222 } |
218 | 223 |
219 const pb::DemuxerStreamReadUntil& rpc_message = | 224 const pb::DemuxerStreamReadUntil& rpc_message = |
220 message->demuxerstream_readuntil_rpc(); | 225 message->demuxerstream_readuntil_rpc(); |
221 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" | 226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" |
222 << rpc_message.callback_handle() | 227 << rpc_message.callback_handle() |
223 << ", count=" << rpc_message.count(); | 228 << ", count=" << rpc_message.count(); |
224 | 229 |
225 if (pending_flush_) { | 230 if (pending_flush_) { |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
298 } break; | 303 } break; |
299 } | 304 } |
300 } | 305 } |
301 | 306 |
302 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | 307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
303 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 308 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
304 // The Mojo watcher will also call TryWriteData() sometimes as a notification | 309 // The Mojo watcher will also call TryWriteData() sometimes as a notification |
305 // that data pipe is ready. But that does not necessarily mean the data for a | 310 // that data pipe is ready. But that does not necessarily mean the data for a |
306 // read request is ready to be written into the pipe. | 311 // read request is ready to be written into the pipe. |
307 if (!IsProcessingReadRequest() || pending_flush_) { | 312 if (!IsProcessingReadRequest() || pending_flush_) { |
308 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
309 return; | 314 return; |
310 } | 315 } |
311 | 316 |
312 if (pending_frame_.empty()) { | 317 if (pending_frame_.empty()) { |
313 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; | 318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; |
314 return; | 319 return; |
315 } | 320 } |
316 | 321 |
317 if (!stream_sender_ || !producer_handle_.is_valid()) { | 322 if (!stream_sender_ || !producer_handle_.is_valid()) { |
318 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; | 323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; |
319 return; | 324 return; |
320 } | 325 } |
321 | 326 |
322 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; | 327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; |
323 MojoResult mojo_result = | 328 MojoResult mojo_result = |
324 WriteDataRaw(producer_handle_.get(), | 329 WriteDataRaw(producer_handle_.get(), |
325 pending_frame_.data() + current_pending_frame_offset_, | 330 pending_frame_.data() + current_pending_frame_offset_, |
326 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); | 331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
327 if (mojo_result != MOJO_RESULT_OK) { | 332 if (mojo_result != MOJO_RESULT_OK) { |
328 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { | 333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { |
329 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" | 334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" |
330 << mojo_result; | 335 << mojo_result; |
331 OnFatalError(kErrorDataPipeWrite); | 336 OnFatalError(MOJO_PIPE_ERROR); |
332 } | 337 } |
333 return; | 338 return; |
334 } | 339 } |
335 | 340 |
336 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, | 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, |
337 pending_frame_.size()); | 342 pending_frame_.size()); |
338 current_pending_frame_offset_ += num_bytes; | 343 current_pending_frame_offset_ += num_bytes; |
| 344 bytes_written_to_pipe_ += num_bytes; |
339 | 345 |
340 // Checks if all buffer was written to browser process. | 346 // Checks if all buffer was written to browser process. |
341 if (current_pending_frame_offset_ != pending_frame_.size()) { | 347 if (current_pending_frame_offset_ != pending_frame_.size()) { |
342 // Returns and wait for mojo watcher to notify to write more data. | 348 // Returns and wait for mojo watcher to notify to write more data. |
343 return; | 349 return; |
344 } | 350 } |
345 | 351 |
346 // Signal mojo remoting service that all frame buffer is written to data pipe. | 352 // Signal mojo remoting service that all frame buffer is written to data pipe. |
347 stream_sender_->SendFrame(); | 353 stream_sender_->SendFrame(); |
348 | 354 |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
414 video_config_ = ::media::VideoDecoderConfig(); | 420 video_config_ = ::media::VideoDecoderConfig(); |
415 } | 421 } |
416 | 422 |
417 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | 423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { |
418 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 424 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
419 current_pending_frame_offset_ = 0; | 425 current_pending_frame_offset_ = 0; |
420 pending_frame_.clear(); | 426 pending_frame_.clear(); |
421 pending_frame_is_eos_ = false; | 427 pending_frame_is_eos_ = false; |
422 } | 428 } |
423 | 429 |
424 void RemoteDemuxerStreamAdapter::OnFatalError(const char* reason) { | 430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
425 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 431 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
426 | 432 |
427 DEMUXER_VLOG(1) << reason; | 433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
428 | 434 |
429 // Resets mojo data pipe producer handle. | 435 if (error_callback_.is_null()) |
430 producer_handle_.reset(); | 436 return; |
431 | |
432 // Resetting |stream_sender_| will close Mojo message pipe, which will cause | |
433 // the entire remoting session to be shut down. | |
434 if (stream_sender_) { | |
435 DEMUXER_VLOG(2) << "Reset data stream sender"; | |
436 stream_sender_.reset(); | |
437 } | |
438 | 437 |
439 if (write_watcher_.IsWatching()) { | 438 if (write_watcher_.IsWatching()) { |
440 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | 439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
441 write_watcher_.Cancel(); | 440 write_watcher_.Cancel(); |
442 } | 441 } |
| 442 |
| 443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
443 } | 444 } |
444 | 445 |
445 } // namespace remoting | 446 } // namespace remoting |
446 } // namespace media | 447 } // namespace media |
OLD | NEW |