Index: media/remoting/remote_demuxer_stream_adapter.cc |
diff --git a/media/remoting/remote_demuxer_stream_adapter.cc b/media/remoting/remote_demuxer_stream_adapter.cc |
index d4d80a51d7a7d11e291e97f3dadcf697bc754243..9929c4a8af1adcb91e01b9902191760e94b918dc 100644 |
--- a/media/remoting/remote_demuxer_stream_adapter.cc |
+++ b/media/remoting/remote_demuxer_stream_adapter.cc |
@@ -6,6 +6,7 @@ |
#include "base/base64.h" |
#include "base/bind.h" |
+#include "base/callback_helpers.h" |
#include "media/base/bind_to_current_loop.h" |
#include "media/base/decoder_buffer.h" |
#include "media/base/timestamp_constants.h" |
@@ -18,13 +19,6 @@ |
namespace media { |
namespace remoting { |
-namespace { |
-constexpr char kErrorLostMojoConnection[] = "Mojo connection error"; |
-constexpr char kErrorDataPipeWrite[] = "Mojo data pipe write error"; |
-constexpr char kErrorDuplicateInitialize[] = "Duplicate attempt to initialize"; |
-constexpr char kErrorMissingMessageFields[] = "Missing required message fields"; |
-} // namepsace |
- |
// static |
mojo::DataPipe* CreateDataPipe() { |
// Capacity in bytes for Mojo data pipe. |
@@ -46,7 +40,8 @@ RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
const base::WeakPtr<RpcBroker>& rpc_broker, |
int rpc_handle, |
mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
- mojo::ScopedDataPipeProducerHandle producer_handle) |
+ mojo::ScopedDataPipeProducerHandle producer_handle, |
+ const ErrorCallback& error_callback) |
: main_task_runner_(std::move(main_task_runner)), |
media_task_runner_(std::move(media_task_runner)), |
name_(name), |
@@ -54,6 +49,7 @@ RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
rpc_handle_(rpc_handle), |
demuxer_stream_(demuxer_stream), |
type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
+ error_callback_(error_callback), |
remote_callback_handle_(kInvalidHandle), |
read_until_callback_handle_(kInvalidHandle), |
read_until_count_(0), |
@@ -63,12 +59,14 @@ RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
pending_frame_is_eos_(false), |
media_status_(::media::DemuxerStream::kOk), |
producer_handle_(std::move(producer_handle)), |
+ bytes_written_to_pipe_(0), |
request_buffer_weak_factory_(this), |
weak_factory_(this) { |
DCHECK(main_task_runner_); |
DCHECK(media_task_runner_); |
DCHECK(media_task_runner_->BelongsToCurrentThread()); |
DCHECK(demuxer_stream); |
+ DCHECK(!error_callback.is_null()); |
const RpcBroker::ReceiveMessageCallback receive_callback = |
media::BindToCurrentLoop( |
base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, |
@@ -81,7 +79,7 @@ RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( |
stream_sender_.Bind(std::move(stream_sender_info)); |
stream_sender_.set_connection_error_handler( |
base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, |
- weak_factory_.GetWeakPtr(), kErrorLostMojoConnection)); |
+ weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
} |
RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { |
@@ -92,6 +90,13 @@ RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { |
rpc_broker_, rpc_handle_)); |
} |
+int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { |
+ DCHECK(media_task_runner_->BelongsToCurrentThread()); |
+ const int64_t current_count = bytes_written_to_pipe_; |
+ bytes_written_to_pipe_ = 0; |
+ return current_count; |
+} |
+ |
base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( |
bool flushing) { |
DCHECK(media_task_runner_->BelongsToCurrentThread()); |
@@ -156,7 +161,7 @@ void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { |
// handle for initialization. Otherwise, just silently ignore the duplicated |
// request. |
if (remote_callback_handle_ != remote_callback_handle) { |
- OnFatalError(kErrorDuplicateInitialize); |
+ OnFatalError(PEERS_OUT_OF_SYNC); |
} |
return; |
} |
@@ -212,7 +217,7 @@ void RemoteDemuxerStreamAdapter::ReadUntil( |
DCHECK(message); |
if (!message->has_demuxerstream_readuntil_rpc()) { |
DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
- OnFatalError(kErrorMissingMessageFields); |
+ OnFatalError(RPC_INVALID); |
return; |
} |
@@ -305,7 +310,7 @@ void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
// that data pipe is ready. But that does not necessarily mean the data for a |
// read request is ready to be written into the pipe. |
if (!IsProcessingReadRequest() || pending_flush_) { |
- DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
+ DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
return; |
} |
@@ -328,7 +333,7 @@ void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { |
DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" |
<< mojo_result; |
- OnFatalError(kErrorDataPipeWrite); |
+ OnFatalError(MOJO_PIPE_ERROR); |
} |
return; |
} |
@@ -336,6 +341,7 @@ void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { |
stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, |
pending_frame_.size()); |
current_pending_frame_offset_ += num_bytes; |
+ bytes_written_to_pipe_ += num_bytes; |
// Checks if all buffer was written to browser process. |
if (current_pending_frame_offset_ != pending_frame_.size()) { |
@@ -421,25 +427,20 @@ void RemoteDemuxerStreamAdapter::ResetPendingFrame() { |
pending_frame_is_eos_ = false; |
} |
-void RemoteDemuxerStreamAdapter::OnFatalError(const char* reason) { |
+void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
DCHECK(media_task_runner_->BelongsToCurrentThread()); |
- DEMUXER_VLOG(1) << reason; |
+ DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
- // Resets mojo data pipe producer handle. |
- producer_handle_.reset(); |
- |
- // Resetting |stream_sender_| will close Mojo message pipe, which will cause |
- // the entire remoting session to be shut down. |
- if (stream_sender_) { |
- DEMUXER_VLOG(2) << "Reset data stream sender"; |
- stream_sender_.reset(); |
- } |
+ if (error_callback_.is_null()) |
+ return; |
if (write_watcher_.IsWatching()) { |
DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
write_watcher_.Cancel(); |
} |
+ |
+ base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
} |
} // namespace remoting |