OLD | NEW |
| (Empty) |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "media/remoting/remote_demuxer_stream_adapter.h" | |
6 | |
7 #include "base/base64.h" | |
8 #include "base/bind.h" | |
9 #include "base/callback_helpers.h" | |
10 #include "media/base/bind_to_current_loop.h" | |
11 #include "media/base/decoder_buffer.h" | |
12 #include "media/base/timestamp_constants.h" | |
13 #include "media/remoting/rpc/proto_enum_utils.h" | |
14 #include "media/remoting/rpc/proto_utils.h" | |
15 | |
16 // Convenience logging macro used throughout this file. | |
17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | |
18 | |
19 namespace media { | |
20 namespace remoting { | |
21 | |
22 // static | |
23 mojo::DataPipe* CreateDataPipe() { | |
24 // Capacity in bytes for Mojo data pipe. | |
25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | |
26 | |
27 MojoCreateDataPipeOptions options; | |
28 options.struct_size = sizeof(MojoCreateDataPipeOptions); | |
29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | |
30 options.element_num_bytes = 1; | |
31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | |
32 return new mojo::DataPipe(options); | |
33 } | |
34 | |
35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | |
36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | |
37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | |
38 const std::string& name, | |
39 ::media::DemuxerStream* demuxer_stream, | |
40 const base::WeakPtr<RpcBroker>& rpc_broker, | |
41 int rpc_handle, | |
42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | |
43 mojo::ScopedDataPipeProducerHandle producer_handle, | |
44 const ErrorCallback& error_callback) | |
45 : main_task_runner_(std::move(main_task_runner)), | |
46 media_task_runner_(std::move(media_task_runner)), | |
47 name_(name), | |
48 rpc_broker_(rpc_broker), | |
49 rpc_handle_(rpc_handle), | |
50 demuxer_stream_(demuxer_stream), | |
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | |
52 error_callback_(error_callback), | |
53 remote_callback_handle_(kInvalidHandle), | |
54 read_until_callback_handle_(kInvalidHandle), | |
55 read_until_count_(0), | |
56 last_count_(0), | |
57 pending_flush_(false), | |
58 current_pending_frame_offset_(0), | |
59 pending_frame_is_eos_(false), | |
60 media_status_(::media::DemuxerStream::kOk), | |
61 producer_handle_(std::move(producer_handle)), | |
62 bytes_written_to_pipe_(0), | |
63 request_buffer_weak_factory_(this), | |
64 weak_factory_(this) { | |
65 DCHECK(main_task_runner_); | |
66 DCHECK(media_task_runner_); | |
67 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
68 DCHECK(demuxer_stream); | |
69 DCHECK(!error_callback.is_null()); | |
70 const RpcBroker::ReceiveMessageCallback receive_callback = | |
71 media::BindToCurrentLoop( | |
72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | |
73 weak_factory_.GetWeakPtr())); | |
74 main_task_runner_->PostTask( | |
75 FROM_HERE, | |
76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | |
77 rpc_broker_, rpc_handle_, receive_callback)); | |
78 | |
79 stream_sender_.Bind(std::move(stream_sender_info)); | |
80 stream_sender_.set_connection_error_handler( | |
81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | |
82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); | |
83 } | |
84 | |
85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | |
86 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
87 main_task_runner_->PostTask( | |
88 FROM_HERE, | |
89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | |
90 rpc_broker_, rpc_handle_)); | |
91 } | |
92 | |
93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { | |
94 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
95 const int64_t current_count = bytes_written_to_pipe_; | |
96 bytes_written_to_pipe_ = 0; | |
97 return current_count; | |
98 } | |
99 | |
100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | |
101 bool flushing) { | |
102 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
103 DEMUXER_VLOG(2) << "flushing=" << flushing; | |
104 | |
105 // Ignores if |pending_flush_| states is same. | |
106 if (pending_flush_ == flushing) | |
107 return base::nullopt; | |
108 | |
109 // Cleans up pending frame data. | |
110 pending_frame_.clear(); | |
111 current_pending_frame_offset_ = 0; | |
112 pending_frame_is_eos_ = false; | |
113 // Invalidates pending Read() tasks. | |
114 request_buffer_weak_factory_.InvalidateWeakPtrs(); | |
115 | |
116 // Cancels in flight data in browser process. | |
117 pending_flush_ = flushing; | |
118 if (flushing) { | |
119 stream_sender_->CancelInFlightData(); | |
120 } else { | |
121 // Sets callback handle invalid to abort ongoing read request. | |
122 read_until_callback_handle_ = kInvalidHandle; | |
123 } | |
124 return last_count_; | |
125 } | |
126 | |
127 void RemoteDemuxerStreamAdapter::OnReceivedRpc( | |
128 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
129 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
130 DCHECK(message); | |
131 DCHECK(rpc_handle_ == message->handle()); | |
132 | |
133 switch (message->proc()) { | |
134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: | |
135 Initialize(message->integer_value()); | |
136 break; | |
137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL: | |
138 ReadUntil(std::move(message)); | |
139 break; | |
140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: | |
141 EnableBitstreamConverter(); | |
142 break; | |
143 | |
144 default: | |
145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); | |
146 } | |
147 } | |
148 | |
149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { | |
150 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
151 DCHECK(!pending_flush_); | |
152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" | |
153 << remote_callback_handle; | |
154 | |
155 // Checks if initialization had been called or not. | |
156 if (remote_callback_handle_ != kInvalidHandle) { | |
157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | |
158 << remote_callback_handle_ | |
159 << ", Given: " << remote_callback_handle; | |
160 // Shuts down data pipe if available if providing different remote callback | |
161 // handle for initialization. Otherwise, just silently ignore the duplicated | |
162 // request. | |
163 if (remote_callback_handle_ != remote_callback_handle) { | |
164 OnFatalError(PEERS_OUT_OF_SYNC); | |
165 } | |
166 return; | |
167 } | |
168 remote_callback_handle_ = remote_callback_handle; | |
169 | |
170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | |
171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | |
172 rpc->set_handle(remote_callback_handle_); | |
173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | |
174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | |
175 init_cb_message->set_type(type_); | |
176 switch (type_) { | |
177 case ::media::DemuxerStream::Type::AUDIO: { | |
178 audio_config_ = demuxer_stream_->audio_decoder_config(); | |
179 pb::AudioDecoderConfig* audio_message = | |
180 init_cb_message->mutable_audio_decoder_config(); | |
181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | |
182 break; | |
183 } | |
184 case ::media::DemuxerStream::Type::VIDEO: { | |
185 video_config_ = demuxer_stream_->video_decoder_config(); | |
186 pb::VideoDecoderConfig* video_message = | |
187 init_cb_message->mutable_video_decoder_config(); | |
188 ConvertVideoDecoderConfigToProto(video_config_, video_message); | |
189 break; | |
190 } | |
191 default: | |
192 NOTREACHED(); | |
193 } | |
194 | |
195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() | |
196 << " with decoder_config={" | |
197 << (type_ == ::media::DemuxerStream::Type::AUDIO | |
198 ? audio_config_.AsHumanReadableString() | |
199 : video_config_.AsHumanReadableString()) | |
200 << '}'; | |
201 main_task_runner_->PostTask( | |
202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | |
203 rpc_broker_, base::Passed(&rpc))); | |
204 | |
205 // Starts Mojo watcher. | |
206 if (!write_watcher_.IsWatching()) { | |
207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; | |
208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, | |
209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, | |
210 weak_factory_.GetWeakPtr())); | |
211 } | |
212 } | |
213 | |
214 void RemoteDemuxerStreamAdapter::ReadUntil( | |
215 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
216 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
217 DCHECK(message); | |
218 if (!message->has_demuxerstream_readuntil_rpc()) { | |
219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | |
220 OnFatalError(RPC_INVALID); | |
221 return; | |
222 } | |
223 | |
224 const pb::DemuxerStreamReadUntil& rpc_message = | |
225 message->demuxerstream_readuntil_rpc(); | |
226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" | |
227 << rpc_message.callback_handle() | |
228 << ", count=" << rpc_message.count(); | |
229 | |
230 if (pending_flush_) { | |
231 DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state"; | |
232 return; | |
233 } | |
234 | |
235 if (IsProcessingReadRequest()) { | |
236 DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state."; | |
237 return; | |
238 } | |
239 | |
240 if (rpc_message.count() <= last_count_) { | |
241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " | |
242 "current frame count"; | |
243 return; | |
244 } | |
245 | |
246 read_until_count_ = rpc_message.count(); | |
247 read_until_callback_handle_ = rpc_message.callback_handle(); | |
248 RequestBuffer(); | |
249 } | |
250 | |
251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { | |
252 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; | |
254 demuxer_stream_->EnableBitstreamConverter(); | |
255 } | |
256 | |
257 void RemoteDemuxerStreamAdapter::RequestBuffer() { | |
258 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
259 if (!IsProcessingReadRequest() || pending_flush_) { | |
260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | |
261 return; | |
262 } | |
263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, | |
264 request_buffer_weak_factory_.GetWeakPtr())); | |
265 } | |
266 | |
267 void RemoteDemuxerStreamAdapter::OnNewBuffer( | |
268 ::media::DemuxerStream::Status status, | |
269 const scoped_refptr<::media::DecoderBuffer>& input) { | |
270 DEMUXER_VLOG(3) << "status=" << status; | |
271 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
272 if (!IsProcessingReadRequest() || pending_flush_) { | |
273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | |
274 return; | |
275 } | |
276 | |
277 switch (status) { | |
278 case ::media::DemuxerStream::kAborted: | |
279 DCHECK(!input); | |
280 SendReadAck(); | |
281 return; | |
282 case ::media::DemuxerStream::kConfigChanged: | |
283 // TODO(erickung): consider sending updated Audio/Video decoder config to | |
284 // RemotingRendererController. | |
285 // Stores available audio/video decoder config and issues | |
286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. | |
287 DCHECK(!input); | |
288 media_status_ = status; | |
289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) | |
290 video_config_ = demuxer_stream_->video_decoder_config(); | |
291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) | |
292 audio_config_ = demuxer_stream_->audio_decoder_config(); | |
293 SendReadAck(); | |
294 return; | |
295 case ::media::DemuxerStream::kOk: { | |
296 media_status_ = status; | |
297 DCHECK(pending_frame_.empty()); | |
298 if (!producer_handle_.is_valid()) | |
299 return; // Do not start sending (due to previous fatal error). | |
300 pending_frame_ = DecoderBufferToByteArray(input); | |
301 pending_frame_is_eos_ = input->end_of_stream(); | |
302 TryWriteData(MOJO_RESULT_OK); | |
303 } break; | |
304 } | |
305 } | |
306 | |
307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | |
308 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
309 // The Mojo watcher will also call TryWriteData() sometimes as a notification | |
310 // that data pipe is ready. But that does not necessarily mean the data for a | |
311 // read request is ready to be written into the pipe. | |
312 if (!IsProcessingReadRequest() || pending_flush_) { | |
313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; | |
314 return; | |
315 } | |
316 | |
317 if (pending_frame_.empty()) { | |
318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; | |
319 return; | |
320 } | |
321 | |
322 if (!stream_sender_ || !producer_handle_.is_valid()) { | |
323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; | |
324 return; | |
325 } | |
326 | |
327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; | |
328 MojoResult mojo_result = | |
329 WriteDataRaw(producer_handle_.get(), | |
330 pending_frame_.data() + current_pending_frame_offset_, | |
331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); | |
332 if (mojo_result != MOJO_RESULT_OK) { | |
333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { | |
334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" | |
335 << mojo_result; | |
336 OnFatalError(MOJO_PIPE_ERROR); | |
337 } | |
338 return; | |
339 } | |
340 | |
341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, | |
342 pending_frame_.size()); | |
343 current_pending_frame_offset_ += num_bytes; | |
344 bytes_written_to_pipe_ += num_bytes; | |
345 | |
346 // Checks if all buffer was written to browser process. | |
347 if (current_pending_frame_offset_ != pending_frame_.size()) { | |
348 // Returns and wait for mojo watcher to notify to write more data. | |
349 return; | |
350 } | |
351 | |
352 // Signal mojo remoting service that all frame buffer is written to data pipe. | |
353 stream_sender_->SendFrame(); | |
354 | |
355 // Resets frame buffer variables. | |
356 bool pending_frame_is_eos = pending_frame_is_eos_; | |
357 ++last_count_; | |
358 ResetPendingFrame(); | |
359 | |
360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. | |
361 if (read_until_count_ == last_count_ || pending_frame_is_eos) { | |
362 SendReadAck(); | |
363 return; | |
364 } | |
365 | |
366 // Contiune to read decoder buffer until reaching |read_until_count_| or | |
367 // end of stream. | |
368 media_task_runner_->PostTask( | |
369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, | |
370 weak_factory_.GetWeakPtr())); | |
371 } | |
372 | |
373 void RemoteDemuxerStreamAdapter::SendReadAck() { | |
374 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
375 DEMUXER_VLOG(3) << "last_count_=" << last_count_ | |
376 << ", remote_read_callback_handle=" | |
377 << read_until_callback_handle_ | |
378 << ", media_status=" << media_status_; | |
379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. | |
380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | |
381 rpc->set_handle(read_until_callback_handle_); | |
382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); | |
384 message->set_count(last_count_); | |
385 message->set_status( | |
386 remoting::ToProtoDemuxerStreamStatus(media_status_).value()); | |
387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) { | |
388 if (audio_config_.IsValidConfig()) { | |
389 pb::AudioDecoderConfig* audio_message = | |
390 message->mutable_audio_decoder_config(); | |
391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | |
392 } else if (video_config_.IsValidConfig()) { | |
393 pb::VideoDecoderConfig* video_message = | |
394 message->mutable_video_decoder_config(); | |
395 ConvertVideoDecoderConfigToProto(video_config_, video_message); | |
396 } else { | |
397 NOTREACHED(); | |
398 } | |
399 } | |
400 | |
401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() | |
402 << " with count=" << message->count() | |
403 << ", status=" << message->status() << ", decoder_config={" | |
404 << (audio_config_.IsValidConfig() | |
405 ? audio_config_.AsHumanReadableString() | |
406 : video_config_.IsValidConfig() | |
407 ? video_config_.AsHumanReadableString() | |
408 : "DID NOT CHANGE") | |
409 << '}'; | |
410 main_task_runner_->PostTask( | |
411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | |
412 rpc_broker_, base::Passed(&rpc))); | |
413 // Resets callback handle after completing the reading request. | |
414 read_until_callback_handle_ = kInvalidHandle; | |
415 | |
416 // Resets audio/video decoder config since it only sends once. | |
417 if (audio_config_.IsValidConfig()) | |
418 audio_config_ = ::media::AudioDecoderConfig(); | |
419 if (video_config_.IsValidConfig()) | |
420 video_config_ = ::media::VideoDecoderConfig(); | |
421 } | |
422 | |
423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | |
424 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
425 current_pending_frame_offset_ = 0; | |
426 pending_frame_.clear(); | |
427 pending_frame_is_eos_ = false; | |
428 } | |
429 | |
430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { | |
431 DCHECK(media_task_runner_->BelongsToCurrentThread()); | |
432 | |
433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; | |
434 | |
435 if (error_callback_.is_null()) | |
436 return; | |
437 | |
438 if (write_watcher_.IsWatching()) { | |
439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | |
440 write_watcher_.Cancel(); | |
441 } | |
442 | |
443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); | |
444 } | |
445 | |
446 } // namespace remoting | |
447 } // namespace media | |
OLD | NEW |