OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "media/remoting/remote_demuxer_stream_adapter.h" | 5 #include "media/remoting/demuxer_stream_adapter.h" |
6 | 6 |
7 #include "base/base64.h" | 7 #include "base/base64.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
9 #include "base/callback_helpers.h" | 9 #include "base/callback_helpers.h" |
10 #include "media/base/bind_to_current_loop.h" | 10 #include "media/base/bind_to_current_loop.h" |
11 #include "media/base/decoder_buffer.h" | 11 #include "media/base/decoder_buffer.h" |
12 #include "media/base/timestamp_constants.h" | 12 #include "media/base/timestamp_constants.h" |
13 #include "media/remoting/rpc/proto_enum_utils.h" | 13 #include "media/remoting/proto_enum_utils.h" |
14 #include "media/remoting/rpc/proto_utils.h" | 14 #include "media/remoting/proto_utils.h" |
15 | 15 |
16 // Convenience logging macro used throughout this file. | 16 // Convenience logging macro used throughout this file. |
17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " | 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " |
18 | 18 |
19 namespace media { | 19 namespace media { |
20 namespace remoting { | 20 namespace remoting { |
21 | 21 |
22 // static | 22 // static |
23 mojo::DataPipe* CreateDataPipe() { | 23 mojo::DataPipe* DemuxerStreamAdapter::CreateDataPipe() { |
24 // Capacity in bytes for Mojo data pipe. | 24 // Capacity in bytes for Mojo data pipe. |
25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; | 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; |
26 | 26 |
27 MojoCreateDataPipeOptions options; | 27 MojoCreateDataPipeOptions options; |
28 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 28 options.struct_size = sizeof(MojoCreateDataPipeOptions); |
29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; | 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; |
30 options.element_num_bytes = 1; | 30 options.element_num_bytes = 1; |
31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; | 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; |
32 return new mojo::DataPipe(options); | 32 return new mojo::DataPipe(options); |
33 } | 33 } |
34 | 34 |
35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( | 35 DemuxerStreamAdapter::DemuxerStreamAdapter( |
36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
38 const std::string& name, | 38 const std::string& name, |
39 ::media::DemuxerStream* demuxer_stream, | 39 DemuxerStream* demuxer_stream, |
40 const base::WeakPtr<RpcBroker>& rpc_broker, | 40 const base::WeakPtr<RpcBroker>& rpc_broker, |
41 int rpc_handle, | 41 int rpc_handle, |
42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, |
43 mojo::ScopedDataPipeProducerHandle producer_handle, | 43 mojo::ScopedDataPipeProducerHandle producer_handle, |
44 const ErrorCallback& error_callback) | 44 const ErrorCallback& error_callback) |
45 : main_task_runner_(std::move(main_task_runner)), | 45 : main_task_runner_(std::move(main_task_runner)), |
46 media_task_runner_(std::move(media_task_runner)), | 46 media_task_runner_(std::move(media_task_runner)), |
47 name_(name), | 47 name_(name), |
48 rpc_broker_(rpc_broker), | 48 rpc_broker_(rpc_broker), |
49 rpc_handle_(rpc_handle), | 49 rpc_handle_(rpc_handle), |
50 demuxer_stream_(demuxer_stream), | 50 demuxer_stream_(demuxer_stream), |
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), | 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), |
52 error_callback_(error_callback), | 52 error_callback_(error_callback), |
53 remote_callback_handle_(kInvalidHandle), | 53 remote_callback_handle_(RpcBroker::kInvalidHandle), |
54 read_until_callback_handle_(kInvalidHandle), | 54 read_until_callback_handle_(RpcBroker::kInvalidHandle), |
55 read_until_count_(0), | 55 read_until_count_(0), |
56 last_count_(0), | 56 last_count_(0), |
57 pending_flush_(false), | 57 pending_flush_(false), |
58 current_pending_frame_offset_(0), | 58 current_pending_frame_offset_(0), |
59 pending_frame_is_eos_(false), | 59 pending_frame_is_eos_(false), |
60 media_status_(::media::DemuxerStream::kOk), | 60 media_status_(DemuxerStream::kOk), |
61 producer_handle_(std::move(producer_handle)), | 61 producer_handle_(std::move(producer_handle)), |
62 bytes_written_to_pipe_(0), | 62 bytes_written_to_pipe_(0), |
63 request_buffer_weak_factory_(this), | 63 request_buffer_weak_factory_(this), |
64 weak_factory_(this) { | 64 weak_factory_(this) { |
65 DCHECK(main_task_runner_); | 65 DCHECK(main_task_runner_); |
66 DCHECK(media_task_runner_); | 66 DCHECK(media_task_runner_); |
67 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 67 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
68 DCHECK(demuxer_stream); | 68 DCHECK(demuxer_stream); |
69 DCHECK(!error_callback.is_null()); | 69 DCHECK(!error_callback.is_null()); |
70 const RpcBroker::ReceiveMessageCallback receive_callback = | 70 const RpcBroker::ReceiveMessageCallback receive_callback = |
71 media::BindToCurrentLoop( | 71 BindToCurrentLoop(base::Bind(&DemuxerStreamAdapter::OnReceivedRpc, |
72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, | 72 weak_factory_.GetWeakPtr())); |
73 weak_factory_.GetWeakPtr())); | |
74 main_task_runner_->PostTask( | 73 main_task_runner_->PostTask( |
75 FROM_HERE, | 74 FROM_HERE, base::Bind(&RpcBroker::RegisterMessageReceiverCallback, |
76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, | 75 rpc_broker_, rpc_handle_, receive_callback)); |
77 rpc_broker_, rpc_handle_, receive_callback)); | |
78 | 76 |
79 stream_sender_.Bind(std::move(stream_sender_info)); | 77 stream_sender_.Bind(std::move(stream_sender_info)); |
80 stream_sender_.set_connection_error_handler( | 78 stream_sender_.set_connection_error_handler( |
81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, | 79 base::Bind(&DemuxerStreamAdapter::OnFatalError, |
82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); | 80 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); |
83 } | 81 } |
84 | 82 |
85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { | 83 DemuxerStreamAdapter::~DemuxerStreamAdapter() { |
86 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 84 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
87 main_task_runner_->PostTask( | 85 main_task_runner_->PostTask( |
88 FROM_HERE, | 86 FROM_HERE, base::Bind(&RpcBroker::UnregisterMessageReceiverCallback, |
89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, | 87 rpc_broker_, rpc_handle_)); |
90 rpc_broker_, rpc_handle_)); | |
91 } | 88 } |
92 | 89 |
93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { | 90 int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() { |
94 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 91 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
95 const int64_t current_count = bytes_written_to_pipe_; | 92 const int64_t current_count = bytes_written_to_pipe_; |
96 bytes_written_to_pipe_ = 0; | 93 bytes_written_to_pipe_ = 0; |
97 return current_count; | 94 return current_count; |
98 } | 95 } |
99 | 96 |
100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( | 97 base::Optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) { |
101 bool flushing) { | |
102 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 98 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
103 DEMUXER_VLOG(2) << "flushing=" << flushing; | 99 DEMUXER_VLOG(2) << "flushing=" << flushing; |
104 | 100 |
105 // Ignores if |pending_flush_| states is same. | 101 // Ignores if |pending_flush_| states is same. |
106 if (pending_flush_ == flushing) | 102 if (pending_flush_ == flushing) |
107 return base::nullopt; | 103 return base::nullopt; |
108 | 104 |
109 // Cleans up pending frame data. | 105 // Cleans up pending frame data. |
110 pending_frame_.clear(); | 106 pending_frame_.clear(); |
111 current_pending_frame_offset_ = 0; | 107 current_pending_frame_offset_ = 0; |
112 pending_frame_is_eos_ = false; | 108 pending_frame_is_eos_ = false; |
113 // Invalidates pending Read() tasks. | 109 // Invalidates pending Read() tasks. |
114 request_buffer_weak_factory_.InvalidateWeakPtrs(); | 110 request_buffer_weak_factory_.InvalidateWeakPtrs(); |
115 | 111 |
116 // Cancels in flight data in browser process. | 112 // Cancels in flight data in browser process. |
117 pending_flush_ = flushing; | 113 pending_flush_ = flushing; |
118 if (flushing) { | 114 if (flushing) { |
119 stream_sender_->CancelInFlightData(); | 115 stream_sender_->CancelInFlightData(); |
120 } else { | 116 } else { |
121 // Sets callback handle invalid to abort ongoing read request. | 117 // Sets callback handle invalid to abort ongoing read request. |
122 read_until_callback_handle_ = kInvalidHandle; | 118 read_until_callback_handle_ = RpcBroker::kInvalidHandle; |
123 } | 119 } |
124 return last_count_; | 120 return last_count_; |
125 } | 121 } |
126 | 122 |
127 void RemoteDemuxerStreamAdapter::OnReceivedRpc( | 123 void DemuxerStreamAdapter::OnReceivedRpc( |
128 std::unique_ptr<remoting::pb::RpcMessage> message) { | 124 std::unique_ptr<pb::RpcMessage> message) { |
129 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 125 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
130 DCHECK(message); | 126 DCHECK(message); |
131 DCHECK(rpc_handle_ == message->handle()); | 127 DCHECK(rpc_handle_ == message->handle()); |
132 | 128 |
133 switch (message->proc()) { | 129 switch (message->proc()) { |
134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: | 130 case pb::RpcMessage::RPC_DS_INITIALIZE: |
135 Initialize(message->integer_value()); | 131 Initialize(message->integer_value()); |
136 break; | 132 break; |
137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL: | 133 case pb::RpcMessage::RPC_DS_READUNTIL: |
138 ReadUntil(std::move(message)); | 134 ReadUntil(std::move(message)); |
139 break; | 135 break; |
140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: | 136 case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: |
141 EnableBitstreamConverter(); | 137 EnableBitstreamConverter(); |
142 break; | 138 break; |
143 | 139 |
144 default: | 140 default: |
145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); | 141 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); |
146 } | 142 } |
147 } | 143 } |
148 | 144 |
149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { | 145 void DemuxerStreamAdapter::Initialize(int remote_callback_handle) { |
150 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 146 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
151 DCHECK(!pending_flush_); | 147 DCHECK(!pending_flush_); |
152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" | 148 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" |
153 << remote_callback_handle; | 149 << remote_callback_handle; |
154 | 150 |
155 // Checks if initialization had been called or not. | 151 // Checks if initialization had been called or not. |
156 if (remote_callback_handle_ != kInvalidHandle) { | 152 if (remote_callback_handle_ != RpcBroker::kInvalidHandle) { |
157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " | 153 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " |
158 << remote_callback_handle_ | 154 << remote_callback_handle_ |
159 << ", Given: " << remote_callback_handle; | 155 << ", Given: " << remote_callback_handle; |
160 // Shuts down data pipe if available if providing different remote callback | 156 // Shuts down data pipe if available if providing different remote callback |
161 // handle for initialization. Otherwise, just silently ignore the duplicated | 157 // handle for initialization. Otherwise, just silently ignore the duplicated |
162 // request. | 158 // request. |
163 if (remote_callback_handle_ != remote_callback_handle) { | 159 if (remote_callback_handle_ != remote_callback_handle) { |
164 OnFatalError(PEERS_OUT_OF_SYNC); | 160 OnFatalError(PEERS_OUT_OF_SYNC); |
165 } | 161 } |
166 return; | 162 return; |
167 } | 163 } |
168 remote_callback_handle_ = remote_callback_handle; | 164 remote_callback_handle_ = remote_callback_handle; |
169 | 165 |
170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. | 166 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. |
171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 167 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); |
172 rpc->set_handle(remote_callback_handle_); | 168 rpc->set_handle(remote_callback_handle_); |
173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); | 169 rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); |
174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); | 170 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); |
175 init_cb_message->set_type(type_); | 171 init_cb_message->set_type(type_); |
176 switch (type_) { | 172 switch (type_) { |
177 case ::media::DemuxerStream::Type::AUDIO: { | 173 case DemuxerStream::Type::AUDIO: { |
178 audio_config_ = demuxer_stream_->audio_decoder_config(); | 174 audio_config_ = demuxer_stream_->audio_decoder_config(); |
179 pb::AudioDecoderConfig* audio_message = | 175 pb::AudioDecoderConfig* audio_message = |
180 init_cb_message->mutable_audio_decoder_config(); | 176 init_cb_message->mutable_audio_decoder_config(); |
181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | 177 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
182 break; | 178 break; |
183 } | 179 } |
184 case ::media::DemuxerStream::Type::VIDEO: { | 180 case DemuxerStream::Type::VIDEO: { |
185 video_config_ = demuxer_stream_->video_decoder_config(); | 181 video_config_ = demuxer_stream_->video_decoder_config(); |
186 pb::VideoDecoderConfig* video_message = | 182 pb::VideoDecoderConfig* video_message = |
187 init_cb_message->mutable_video_decoder_config(); | 183 init_cb_message->mutable_video_decoder_config(); |
188 ConvertVideoDecoderConfigToProto(video_config_, video_message); | 184 ConvertVideoDecoderConfigToProto(video_config_, video_message); |
189 break; | 185 break; |
190 } | 186 } |
191 default: | 187 default: |
192 NOTREACHED(); | 188 NOTREACHED(); |
193 } | 189 } |
194 | 190 |
195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() | 191 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() |
196 << " with decoder_config={" | 192 << " with decoder_config={" |
197 << (type_ == ::media::DemuxerStream::Type::AUDIO | 193 << (type_ == DemuxerStream::Type::AUDIO |
198 ? audio_config_.AsHumanReadableString() | 194 ? audio_config_.AsHumanReadableString() |
199 : video_config_.AsHumanReadableString()) | 195 : video_config_.AsHumanReadableString()) |
200 << '}'; | 196 << '}'; |
201 main_task_runner_->PostTask( | 197 main_task_runner_->PostTask( |
202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | 198 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, |
203 rpc_broker_, base::Passed(&rpc))); | 199 base::Passed(&rpc))); |
204 | 200 |
205 // Starts Mojo watcher. | 201 // Starts Mojo watcher. |
206 if (!write_watcher_.IsWatching()) { | 202 if (!write_watcher_.IsWatching()) { |
207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; | 203 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; |
208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, | 204 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, | 205 base::Bind(&DemuxerStreamAdapter::TryWriteData, |
210 weak_factory_.GetWeakPtr())); | 206 weak_factory_.GetWeakPtr())); |
211 } | 207 } |
212 } | 208 } |
213 | 209 |
214 void RemoteDemuxerStreamAdapter::ReadUntil( | 210 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { |
215 std::unique_ptr<remoting::pb::RpcMessage> message) { | |
216 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 211 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
217 DCHECK(message); | 212 DCHECK(message); |
218 if (!message->has_demuxerstream_readuntil_rpc()) { | 213 if (!message->has_demuxerstream_readuntil_rpc()) { |
219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; | 214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; |
220 OnFatalError(RPC_INVALID); | 215 OnFatalError(RPC_INVALID); |
221 return; | 216 return; |
222 } | 217 } |
223 | 218 |
224 const pb::DemuxerStreamReadUntil& rpc_message = | 219 const pb::DemuxerStreamReadUntil& rpc_message = |
225 message->demuxerstream_readuntil_rpc(); | 220 message->demuxerstream_readuntil_rpc(); |
(...skipping 15 matching lines...) Expand all Loading... |
241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " | 236 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " |
242 "current frame count"; | 237 "current frame count"; |
243 return; | 238 return; |
244 } | 239 } |
245 | 240 |
246 read_until_count_ = rpc_message.count(); | 241 read_until_count_ = rpc_message.count(); |
247 read_until_callback_handle_ = rpc_message.callback_handle(); | 242 read_until_callback_handle_ = rpc_message.callback_handle(); |
248 RequestBuffer(); | 243 RequestBuffer(); |
249 } | 244 } |
250 | 245 |
251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { | 246 void DemuxerStreamAdapter::EnableBitstreamConverter() { |
252 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 247 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; | 248 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; |
254 demuxer_stream_->EnableBitstreamConverter(); | 249 demuxer_stream_->EnableBitstreamConverter(); |
255 } | 250 } |
256 | 251 |
257 void RemoteDemuxerStreamAdapter::RequestBuffer() { | 252 void DemuxerStreamAdapter::RequestBuffer() { |
258 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 253 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
259 if (!IsProcessingReadRequest() || pending_flush_) { | 254 if (!IsProcessingReadRequest() || pending_flush_) { |
260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 255 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
261 return; | 256 return; |
262 } | 257 } |
263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, | 258 demuxer_stream_->Read(base::Bind(&DemuxerStreamAdapter::OnNewBuffer, |
264 request_buffer_weak_factory_.GetWeakPtr())); | 259 request_buffer_weak_factory_.GetWeakPtr())); |
265 } | 260 } |
266 | 261 |
267 void RemoteDemuxerStreamAdapter::OnNewBuffer( | 262 void DemuxerStreamAdapter::OnNewBuffer( |
268 ::media::DemuxerStream::Status status, | 263 DemuxerStream::Status status, |
269 const scoped_refptr<::media::DecoderBuffer>& input) { | 264 const scoped_refptr<DecoderBuffer>& input) { |
270 DEMUXER_VLOG(3) << "status=" << status; | 265 DEMUXER_VLOG(3) << "status=" << status; |
271 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 266 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
272 if (!IsProcessingReadRequest() || pending_flush_) { | 267 if (!IsProcessingReadRequest() || pending_flush_) { |
273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; | 268 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; |
274 return; | 269 return; |
275 } | 270 } |
276 | 271 |
277 switch (status) { | 272 switch (status) { |
278 case ::media::DemuxerStream::kAborted: | 273 case DemuxerStream::kAborted: |
279 DCHECK(!input); | 274 DCHECK(!input); |
280 SendReadAck(); | 275 SendReadAck(); |
281 return; | 276 return; |
282 case ::media::DemuxerStream::kConfigChanged: | 277 case DemuxerStream::kConfigChanged: |
283 // TODO(erickung): consider sending updated Audio/Video decoder config to | |
284 // RemotingRendererController. | |
285 // Stores available audio/video decoder config and issues | 278 // Stores available audio/video decoder config and issues |
286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. | 279 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. |
287 DCHECK(!input); | 280 DCHECK(!input); |
288 media_status_ = status; | 281 media_status_ = status; |
289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) | 282 if (demuxer_stream_->type() == DemuxerStream::VIDEO) |
290 video_config_ = demuxer_stream_->video_decoder_config(); | 283 video_config_ = demuxer_stream_->video_decoder_config(); |
291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) | 284 if (demuxer_stream_->type() == DemuxerStream::AUDIO) |
292 audio_config_ = demuxer_stream_->audio_decoder_config(); | 285 audio_config_ = demuxer_stream_->audio_decoder_config(); |
293 SendReadAck(); | 286 SendReadAck(); |
294 return; | 287 return; |
295 case ::media::DemuxerStream::kOk: { | 288 case DemuxerStream::kOk: { |
296 media_status_ = status; | 289 media_status_ = status; |
297 DCHECK(pending_frame_.empty()); | 290 DCHECK(pending_frame_.empty()); |
298 if (!producer_handle_.is_valid()) | 291 if (!producer_handle_.is_valid()) |
299 return; // Do not start sending (due to previous fatal error). | 292 return; // Do not start sending (due to previous fatal error). |
300 pending_frame_ = DecoderBufferToByteArray(input); | 293 pending_frame_ = DecoderBufferToByteArray(*input); |
301 pending_frame_is_eos_ = input->end_of_stream(); | 294 pending_frame_is_eos_ = input->end_of_stream(); |
302 TryWriteData(MOJO_RESULT_OK); | 295 TryWriteData(MOJO_RESULT_OK); |
303 } break; | 296 } break; |
304 } | 297 } |
305 } | 298 } |
306 | 299 |
307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { | 300 void DemuxerStreamAdapter::TryWriteData(MojoResult result) { |
308 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 301 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
309 // The Mojo watcher will also call TryWriteData() sometimes as a notification | 302 // The Mojo watcher will also call TryWriteData() sometimes as a notification |
310 // that data pipe is ready. But that does not necessarily mean the data for a | 303 // that data pipe is ready. But that does not necessarily mean the data for a |
311 // read request is ready to be written into the pipe. | 304 // read request is ready to be written into the pipe. |
312 if (!IsProcessingReadRequest() || pending_flush_) { | 305 if (!IsProcessingReadRequest() || pending_flush_) { |
313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; | 306 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; |
314 return; | 307 return; |
315 } | 308 } |
316 | 309 |
317 if (pending_frame_.empty()) { | 310 if (pending_frame_.empty()) { |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
358 ResetPendingFrame(); | 351 ResetPendingFrame(); |
359 | 352 |
360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. | 353 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. |
361 if (read_until_count_ == last_count_ || pending_frame_is_eos) { | 354 if (read_until_count_ == last_count_ || pending_frame_is_eos) { |
362 SendReadAck(); | 355 SendReadAck(); |
363 return; | 356 return; |
364 } | 357 } |
365 | 358 |
366 // Contiune to read decoder buffer until reaching |read_until_count_| or | 359 // Contiune to read decoder buffer until reaching |read_until_count_| or |
367 // end of stream. | 360 // end of stream. |
368 media_task_runner_->PostTask( | 361 media_task_runner_->PostTask(FROM_HERE, |
369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, | 362 base::Bind(&DemuxerStreamAdapter::RequestBuffer, |
370 weak_factory_.GetWeakPtr())); | 363 weak_factory_.GetWeakPtr())); |
371 } | 364 } |
372 | 365 |
373 void RemoteDemuxerStreamAdapter::SendReadAck() { | 366 void DemuxerStreamAdapter::SendReadAck() { |
374 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 367 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
375 DEMUXER_VLOG(3) << "last_count_=" << last_count_ | 368 DEMUXER_VLOG(3) << "last_count_=" << last_count_ |
376 << ", remote_read_callback_handle=" | 369 << ", remote_read_callback_handle=" |
377 << read_until_callback_handle_ | 370 << read_until_callback_handle_ |
378 << ", media_status=" << media_status_; | 371 << ", media_status=" << media_status_; |
379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. | 372 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. |
380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); | 373 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); |
381 rpc->set_handle(read_until_callback_handle_); | 374 rpc->set_handle(read_until_callback_handle_); |
382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | 375 rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); |
383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); | 376 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); |
384 message->set_count(last_count_); | 377 message->set_count(last_count_); |
385 message->set_status( | 378 message->set_status(ToProtoDemuxerStreamStatus(media_status_).value()); |
386 remoting::ToProtoDemuxerStreamStatus(media_status_).value()); | 379 if (media_status_ == DemuxerStream::kConfigChanged) { |
387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) { | |
388 if (audio_config_.IsValidConfig()) { | 380 if (audio_config_.IsValidConfig()) { |
389 pb::AudioDecoderConfig* audio_message = | 381 pb::AudioDecoderConfig* audio_message = |
390 message->mutable_audio_decoder_config(); | 382 message->mutable_audio_decoder_config(); |
391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); | 383 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); |
392 } else if (video_config_.IsValidConfig()) { | 384 } else if (video_config_.IsValidConfig()) { |
393 pb::VideoDecoderConfig* video_message = | 385 pb::VideoDecoderConfig* video_message = |
394 message->mutable_video_decoder_config(); | 386 message->mutable_video_decoder_config(); |
395 ConvertVideoDecoderConfigToProto(video_config_, video_message); | 387 ConvertVideoDecoderConfigToProto(video_config_, video_message); |
396 } else { | 388 } else { |
397 NOTREACHED(); | 389 NOTREACHED(); |
398 } | 390 } |
399 } | 391 } |
400 | 392 |
401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() | 393 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() |
402 << " with count=" << message->count() | 394 << " with count=" << message->count() |
403 << ", status=" << message->status() << ", decoder_config={" | 395 << ", status=" << message->status() << ", decoder_config={" |
404 << (audio_config_.IsValidConfig() | 396 << (audio_config_.IsValidConfig() |
405 ? audio_config_.AsHumanReadableString() | 397 ? audio_config_.AsHumanReadableString() |
406 : video_config_.IsValidConfig() | 398 : video_config_.IsValidConfig() |
407 ? video_config_.AsHumanReadableString() | 399 ? video_config_.AsHumanReadableString() |
408 : "DID NOT CHANGE") | 400 : "DID NOT CHANGE") |
409 << '}'; | 401 << '}'; |
410 main_task_runner_->PostTask( | 402 main_task_runner_->PostTask( |
411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, | 403 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, |
412 rpc_broker_, base::Passed(&rpc))); | 404 base::Passed(&rpc))); |
413 // Resets callback handle after completing the reading request. | 405 // Resets callback handle after completing the reading request. |
414 read_until_callback_handle_ = kInvalidHandle; | 406 read_until_callback_handle_ = RpcBroker::kInvalidHandle; |
415 | 407 |
416 // Resets audio/video decoder config since it only sends once. | 408 // Resets audio/video decoder config since it only sends once. |
417 if (audio_config_.IsValidConfig()) | 409 if (audio_config_.IsValidConfig()) |
418 audio_config_ = ::media::AudioDecoderConfig(); | 410 audio_config_ = AudioDecoderConfig(); |
419 if (video_config_.IsValidConfig()) | 411 if (video_config_.IsValidConfig()) |
420 video_config_ = ::media::VideoDecoderConfig(); | 412 video_config_ = VideoDecoderConfig(); |
421 } | 413 } |
422 | 414 |
423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { | 415 void DemuxerStreamAdapter::ResetPendingFrame() { |
424 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 416 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
425 current_pending_frame_offset_ = 0; | 417 current_pending_frame_offset_ = 0; |
426 pending_frame_.clear(); | 418 pending_frame_.clear(); |
427 pending_frame_is_eos_ = false; | 419 pending_frame_is_eos_ = false; |
428 } | 420 } |
429 | 421 |
430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { | 422 void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { |
431 DCHECK(media_task_runner_->BelongsToCurrentThread()); | 423 DCHECK(media_task_runner_->BelongsToCurrentThread()); |
432 | 424 |
433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; | 425 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; |
434 | 426 |
435 if (error_callback_.is_null()) | 427 if (error_callback_.is_null()) |
436 return; | 428 return; |
437 | 429 |
438 if (write_watcher_.IsWatching()) { | 430 if (write_watcher_.IsWatching()) { |
439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; | 431 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; |
440 write_watcher_.Cancel(); | 432 write_watcher_.Cancel(); |
441 } | 433 } |
442 | 434 |
443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); | 435 base::ResetAndReturn(&error_callback_).Run(stop_trigger); |
444 } | 436 } |
445 | 437 |
446 } // namespace remoting | 438 } // namespace remoting |
447 } // namespace media | 439 } // namespace media |
OLD | NEW |