Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(466)

Side by Side Diff: media/remoting/remote_demuxer_stream_adapter.cc

Issue 2643253003: Media Remoting Clean-up: Less-redundant naming, style consistency, etc. (Closed)
Patch Set: REBASE Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/remoting/remote_demuxer_stream_adapter.h"
6
7 #include "base/base64.h"
8 #include "base/bind.h"
9 #include "base/callback_helpers.h"
10 #include "media/base/bind_to_current_loop.h"
11 #include "media/base/decoder_buffer.h"
12 #include "media/base/timestamp_constants.h"
13 #include "media/remoting/rpc/proto_enum_utils.h"
14 #include "media/remoting/rpc/proto_utils.h"
15
16 // Convenience logging macro used throughout this file.
17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
18
19 namespace media {
20 namespace remoting {
21
22 // static
23 mojo::DataPipe* CreateDataPipe() {
24 // Capacity in bytes for Mojo data pipe.
25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024;
26
27 MojoCreateDataPipeOptions options;
28 options.struct_size = sizeof(MojoCreateDataPipeOptions);
29 options.flags = MOJO_WRITE_DATA_FLAG_NONE;
30 options.element_num_bytes = 1;
31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes;
32 return new mojo::DataPipe(options);
33 }
34
35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter(
36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
38 const std::string& name,
39 ::media::DemuxerStream* demuxer_stream,
40 const base::WeakPtr<RpcBroker>& rpc_broker,
41 int rpc_handle,
42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info,
43 mojo::ScopedDataPipeProducerHandle producer_handle,
44 const ErrorCallback& error_callback)
45 : main_task_runner_(std::move(main_task_runner)),
46 media_task_runner_(std::move(media_task_runner)),
47 name_(name),
48 rpc_broker_(rpc_broker),
49 rpc_handle_(rpc_handle),
50 demuxer_stream_(demuxer_stream),
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
52 error_callback_(error_callback),
53 remote_callback_handle_(kInvalidHandle),
54 read_until_callback_handle_(kInvalidHandle),
55 read_until_count_(0),
56 last_count_(0),
57 pending_flush_(false),
58 current_pending_frame_offset_(0),
59 pending_frame_is_eos_(false),
60 media_status_(::media::DemuxerStream::kOk),
61 producer_handle_(std::move(producer_handle)),
62 bytes_written_to_pipe_(0),
63 request_buffer_weak_factory_(this),
64 weak_factory_(this) {
65 DCHECK(main_task_runner_);
66 DCHECK(media_task_runner_);
67 DCHECK(media_task_runner_->BelongsToCurrentThread());
68 DCHECK(demuxer_stream);
69 DCHECK(!error_callback.is_null());
70 const RpcBroker::ReceiveMessageCallback receive_callback =
71 media::BindToCurrentLoop(
72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc,
73 weak_factory_.GetWeakPtr()));
74 main_task_runner_->PostTask(
75 FROM_HERE,
76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback,
77 rpc_broker_, rpc_handle_, receive_callback));
78
79 stream_sender_.Bind(std::move(stream_sender_info));
80 stream_sender_.set_connection_error_handler(
81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError,
82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
83 }
84
85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() {
86 DCHECK(media_task_runner_->BelongsToCurrentThread());
87 main_task_runner_->PostTask(
88 FROM_HERE,
89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback,
90 rpc_broker_, rpc_handle_));
91 }
92
93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() {
94 DCHECK(media_task_runner_->BelongsToCurrentThread());
95 const int64_t current_count = bytes_written_to_pipe_;
96 bytes_written_to_pipe_ = 0;
97 return current_count;
98 }
99
100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush(
101 bool flushing) {
102 DCHECK(media_task_runner_->BelongsToCurrentThread());
103 DEMUXER_VLOG(2) << "flushing=" << flushing;
104
105 // Ignores if |pending_flush_| states is same.
106 if (pending_flush_ == flushing)
107 return base::nullopt;
108
109 // Cleans up pending frame data.
110 pending_frame_.clear();
111 current_pending_frame_offset_ = 0;
112 pending_frame_is_eos_ = false;
113 // Invalidates pending Read() tasks.
114 request_buffer_weak_factory_.InvalidateWeakPtrs();
115
116 // Cancels in flight data in browser process.
117 pending_flush_ = flushing;
118 if (flushing) {
119 stream_sender_->CancelInFlightData();
120 } else {
121 // Sets callback handle invalid to abort ongoing read request.
122 read_until_callback_handle_ = kInvalidHandle;
123 }
124 return last_count_;
125 }
126
127 void RemoteDemuxerStreamAdapter::OnReceivedRpc(
128 std::unique_ptr<remoting::pb::RpcMessage> message) {
129 DCHECK(media_task_runner_->BelongsToCurrentThread());
130 DCHECK(message);
131 DCHECK(rpc_handle_ == message->handle());
132
133 switch (message->proc()) {
134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE:
135 Initialize(message->integer_value());
136 break;
137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL:
138 ReadUntil(std::move(message));
139 break;
140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
141 EnableBitstreamConverter();
142 break;
143
144 default:
145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
146 }
147 }
148
149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) {
150 DCHECK(media_task_runner_->BelongsToCurrentThread());
151 DCHECK(!pending_flush_);
152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
153 << remote_callback_handle;
154
155 // Checks if initialization had been called or not.
156 if (remote_callback_handle_ != kInvalidHandle) {
157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
158 << remote_callback_handle_
159 << ", Given: " << remote_callback_handle;
160 // Shuts down data pipe if available if providing different remote callback
161 // handle for initialization. Otherwise, just silently ignore the duplicated
162 // request.
163 if (remote_callback_handle_ != remote_callback_handle) {
164 OnFatalError(PEERS_OUT_OF_SYNC);
165 }
166 return;
167 }
168 remote_callback_handle_ = remote_callback_handle;
169
170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage());
172 rpc->set_handle(remote_callback_handle_);
173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
175 init_cb_message->set_type(type_);
176 switch (type_) {
177 case ::media::DemuxerStream::Type::AUDIO: {
178 audio_config_ = demuxer_stream_->audio_decoder_config();
179 pb::AudioDecoderConfig* audio_message =
180 init_cb_message->mutable_audio_decoder_config();
181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
182 break;
183 }
184 case ::media::DemuxerStream::Type::VIDEO: {
185 video_config_ = demuxer_stream_->video_decoder_config();
186 pb::VideoDecoderConfig* video_message =
187 init_cb_message->mutable_video_decoder_config();
188 ConvertVideoDecoderConfigToProto(video_config_, video_message);
189 break;
190 }
191 default:
192 NOTREACHED();
193 }
194
195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
196 << " with decoder_config={"
197 << (type_ == ::media::DemuxerStream::Type::AUDIO
198 ? audio_config_.AsHumanReadableString()
199 : video_config_.AsHumanReadableString())
200 << '}';
201 main_task_runner_->PostTask(
202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote,
203 rpc_broker_, base::Passed(&rpc)));
204
205 // Starts Mojo watcher.
206 if (!write_watcher_.IsWatching()) {
207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData,
210 weak_factory_.GetWeakPtr()));
211 }
212 }
213
214 void RemoteDemuxerStreamAdapter::ReadUntil(
215 std::unique_ptr<remoting::pb::RpcMessage> message) {
216 DCHECK(media_task_runner_->BelongsToCurrentThread());
217 DCHECK(message);
218 if (!message->has_demuxerstream_readuntil_rpc()) {
219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
220 OnFatalError(RPC_INVALID);
221 return;
222 }
223
224 const pb::DemuxerStreamReadUntil& rpc_message =
225 message->demuxerstream_readuntil_rpc();
226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
227 << rpc_message.callback_handle()
228 << ", count=" << rpc_message.count();
229
230 if (pending_flush_) {
231 DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state";
232 return;
233 }
234
235 if (IsProcessingReadRequest()) {
236 DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state.";
237 return;
238 }
239
240 if (rpc_message.count() <= last_count_) {
241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
242 "current frame count";
243 return;
244 }
245
246 read_until_count_ = rpc_message.count();
247 read_until_callback_handle_ = rpc_message.callback_handle();
248 RequestBuffer();
249 }
250
251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() {
252 DCHECK(media_task_runner_->BelongsToCurrentThread());
253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
254 demuxer_stream_->EnableBitstreamConverter();
255 }
256
257 void RemoteDemuxerStreamAdapter::RequestBuffer() {
258 DCHECK(media_task_runner_->BelongsToCurrentThread());
259 if (!IsProcessingReadRequest() || pending_flush_) {
260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
261 return;
262 }
263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer,
264 request_buffer_weak_factory_.GetWeakPtr()));
265 }
266
267 void RemoteDemuxerStreamAdapter::OnNewBuffer(
268 ::media::DemuxerStream::Status status,
269 const scoped_refptr<::media::DecoderBuffer>& input) {
270 DEMUXER_VLOG(3) << "status=" << status;
271 DCHECK(media_task_runner_->BelongsToCurrentThread());
272 if (!IsProcessingReadRequest() || pending_flush_) {
273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
274 return;
275 }
276
277 switch (status) {
278 case ::media::DemuxerStream::kAborted:
279 DCHECK(!input);
280 SendReadAck();
281 return;
282 case ::media::DemuxerStream::kConfigChanged:
283 // TODO(erickung): consider sending updated Audio/Video decoder config to
284 // RemotingRendererController.
285 // Stores available audio/video decoder config and issues
286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
287 DCHECK(!input);
288 media_status_ = status;
289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO)
290 video_config_ = demuxer_stream_->video_decoder_config();
291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO)
292 audio_config_ = demuxer_stream_->audio_decoder_config();
293 SendReadAck();
294 return;
295 case ::media::DemuxerStream::kOk: {
296 media_status_ = status;
297 DCHECK(pending_frame_.empty());
298 if (!producer_handle_.is_valid())
299 return; // Do not start sending (due to previous fatal error).
300 pending_frame_ = DecoderBufferToByteArray(input);
301 pending_frame_is_eos_ = input->end_of_stream();
302 TryWriteData(MOJO_RESULT_OK);
303 } break;
304 }
305 }
306
307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) {
308 DCHECK(media_task_runner_->BelongsToCurrentThread());
309 // The Mojo watcher will also call TryWriteData() sometimes as a notification
310 // that data pipe is ready. But that does not necessarily mean the data for a
311 // read request is ready to be written into the pipe.
312 if (!IsProcessingReadRequest() || pending_flush_) {
313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state";
314 return;
315 }
316
317 if (pending_frame_.empty()) {
318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer";
319 return;
320 }
321
322 if (!stream_sender_ || !producer_handle_.is_valid()) {
323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
324 return;
325 }
326
327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
328 MojoResult mojo_result =
329 WriteDataRaw(producer_handle_.get(),
330 pending_frame_.data() + current_pending_frame_offset_,
331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
332 if (mojo_result != MOJO_RESULT_OK) {
333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) {
334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
335 << mojo_result;
336 OnFatalError(MOJO_PIPE_ERROR);
337 }
338 return;
339 }
340
341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
342 pending_frame_.size());
343 current_pending_frame_offset_ += num_bytes;
344 bytes_written_to_pipe_ += num_bytes;
345
346 // Checks if all buffer was written to browser process.
347 if (current_pending_frame_offset_ != pending_frame_.size()) {
348 // Returns and wait for mojo watcher to notify to write more data.
349 return;
350 }
351
352 // Signal mojo remoting service that all frame buffer is written to data pipe.
353 stream_sender_->SendFrame();
354
355 // Resets frame buffer variables.
356 bool pending_frame_is_eos = pending_frame_is_eos_;
357 ++last_count_;
358 ResetPendingFrame();
359
360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
361 if (read_until_count_ == last_count_ || pending_frame_is_eos) {
362 SendReadAck();
363 return;
364 }
365
366 // Contiune to read decoder buffer until reaching |read_until_count_| or
367 // end of stream.
368 media_task_runner_->PostTask(
369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer,
370 weak_factory_.GetWeakPtr()));
371 }
372
373 void RemoteDemuxerStreamAdapter::SendReadAck() {
374 DCHECK(media_task_runner_->BelongsToCurrentThread());
375 DEMUXER_VLOG(3) << "last_count_=" << last_count_
376 << ", remote_read_callback_handle="
377 << read_until_callback_handle_
378 << ", media_status=" << media_status_;
379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message.
380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage());
381 rpc->set_handle(read_until_callback_handle_);
382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
384 message->set_count(last_count_);
385 message->set_status(
386 remoting::ToProtoDemuxerStreamStatus(media_status_).value());
387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) {
388 if (audio_config_.IsValidConfig()) {
389 pb::AudioDecoderConfig* audio_message =
390 message->mutable_audio_decoder_config();
391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
392 } else if (video_config_.IsValidConfig()) {
393 pb::VideoDecoderConfig* video_message =
394 message->mutable_video_decoder_config();
395 ConvertVideoDecoderConfigToProto(video_config_, video_message);
396 } else {
397 NOTREACHED();
398 }
399 }
400
401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
402 << " with count=" << message->count()
403 << ", status=" << message->status() << ", decoder_config={"
404 << (audio_config_.IsValidConfig()
405 ? audio_config_.AsHumanReadableString()
406 : video_config_.IsValidConfig()
407 ? video_config_.AsHumanReadableString()
408 : "DID NOT CHANGE")
409 << '}';
410 main_task_runner_->PostTask(
411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote,
412 rpc_broker_, base::Passed(&rpc)));
413 // Resets callback handle after completing the reading request.
414 read_until_callback_handle_ = kInvalidHandle;
415
416 // Resets audio/video decoder config since it only sends once.
417 if (audio_config_.IsValidConfig())
418 audio_config_ = ::media::AudioDecoderConfig();
419 if (video_config_.IsValidConfig())
420 video_config_ = ::media::VideoDecoderConfig();
421 }
422
423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() {
424 DCHECK(media_task_runner_->BelongsToCurrentThread());
425 current_pending_frame_offset_ = 0;
426 pending_frame_.clear();
427 pending_frame_is_eos_ = false;
428 }
429
430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
431 DCHECK(media_task_runner_->BelongsToCurrentThread());
432
433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
434
435 if (error_callback_.is_null())
436 return;
437
438 if (write_watcher_.IsWatching()) {
439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
440 write_watcher_.Cancel();
441 }
442
443 base::ResetAndReturn(&error_callback_).Run(stop_trigger);
444 }
445
446 } // namespace remoting
447 } // namespace media
OLDNEW
« no previous file with comments | « media/remoting/remote_demuxer_stream_adapter.h ('k') | media/remoting/remote_demuxer_stream_adapter_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698