Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Side by Side Diff: media/remoting/remote_demuxer_stream_adapter.cc

Issue 2631993002: Media Remoting: UMAs to track session events and measurements. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/remoting/remote_demuxer_stream_adapter.h" 5 #include "media/remoting/remote_demuxer_stream_adapter.h"
6 6
7 #include "base/base64.h" 7 #include "base/base64.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/callback_helpers.h"
9 #include "media/base/bind_to_current_loop.h" 10 #include "media/base/bind_to_current_loop.h"
10 #include "media/base/decoder_buffer.h" 11 #include "media/base/decoder_buffer.h"
11 #include "media/base/timestamp_constants.h" 12 #include "media/base/timestamp_constants.h"
12 #include "media/remoting/rpc/proto_enum_utils.h" 13 #include "media/remoting/rpc/proto_enum_utils.h"
13 #include "media/remoting/rpc/proto_utils.h" 14 #include "media/remoting/rpc/proto_utils.h"
14 15
15 // Convenience logging macro used throughout this file. 16 // Convenience logging macro used throughout this file.
16 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
17 18
18 namespace media { 19 namespace media {
19 namespace remoting { 20 namespace remoting {
20 21
21 namespace {
22 constexpr char kErrorLostMojoConnection[] = "Mojo connection error";
23 constexpr char kErrorDataPipeWrite[] = "Mojo data pipe write error";
24 constexpr char kErrorDuplicateInitialize[] = "Duplicate attempt to initialize";
25 constexpr char kErrorMissingMessageFields[] = "Missing required message fields";
26 } // namepsace
27
28 // static 22 // static
29 mojo::DataPipe* CreateDataPipe() { 23 mojo::DataPipe* CreateDataPipe() {
30 // Capacity in bytes for Mojo data pipe. 24 // Capacity in bytes for Mojo data pipe.
31 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024;
32 26
33 MojoCreateDataPipeOptions options; 27 MojoCreateDataPipeOptions options;
34 options.struct_size = sizeof(MojoCreateDataPipeOptions); 28 options.struct_size = sizeof(MojoCreateDataPipeOptions);
35 options.flags = MOJO_WRITE_DATA_FLAG_NONE; 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE;
36 options.element_num_bytes = 1; 30 options.element_num_bytes = 1;
37 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes;
38 return new mojo::DataPipe(options); 32 return new mojo::DataPipe(options);
39 } 33 }
40 34
41 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( 35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter(
42 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
43 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
44 const std::string& name, 38 const std::string& name,
45 ::media::DemuxerStream* demuxer_stream, 39 ::media::DemuxerStream* demuxer_stream,
46 const base::WeakPtr<RpcBroker>& rpc_broker, 40 const base::WeakPtr<RpcBroker>& rpc_broker,
47 int rpc_handle, 41 int rpc_handle,
48 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info,
49 mojo::ScopedDataPipeProducerHandle producer_handle) 43 mojo::ScopedDataPipeProducerHandle producer_handle,
44 const ErrorCallback& error_callback)
50 : main_task_runner_(std::move(main_task_runner)), 45 : main_task_runner_(std::move(main_task_runner)),
51 media_task_runner_(std::move(media_task_runner)), 46 media_task_runner_(std::move(media_task_runner)),
52 name_(name), 47 name_(name),
53 rpc_broker_(rpc_broker), 48 rpc_broker_(rpc_broker),
54 rpc_handle_(rpc_handle), 49 rpc_handle_(rpc_handle),
55 demuxer_stream_(demuxer_stream), 50 demuxer_stream_(demuxer_stream),
56 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
52 error_callback_(error_callback),
57 remote_callback_handle_(kInvalidHandle), 53 remote_callback_handle_(kInvalidHandle),
58 read_until_callback_handle_(kInvalidHandle), 54 read_until_callback_handle_(kInvalidHandle),
59 read_until_count_(0), 55 read_until_count_(0),
60 last_count_(0), 56 last_count_(0),
61 pending_flush_(false), 57 pending_flush_(false),
62 current_pending_frame_offset_(0), 58 current_pending_frame_offset_(0),
63 pending_frame_is_eos_(false), 59 pending_frame_is_eos_(false),
64 media_status_(::media::DemuxerStream::kOk), 60 media_status_(::media::DemuxerStream::kOk),
65 producer_handle_(std::move(producer_handle)), 61 producer_handle_(std::move(producer_handle)),
62 bytes_written_to_pipe_(0),
66 request_buffer_weak_factory_(this), 63 request_buffer_weak_factory_(this),
67 weak_factory_(this) { 64 weak_factory_(this) {
68 DCHECK(main_task_runner_); 65 DCHECK(main_task_runner_);
69 DCHECK(media_task_runner_); 66 DCHECK(media_task_runner_);
70 DCHECK(media_task_runner_->BelongsToCurrentThread()); 67 DCHECK(media_task_runner_->BelongsToCurrentThread());
71 DCHECK(demuxer_stream); 68 DCHECK(demuxer_stream);
69 DCHECK(!error_callback.is_null());
72 const RpcBroker::ReceiveMessageCallback receive_callback = 70 const RpcBroker::ReceiveMessageCallback receive_callback =
73 media::BindToCurrentLoop( 71 media::BindToCurrentLoop(
74 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, 72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc,
75 weak_factory_.GetWeakPtr())); 73 weak_factory_.GetWeakPtr()));
76 main_task_runner_->PostTask( 74 main_task_runner_->PostTask(
77 FROM_HERE, 75 FROM_HERE,
78 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, 76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback,
79 rpc_broker_, rpc_handle_, receive_callback)); 77 rpc_broker_, rpc_handle_, receive_callback));
80 78
81 stream_sender_.Bind(std::move(stream_sender_info)); 79 stream_sender_.Bind(std::move(stream_sender_info));
82 stream_sender_.set_connection_error_handler( 80 stream_sender_.set_connection_error_handler(
83 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, 81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError,
84 weak_factory_.GetWeakPtr(), kErrorLostMojoConnection)); 82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
85 } 83 }
86 84
87 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { 85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() {
88 DCHECK(media_task_runner_->BelongsToCurrentThread()); 86 DCHECK(media_task_runner_->BelongsToCurrentThread());
89 main_task_runner_->PostTask( 87 main_task_runner_->PostTask(
90 FROM_HERE, 88 FROM_HERE,
91 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, 89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback,
92 rpc_broker_, rpc_handle_)); 90 rpc_broker_, rpc_handle_));
93 } 91 }
94 92
93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() {
94 DCHECK(media_task_runner_->BelongsToCurrentThread());
95 const int64_t current_count = bytes_written_to_pipe_;
96 bytes_written_to_pipe_ = 0;
97 return current_count;
98 }
99
95 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( 100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush(
96 bool flushing) { 101 bool flushing) {
97 DCHECK(media_task_runner_->BelongsToCurrentThread()); 102 DCHECK(media_task_runner_->BelongsToCurrentThread());
98 DEMUXER_VLOG(2) << "flushing=" << flushing; 103 DEMUXER_VLOG(2) << "flushing=" << flushing;
99 104
100 // Ignores if |pending_flush_| states is same. 105 // Ignores if |pending_flush_| states is same.
101 if (pending_flush_ == flushing) 106 if (pending_flush_ == flushing)
102 return base::nullopt; 107 return base::nullopt;
103 108
104 // Cleans up pending frame data. 109 // Cleans up pending frame data.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 154
150 // Checks if initialization had been called or not. 155 // Checks if initialization had been called or not.
151 if (remote_callback_handle_ != kInvalidHandle) { 156 if (remote_callback_handle_ != kInvalidHandle) {
152 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " 157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
153 << remote_callback_handle_ 158 << remote_callback_handle_
154 << ", Given: " << remote_callback_handle; 159 << ", Given: " << remote_callback_handle;
155 // Shuts down data pipe if available if providing different remote callback 160 // Shuts down data pipe if available if providing different remote callback
156 // handle for initialization. Otherwise, just silently ignore the duplicated 161 // handle for initialization. Otherwise, just silently ignore the duplicated
157 // request. 162 // request.
158 if (remote_callback_handle_ != remote_callback_handle) { 163 if (remote_callback_handle_ != remote_callback_handle) {
159 OnFatalError(kErrorDuplicateInitialize); 164 OnFatalError(PEERS_OUT_OF_SYNC);
160 } 165 }
161 return; 166 return;
162 } 167 }
163 remote_callback_handle_ = remote_callback_handle; 168 remote_callback_handle_ = remote_callback_handle;
164 169
165 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. 170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
166 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); 171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage());
167 rpc->set_handle(remote_callback_handle_); 172 rpc->set_handle(remote_callback_handle_);
168 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); 173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
169 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); 174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
205 weak_factory_.GetWeakPtr())); 210 weak_factory_.GetWeakPtr()));
206 } 211 }
207 } 212 }
208 213
209 void RemoteDemuxerStreamAdapter::ReadUntil( 214 void RemoteDemuxerStreamAdapter::ReadUntil(
210 std::unique_ptr<remoting::pb::RpcMessage> message) { 215 std::unique_ptr<remoting::pb::RpcMessage> message) {
211 DCHECK(media_task_runner_->BelongsToCurrentThread()); 216 DCHECK(media_task_runner_->BelongsToCurrentThread());
212 DCHECK(message); 217 DCHECK(message);
213 if (!message->has_demuxerstream_readuntil_rpc()) { 218 if (!message->has_demuxerstream_readuntil_rpc()) {
214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; 219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
215 OnFatalError(kErrorMissingMessageFields); 220 OnFatalError(RPC_INVALID);
216 return; 221 return;
217 } 222 }
218 223
219 const pb::DemuxerStreamReadUntil& rpc_message = 224 const pb::DemuxerStreamReadUntil& rpc_message =
220 message->demuxerstream_readuntil_rpc(); 225 message->demuxerstream_readuntil_rpc();
221 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle=" 226 DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
222 << rpc_message.callback_handle() 227 << rpc_message.callback_handle()
223 << ", count=" << rpc_message.count(); 228 << ", count=" << rpc_message.count();
224 229
225 if (pending_flush_) { 230 if (pending_flush_) {
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 } break; 303 } break;
299 } 304 }
300 } 305 }
301 306
302 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { 307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) {
303 DCHECK(media_task_runner_->BelongsToCurrentThread()); 308 DCHECK(media_task_runner_->BelongsToCurrentThread());
304 // The Mojo watcher will also call TryWriteData() sometimes as a notification 309 // The Mojo watcher will also call TryWriteData() sometimes as a notification
305 // that data pipe is ready. But that does not necessarily mean the data for a 310 // that data pipe is ready. But that does not necessarily mean the data for a
306 // read request is ready to be written into the pipe. 311 // read request is ready to be written into the pipe.
307 if (!IsProcessingReadRequest() || pending_flush_) { 312 if (!IsProcessingReadRequest() || pending_flush_) {
308 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; 313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state";
309 return; 314 return;
310 } 315 }
311 316
312 if (pending_frame_.empty()) { 317 if (pending_frame_.empty()) {
313 DEMUXER_VLOG(3) << "No data available, waiting for demuxer"; 318 DEMUXER_VLOG(3) << "No data available, waiting for demuxer";
314 return; 319 return;
315 } 320 }
316 321
317 if (!stream_sender_ || !producer_handle_.is_valid()) { 322 if (!stream_sender_ || !producer_handle_.is_valid()) {
318 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; 323 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
319 return; 324 return;
320 } 325 }
321 326
322 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; 327 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
323 MojoResult mojo_result = 328 MojoResult mojo_result =
324 WriteDataRaw(producer_handle_.get(), 329 WriteDataRaw(producer_handle_.get(),
325 pending_frame_.data() + current_pending_frame_offset_, 330 pending_frame_.data() + current_pending_frame_offset_,
326 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); 331 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
327 if (mojo_result != MOJO_RESULT_OK) { 332 if (mojo_result != MOJO_RESULT_OK) {
328 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { 333 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) {
329 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" 334 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
330 << mojo_result; 335 << mojo_result;
331 OnFatalError(kErrorDataPipeWrite); 336 OnFatalError(MOJO_PIPE_ERROR);
332 } 337 }
333 return; 338 return;
334 } 339 }
335 340
336 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
337 pending_frame_.size()); 342 pending_frame_.size());
338 current_pending_frame_offset_ += num_bytes; 343 current_pending_frame_offset_ += num_bytes;
344 bytes_written_to_pipe_ += num_bytes;
339 345
340 // Checks if all buffer was written to browser process. 346 // Checks if all buffer was written to browser process.
341 if (current_pending_frame_offset_ != pending_frame_.size()) { 347 if (current_pending_frame_offset_ != pending_frame_.size()) {
342 // Returns and wait for mojo watcher to notify to write more data. 348 // Returns and wait for mojo watcher to notify to write more data.
343 return; 349 return;
344 } 350 }
345 351
346 // Signal mojo remoting service that all frame buffer is written to data pipe. 352 // Signal mojo remoting service that all frame buffer is written to data pipe.
347 stream_sender_->SendFrame(); 353 stream_sender_->SendFrame();
348 354
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
414 video_config_ = ::media::VideoDecoderConfig(); 420 video_config_ = ::media::VideoDecoderConfig();
415 } 421 }
416 422
417 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { 423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() {
418 DCHECK(media_task_runner_->BelongsToCurrentThread()); 424 DCHECK(media_task_runner_->BelongsToCurrentThread());
419 current_pending_frame_offset_ = 0; 425 current_pending_frame_offset_ = 0;
420 pending_frame_.clear(); 426 pending_frame_.clear();
421 pending_frame_is_eos_ = false; 427 pending_frame_is_eos_ = false;
422 } 428 }
423 429
424 void RemoteDemuxerStreamAdapter::OnFatalError(const char* reason) { 430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
425 DCHECK(media_task_runner_->BelongsToCurrentThread()); 431 DCHECK(media_task_runner_->BelongsToCurrentThread());
426 432
427 DEMUXER_VLOG(1) << reason; 433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
428 434
429 // Resets mojo data pipe producer handle. 435 if (error_callback_.is_null())
430 producer_handle_.reset(); 436 return;
431
432 // Resetting |stream_sender_| will close Mojo message pipe, which will cause
433 // the entire remoting session to be shut down.
434 if (stream_sender_) {
435 DEMUXER_VLOG(2) << "Reset data stream sender";
436 stream_sender_.reset();
437 }
438 437
439 if (write_watcher_.IsWatching()) { 438 if (write_watcher_.IsWatching()) {
440 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; 439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
441 write_watcher_.Cancel(); 440 write_watcher_.Cancel();
442 } 441 }
442
443 base::ResetAndReturn(&error_callback_).Run(stop_trigger);
443 } 444 }
444 445
445 } // namespace remoting 446 } // namespace remoting
446 } // namespace media 447 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698