OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "media/remoting/demuxer_stream_adapter.h" | 5 #include "media/remoting/demuxer_stream_adapter.h" |
6 | 6 |
7 #include "base/base64.h" | 7 #include "base/base64.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
9 #include "base/callback_helpers.h" | 9 #include "base/callback_helpers.h" |
10 #include "media/base/bind_to_current_loop.h" | 10 #include "media/base/bind_to_current_loop.h" |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
50 demuxer_stream_(demuxer_stream), | 50 demuxer_stream_(demuxer_stream), |
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
52 error_callback_(error_callback), | 52 error_callback_(error_callback), |
53 remote_callback_handle_(RpcBroker::kInvalidHandle), | 53 remote_callback_handle_(RpcBroker::kInvalidHandle), |
54 read_until_callback_handle_(RpcBroker::kInvalidHandle), | 54 read_until_callback_handle_(RpcBroker::kInvalidHandle), |
55 read_until_count_(0), | 55 read_until_count_(0), |
56 last_count_(0), | 56 last_count_(0), |
57 pending_flush_(false), | 57 pending_flush_(false), |
58 current_pending_frame_offset_(0), | 58 current_pending_frame_offset_(0), |
59 pending_frame_is_eos_(false), | 59 pending_frame_is_eos_(false), |
60 write_watcher_(FROM_HERE), | 60 write_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL), |
61 media_status_(DemuxerStream::kOk), | 61 media_status_(DemuxerStream::kOk), |
62 producer_handle_(std::move(producer_handle)), | 62 producer_handle_(std::move(producer_handle)), |
63 bytes_written_to_pipe_(0), | 63 bytes_written_to_pipe_(0), |
64 request_buffer_weak_factory_(this), | 64 request_buffer_weak_factory_(this), |
65 weak_factory_(this) { | 65 weak_factory_(this) { |
66 DCHECK(main_task_runner_); | 66 DCHECK(main_task_runner_); |
67 DCHECK(media_task_runner_); | 67 DCHECK(media_task_runner_); |
68 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 68 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
69 DCHECK(demuxer_stream); | 69 DCHECK(demuxer_stream); |
70 DCHECK(!error_callback.is_null()); | 70 DCHECK(!error_callback.is_null()); |
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
195 ? audio_config_.AsHumanReadableString() | 195 ? audio_config_.AsHumanReadableString() |
196 : video_config_.AsHumanReadableString()) | 196 : video_config_.AsHumanReadableString()) |
197 << '}'; | 197 << '}'; |
198 main_task_runner_->PostTask( | 198 main_task_runner_->PostTask( |
199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, | 199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, |
200 base::Passed(&rpc))); | 200 base::Passed(&rpc))); |
201 | 201 |
202 // Starts Mojo watcher. | 202 // Starts Mojo watcher. |
203 if (!write_watcher_.IsWatching()) { | 203 if (!write_watcher_.IsWatching()) { |
204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; | 204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; |
205 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, | 205 write_watcher_.Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
206 base::Bind(&DemuxerStreamAdapter::TryWriteData, | 206 base::Bind(&DemuxerStreamAdapter::TryWriteData, |
207 weak_factory_.GetWeakPtr())); | 207 weak_factory_.GetWeakPtr())); |
| 208 write_watcher_.ArmOrNotify(); |
208 } | 209 } |
209 } | 210 } |
210 | 211 |
211 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { | 212 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { |
212 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 213 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
213 DCHECK(message); | 214 DCHECK(message); |
214 if (!message->has_demuxerstream_readuntil_rpc()) { | 215 if (!message->has_demuxerstream_readuntil_rpc()) { |
215 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | 216 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
216 OnFatalError(RPC_INVALID); | 217 OnFatalError(RPC_INVALID); |
217 return; | 218 return; |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
319 if (!stream_sender_ || !producer_handle_.is_valid()) { | 320 if (!stream_sender_ || !producer_handle_.is_valid()) { |
320 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; | 321 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; |
321 return; | 322 return; |
322 } | 323 } |
323 | 324 |
324 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; | 325 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; |
325 MojoResult mojo_result = | 326 MojoResult mojo_result = |
326 WriteDataRaw(producer_handle_.get(), | 327 WriteDataRaw(producer_handle_.get(), |
327 pending_frame_.data() + current_pending_frame_offset_, | 328 pending_frame_.data() + current_pending_frame_offset_, |
328 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); | 329 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
329 if (mojo_result != MOJO_RESULT_OK) { | 330 if (mojo_result != MOJO_RESULT_OK && mojo_result != MOJO_RESULT_SHOULD_WAIT) { |
330 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { | 331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" |
331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" | 332 << mojo_result; |
332 << mojo_result; | 333 OnFatalError(MOJO_PIPE_ERROR); |
333 OnFatalError(MOJO_PIPE_ERROR); | |
334 } | |
335 return; | 334 return; |
336 } | 335 } |
337 | 336 |
| 337 write_watcher_.ArmOrNotify(); |
| 338 if (mojo_result != MOJO_RESULT_OK) |
| 339 return; |
| 340 |
338 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, | 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, |
339 pending_frame_.size()); | 342 pending_frame_.size()); |
340 current_pending_frame_offset_ += num_bytes; | 343 current_pending_frame_offset_ += num_bytes; |
341 bytes_written_to_pipe_ += num_bytes; | 344 bytes_written_to_pipe_ += num_bytes; |
342 | 345 |
343 // Checks if all buffer was written to browser process. | 346 // Checks if all buffer was written to browser process. |
344 if (current_pending_frame_offset_ != pending_frame_.size()) { | 347 if (current_pending_frame_offset_ != pending_frame_.size()) { |
345 // Returns and wait for mojo watcher to notify to write more data. | 348 // Returns and wait for mojo watcher to notify to write more data. |
346 return; | 349 return; |
347 } | 350 } |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
434 if (write_watcher_.IsWatching()) { | 437 if (write_watcher_.IsWatching()) { |
435 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | 438 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
436 write_watcher_.Cancel(); | 439 write_watcher_.Cancel(); |
437 } | 440 } |
438 | 441 |
439 base::ResetAndReturn(&error_callback_).Run(stop_trigger); | 442 base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
440 } | 443 } |
441 | 444 |
442 } // namespace remoting | 445 } // namespace remoting |
443 } // namespace media | 446 } // namespace media |
OLD | NEW |