Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(86)

Side by Side Diff: media/remoting/stream_provider.cc

Issue 2808583002: RELAND: Media Remoting end to end integration tests. (Closed)
Patch Set: Rebased. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/remoting/stream_provider.h ('k') | media/test/BUILD.gn » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/remoting/stream_provider.h"
6
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "base/callback_helpers.h"
10 #include "base/logging.h"
11 #include "media/base/decoder_buffer.h"
12 #include "media/base/video_rotation.h"
13 #include "media/remoting/proto_enum_utils.h"
14 #include "media/remoting/proto_utils.h"
15
16 namespace media {
17 namespace remoting {
18
19 namespace {
20 // The number of frames requested in each ReadUntil RPC message.
21 constexpr int kNumFramesInEachReadUntil = 10;
22 }
23
24 // An implementation of media::DemuxerStream on Media Remoting receiver.
25 // Receives data from mojo data pipe, and returns one frame or/and status when
26 // Read() is called.
27 class MediaStream final : public DemuxerStream {
28 public:
29 MediaStream(RpcBroker* rpc_broker,
30 Type type,
31 int remote_handle,
32 const base::Closure& error_callback);
33 ~MediaStream() override;
34
35 // DemuxerStream implementation.
36 void Read(const ReadCB& read_cb) override;
37 AudioDecoderConfig audio_decoder_config() override;
38 VideoDecoderConfig video_decoder_config() override;
39 DemuxerStream::Type type() const override;
40 Liveness liveness() const override;
41 bool SupportsConfigChanges() override;
42 VideoRotation video_rotation() override;
43
44 void Initialize(const base::Closure& init_done_cb);
45 void FlushUntil(int count);
46 void AppendBuffer(scoped_refptr<DecoderBuffer> buffer);
47
48 private:
49 // RPC messages handlers.
50 void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
51 void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
52 void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
53
54 // Issues the ReadUntil RPC message when read is pending and buffer is empty.
55 void SendReadUntil();
56
57 // Run and reset the read callback.
58 void CompleteRead(DemuxerStream::Status status);
59
60 // Update the audio/video decoder config When config changes in the mid
61 // stream, the new config will be stored in
62 // |next_audio/video_decoder_config_|. Old config will be droped when all
63 // associated frames are consumed.
64 void UpdateConfig(const pb::AudioDecoderConfig* audio_message,
65 const pb::VideoDecoderConfig* video_message);
66
67 // Called when any error occurs.
68 void OnError(const std::string& error);
69
70 RpcBroker* const rpc_broker_; // Outlives this class.
71 const Type type_;
72 const int remote_handle_;
73 const int rpc_handle_;
74
75 // Set when Initialize() is called, and will be run only once after
76 // initialization is done.
77 base::Closure init_done_callback_;
78
79 // The read until count in the last ReadUntil RPC message.
80 int last_read_until_count_ = 0;
81
82 // Indicates whether Audio/VideoDecoderConfig changed and the frames with the
83 // old config are not yet consumed. The new config is stored in the end of
84 // |audio/video_decoder_config_|;
85 bool config_changed_ = false;
86
87 // Indicates whether a ReadUntil RPC message was sent without receiving the
88 // ReadUntilCallback message yet.
89 bool read_until_sent_ = false;
90
91 // Set when Read() is called. Run only once when read completes.
92 ReadCB read_complete_callback_;
93
94 base::Closure error_callback_; // Called only once when first error occurs.
95
96 std::deque<scoped_refptr<DecoderBuffer>> buffers_;
97
98 // Current audio/video config.
99 AudioDecoderConfig audio_decoder_config_;
100 VideoDecoderConfig video_decoder_config_;
101
102 // Stores the new auido/video config when config changes.
103 AudioDecoderConfig next_audio_decoder_config_;
104 VideoDecoderConfig next_video_decoder_config_;
105
106 base::WeakPtrFactory<MediaStream> weak_factory_;
107
108 DISALLOW_COPY_AND_ASSIGN(MediaStream);
109 };
110
111 MediaStream::MediaStream(RpcBroker* rpc_broker,
112 Type type,
113 int remote_handle,
114 const base::Closure& error_callback)
115 : rpc_broker_(rpc_broker),
116 type_(type),
117 remote_handle_(remote_handle),
118 rpc_handle_(rpc_broker_->GetUniqueHandle()),
119 error_callback_(error_callback),
120 weak_factory_(this) {
121 DCHECK(remote_handle_ != RpcBroker::kInvalidHandle);
122 rpc_broker_->RegisterMessageReceiverCallback(
123 rpc_handle_,
124 base::Bind(&MediaStream::OnReceivedRpc, weak_factory_.GetWeakPtr()));
125 }
126
127 MediaStream::~MediaStream() {
128 rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_);
129 }
130
131 void MediaStream::Initialize(const base::Closure& init_done_cb) {
132 DCHECK(!init_done_cb.is_null());
133 if (!init_done_callback_.is_null()) {
134 OnError("Duplicate initialization");
135 return;
136 }
137 init_done_callback_ = init_done_cb;
138
139 DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with "
140 << "remote_handle=" << remote_handle_
141 << " rpc_handle=" << rpc_handle_;
142 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
143 rpc->set_handle(remote_handle_);
144 rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
145 rpc->set_integer_value(rpc_handle_);
146 rpc_broker_->SendMessageToRemote(std::move(rpc));
147 }
148
149 void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
150 DCHECK(message->handle() == rpc_handle_);
151
152 switch (message->proc()) {
153 case pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK:
154 OnInitializeCallback(std::move(message));
155 break;
156 case pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK:
157 OnReadUntilCallback(std::move(message));
158 break;
159 default:
160 VLOG(3) << __func__ << "Unknow RPC message.";
161 }
162 }
163
164 void MediaStream::OnInitializeCallback(
165 std::unique_ptr<pb::RpcMessage> message) {
166 DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message.";
167 const pb::DemuxerStreamInitializeCallback callback_message =
168 message->demuxerstream_initializecb_rpc();
169 if (callback_message.type() != type_) {
170 OnError("Wrong type");
171 return;
172 }
173 if ((type_ == DemuxerStream::AUDIO &&
174 audio_decoder_config_.IsValidConfig()) ||
175 (type_ == DemuxerStream::VIDEO &&
176 video_decoder_config_.IsValidConfig())) {
177 OnError("Duplicate Iniitialize");
178 return;
179 }
180 if (init_done_callback_.is_null()) {
181 OnError("Iniitialize callback missing");
182 return;
183 }
184
185 if (type_ == DemuxerStream::AUDIO &&
186 callback_message.has_audio_decoder_config()) {
187 const pb::AudioDecoderConfig audio_message =
188 callback_message.audio_decoder_config();
189 UpdateConfig(&audio_message, nullptr);
190 } else if (type_ == DemuxerStream::VIDEO &&
191 callback_message.has_video_decoder_config()) {
192 const pb::VideoDecoderConfig video_message =
193 callback_message.video_decoder_config();
194 UpdateConfig(nullptr, &video_message);
195 } else {
196 OnError("config missing");
197 return;
198 }
199 base::ResetAndReturn(&init_done_callback_).Run();
200 }
201
202 void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
203 DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message.";
204 if (!read_until_sent_) {
205 OnError("Unexpected ReadUntilCallback");
206 return;
207 }
208 read_until_sent_ = false;
209 const pb::DemuxerStreamReadUntilCallback callback_message =
210 message->demuxerstream_readuntilcb_rpc();
211 last_read_until_count_ = callback_message.count();
212 if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) {
213 config_changed_ = true;
214 if (callback_message.has_audio_decoder_config()) {
215 const pb::AudioDecoderConfig audio_message =
216 callback_message.audio_decoder_config();
217 UpdateConfig(&audio_message, nullptr);
218 }
219 if (callback_message.has_video_decoder_config()) {
220 const pb::VideoDecoderConfig video_message =
221 callback_message.video_decoder_config();
222 UpdateConfig(nullptr, &video_message);
223 }
224 if (buffers_.empty() && !read_complete_callback_.is_null())
225 CompleteRead(DemuxerStream::kConfigChanged);
226 return;
227 }
228 if (buffers_.empty() && !read_complete_callback_.is_null())
229 SendReadUntil();
230 }
231
232 void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message,
233 const pb::VideoDecoderConfig* video_message) {
234 if (type_ == AUDIO) {
235 DCHECK(audio_message && !video_message);
236 AudioDecoderConfig audio_config;
237 ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config);
238 if (!audio_config.IsValidConfig()) {
239 OnError("Invalid audio config");
240 return;
241 }
242 if (config_changed_) {
243 DCHECK(audio_decoder_config_.IsValidConfig());
244 DCHECK(!next_audio_decoder_config_.IsValidConfig());
245 next_audio_decoder_config_ = audio_config;
246 } else {
247 DCHECK(!audio_decoder_config_.IsValidConfig());
248 audio_decoder_config_ = audio_config;
249 }
250 } else if (type_ == VIDEO) {
251 DCHECK(video_message && !audio_message);
252 VideoDecoderConfig video_config;
253 ConvertProtoToVideoDecoderConfig(*video_message, &video_config);
254 if (!video_config.IsValidConfig()) {
255 OnError("Invalid video config");
256 return;
257 }
258 if (config_changed_) {
259 DCHECK(video_decoder_config_.IsValidConfig());
260 DCHECK(!next_video_decoder_config_.IsValidConfig());
261 next_video_decoder_config_ = video_config;
262 } else {
263 DCHECK(!video_decoder_config_.IsValidConfig());
264 video_decoder_config_ = video_config;
265 }
266 } else {
267 NOTREACHED() << ": Only supports video or audio stream.";
268 }
269 }
270
271 void MediaStream::SendReadUntil() {
272 if (read_until_sent_)
273 return;
274 DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_="
275 << remote_handle_ << " with callback handle=" << rpc_handle_
276 << " count=" << last_read_until_count_;
277
278 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
279 rpc->set_handle(remote_handle_);
280 rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL);
281 auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
282 last_read_until_count_ += kNumFramesInEachReadUntil;
283 message->set_count(last_read_until_count_);
284 message->set_callback_handle(rpc_handle_);
285 rpc_broker_->SendMessageToRemote(std::move(rpc));
286 read_until_sent_ = true;
287 }
288
289 void MediaStream::FlushUntil(int count) {
290 while (!buffers_.empty()) {
291 buffers_.pop_front();
292 }
293
294 last_read_until_count_ = count;
295 if (!read_complete_callback_.is_null())
296 CompleteRead(DemuxerStream::kAborted);
297 read_until_sent_ = false;
298 }
299
300 void MediaStream::Read(const ReadCB& read_cb) {
301 DCHECK(read_complete_callback_.is_null());
302 DCHECK(!read_cb.is_null());
303 read_complete_callback_ = read_cb;
304 if (buffers_.empty() && config_changed_) {
305 CompleteRead(DemuxerStream::kConfigChanged);
306 return;
307 }
308
309 // Wait for more data.
310 if (buffers_.empty()) {
311 SendReadUntil();
312 return;
313 }
314
315 CompleteRead(DemuxerStream::kOk);
316 }
317
318 void MediaStream::CompleteRead(DemuxerStream::Status status) {
319 DVLOG(3) << __func__ << ": " << status;
320 switch (status) {
321 case DemuxerStream::kConfigChanged:
322 if (type_ == AUDIO) {
323 DCHECK(next_audio_decoder_config_.IsValidConfig());
324 audio_decoder_config_ = next_audio_decoder_config_;
325 #if DCHECK_IS_ON()
326 next_audio_decoder_config_ = AudioDecoderConfig();
327 #endif // DCHECK_IS_ON()
328 } else {
329 DCHECK(next_video_decoder_config_.IsValidConfig());
330 video_decoder_config_ = next_video_decoder_config_;
331 #if DCHECK_IS_ON()
332 next_video_decoder_config_ = VideoDecoderConfig();
333 #endif // DCHECK_IS_ON()
334 }
335 config_changed_ = false;
336 base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr);
337 return;
338 case DemuxerStream::kAborted:
339 base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr);
340 return;
341 case DemuxerStream::kOk:
342 DCHECK(!buffers_.empty());
343 scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
344 buffers_.pop_front();
345 base::ResetAndReturn(&read_complete_callback_).Run(status, frame_data);
346 return;
347 }
348 }
349
350 AudioDecoderConfig MediaStream::audio_decoder_config() {
351 DVLOG(3) << __func__;
352 DCHECK(type_ == DemuxerStream::AUDIO);
353 return audio_decoder_config_;
354 }
355
356 VideoDecoderConfig MediaStream::video_decoder_config() {
357 DVLOG(3) << __func__;
358 DCHECK(type_ == DemuxerStream::VIDEO);
359 return video_decoder_config_;
360 }
361
362 DemuxerStream::Type MediaStream::type() const {
363 return type_;
364 }
365
366 DemuxerStream::Liveness MediaStream::liveness() const {
367 return DemuxerStream::LIVENESS_LIVE;
368 }
369
370 bool MediaStream::SupportsConfigChanges() {
371 return true;
372 }
373
374 VideoRotation MediaStream::video_rotation() {
375 return VideoRotation::VIDEO_ROTATION_0;
376 }
377
378 void MediaStream::AppendBuffer(scoped_refptr<DecoderBuffer> buffer) {
379 DVLOG(3) << __func__;
380 buffers_.push_back(buffer);
381 if (!read_complete_callback_.is_null())
382 CompleteRead(DemuxerStream::kOk);
383 }
384
385 void MediaStream::OnError(const std::string& error) {
386 VLOG(1) << __func__ << ": " << error;
387 if (error_callback_.is_null())
388 return;
389 base::ResetAndReturn(&error_callback_).Run();
390 }
391
392 StreamProvider::StreamProvider(RpcBroker* rpc_broker,
393 const base::Closure& error_callback)
394 : rpc_broker_(rpc_broker),
395 error_callback_(error_callback),
396 weak_factory_(this) {}
397
398 StreamProvider::~StreamProvider() {}
399
400 void StreamProvider::Initialize(int remote_audio_handle,
401 int remote_video_handle,
402 const base::Closure& callback) {
403 DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle
404 << " remote_video_handle=" << remote_video_handle;
405 if (!init_done_callback_.is_null()) {
406 OnError("Duplicate initialization.");
407 return;
408 }
409 if (remote_audio_handle == RpcBroker::kInvalidHandle &&
410 remote_video_handle == RpcBroker::kInvalidHandle) {
411 OnError("Invalid handle.");
412 return;
413 }
414
415 init_done_callback_ = callback;
416 if (remote_audio_handle != RpcBroker::kInvalidHandle) {
417 audio_stream_.reset(new MediaStream(
418 rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle,
419 base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
420 "Media stream error")));
421 audio_stream_->Initialize(base::Bind(
422 &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr()));
423 }
424 if (remote_video_handle != RpcBroker::kInvalidHandle) {
425 video_stream_.reset(new MediaStream(
426 rpc_broker_, DemuxerStream::VIDEO, remote_video_handle,
427 base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
428 "Media stream error")));
429 video_stream_->Initialize(base::Bind(
430 &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr()));
431 }
432 }
433
434 void StreamProvider::OnError(const std::string& error) {
435 VLOG(1) << __func__ << ": " << error;
436 if (error_callback_.is_null())
437 return;
438 base::ResetAndReturn(&error_callback_).Run();
439 }
440
441 void StreamProvider::AudioStreamInitialized() {
442 DCHECK(!init_done_callback_.is_null());
443 audio_stream_initialized_ = true;
444 if (video_stream_initialized_ || !video_stream_)
445 base::ResetAndReturn(&init_done_callback_).Run();
446 }
447
448 void StreamProvider::VideoStreamInitialized() {
449 DCHECK(!init_done_callback_.is_null());
450 video_stream_initialized_ = true;
451 if (audio_stream_initialized_ || !audio_stream_)
452 base::ResetAndReturn(&init_done_callback_).Run();
453 }
454
455 std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
456 std::vector<DemuxerStream*> streams;
457 if (audio_stream_)
458 streams.push_back(audio_stream_.get());
459 if (video_stream_)
460 streams.push_back(video_stream_.get());
461 return streams;
462 }
463
464 void StreamProvider::AppendBuffer(DemuxerStream::Type type,
465 scoped_refptr<DecoderBuffer> buffer) {
466 if (type == DemuxerStream::AUDIO)
467 audio_stream_->AppendBuffer(buffer);
468 else if (type == DemuxerStream::VIDEO)
469 video_stream_->AppendBuffer(buffer);
470 else
471 NOTREACHED() << ": Only supports video or audio stream.";
472 }
473
474 void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) {
475 DVLOG(3) << __func__ << ": type=" << type << " count=" << count;
476 if (type == DemuxerStream::AUDIO)
477 audio_stream_->FlushUntil(count);
478 else if (type == DemuxerStream::VIDEO)
479 video_stream_->FlushUntil(count);
480 else
481 NOTREACHED() << ": Only supports video or audio stream.";
482 }
483
484 } // namespace remoting
485 } // namespace media
OLDNEW
« no previous file with comments | « media/remoting/stream_provider.h ('k') | media/test/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698