Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(181)

Side by Side Diff: media/remoting/demuxer_stream_adapter.cc

Issue 2643253003: Media Remoting Clean-up: Less-redundant naming, style consistency, etc. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/remoting/remote_demuxer_stream_adapter.h" 5 #include "media/remoting/demuxer_stream_adapter.h"
6 6
7 #include "base/base64.h" 7 #include "base/base64.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "media/base/bind_to_current_loop.h" 10 #include "media/base/bind_to_current_loop.h"
11 #include "media/base/decoder_buffer.h" 11 #include "media/base/decoder_buffer.h"
12 #include "media/base/timestamp_constants.h" 12 #include "media/base/timestamp_constants.h"
13 #include "media/remoting/rpc/proto_enum_utils.h" 13 #include "media/remoting/proto_enum_utils.h"
14 #include "media/remoting/rpc/proto_utils.h" 14 #include "media/remoting/proto_utils.h"
15 15
16 // Convenience logging macro used throughout this file. 16 // Convenience logging macro used throughout this file.
17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: " 17 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
18 18
19 namespace media { 19 namespace media {
20 namespace remoting { 20 namespace remoting {
21 21
22 // static 22 // static
23 mojo::DataPipe* CreateDataPipe() { 23 mojo::DataPipe* DemuxerStreamAdapter::CreateDataPipe() {
24 // Capacity in bytes for Mojo data pipe. 24 // Capacity in bytes for Mojo data pipe.
25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024; 25 constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024;
26 26
27 MojoCreateDataPipeOptions options; 27 MojoCreateDataPipeOptions options;
28 options.struct_size = sizeof(MojoCreateDataPipeOptions); 28 options.struct_size = sizeof(MojoCreateDataPipeOptions);
29 options.flags = MOJO_WRITE_DATA_FLAG_NONE; 29 options.flags = MOJO_WRITE_DATA_FLAG_NONE;
30 options.element_num_bytes = 1; 30 options.element_num_bytes = 1;
31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes; 31 options.capacity_num_bytes = kMojoDataPipeCapacityInBytes;
32 return new mojo::DataPipe(options); 32 return new mojo::DataPipe(options);
33 } 33 }
34 34
35 RemoteDemuxerStreamAdapter::RemoteDemuxerStreamAdapter( 35 DemuxerStreamAdapter::DemuxerStreamAdapter(
36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 36 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, 37 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
38 const std::string& name, 38 const std::string& name,
39 ::media::DemuxerStream* demuxer_stream, 39 DemuxerStream* demuxer_stream,
40 const base::WeakPtr<RpcBroker>& rpc_broker, 40 const base::WeakPtr<RpcBroker>& rpc_broker,
41 int rpc_handle, 41 int rpc_handle,
42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, 42 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info,
43 mojo::ScopedDataPipeProducerHandle producer_handle, 43 mojo::ScopedDataPipeProducerHandle producer_handle,
44 const ErrorCallback& error_callback) 44 const ErrorCallback& error_callback)
45 : main_task_runner_(std::move(main_task_runner)), 45 : main_task_runner_(std::move(main_task_runner)),
46 media_task_runner_(std::move(media_task_runner)), 46 media_task_runner_(std::move(media_task_runner)),
47 name_(name), 47 name_(name),
48 rpc_broker_(rpc_broker), 48 rpc_broker_(rpc_broker),
49 rpc_handle_(rpc_handle), 49 rpc_handle_(rpc_handle),
50 demuxer_stream_(demuxer_stream), 50 demuxer_stream_(demuxer_stream),
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
52 error_callback_(error_callback), 52 error_callback_(error_callback),
53 remote_callback_handle_(kInvalidHandle), 53 remote_callback_handle_(RpcBroker::kInvalidHandle),
54 read_until_callback_handle_(kInvalidHandle), 54 read_until_callback_handle_(RpcBroker::kInvalidHandle),
55 read_until_count_(0), 55 read_until_count_(0),
56 last_count_(0), 56 last_count_(0),
57 pending_flush_(false), 57 pending_flush_(false),
58 current_pending_frame_offset_(0), 58 current_pending_frame_offset_(0),
59 pending_frame_is_eos_(false), 59 pending_frame_is_eos_(false),
60 media_status_(::media::DemuxerStream::kOk), 60 media_status_(DemuxerStream::kOk),
61 producer_handle_(std::move(producer_handle)), 61 producer_handle_(std::move(producer_handle)),
62 bytes_written_to_pipe_(0), 62 bytes_written_to_pipe_(0),
63 request_buffer_weak_factory_(this), 63 request_buffer_weak_factory_(this),
64 weak_factory_(this) { 64 weak_factory_(this) {
65 DCHECK(main_task_runner_); 65 DCHECK(main_task_runner_);
66 DCHECK(media_task_runner_); 66 DCHECK(media_task_runner_);
67 DCHECK(media_task_runner_->BelongsToCurrentThread()); 67 DCHECK(media_task_runner_->BelongsToCurrentThread());
68 DCHECK(demuxer_stream); 68 DCHECK(demuxer_stream);
69 DCHECK(!error_callback.is_null()); 69 DCHECK(!error_callback.is_null());
70 const RpcBroker::ReceiveMessageCallback receive_callback = 70 const RpcBroker::ReceiveMessageCallback receive_callback =
71 media::BindToCurrentLoop( 71 BindToCurrentLoop(base::Bind(&DemuxerStreamAdapter::OnReceivedRpc,
72 base::Bind(&RemoteDemuxerStreamAdapter::OnReceivedRpc, 72 weak_factory_.GetWeakPtr()));
73 weak_factory_.GetWeakPtr()));
74 main_task_runner_->PostTask( 73 main_task_runner_->PostTask(
75 FROM_HERE, 74 FROM_HERE, base::Bind(&RpcBroker::RegisterMessageReceiverCallback,
76 base::Bind(&remoting::RpcBroker::RegisterMessageReceiverCallback, 75 rpc_broker_, rpc_handle_, receive_callback));
77 rpc_broker_, rpc_handle_, receive_callback));
78 76
79 stream_sender_.Bind(std::move(stream_sender_info)); 77 stream_sender_.Bind(std::move(stream_sender_info));
80 stream_sender_.set_connection_error_handler( 78 stream_sender_.set_connection_error_handler(
81 base::Bind(&RemoteDemuxerStreamAdapter::OnFatalError, 79 base::Bind(&DemuxerStreamAdapter::OnFatalError,
82 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR)); 80 weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
83 } 81 }
84 82
85 RemoteDemuxerStreamAdapter::~RemoteDemuxerStreamAdapter() { 83 DemuxerStreamAdapter::~DemuxerStreamAdapter() {
86 DCHECK(media_task_runner_->BelongsToCurrentThread()); 84 DCHECK(media_task_runner_->BelongsToCurrentThread());
87 main_task_runner_->PostTask( 85 main_task_runner_->PostTask(
88 FROM_HERE, 86 FROM_HERE, base::Bind(&RpcBroker::UnregisterMessageReceiverCallback,
89 base::Bind(&remoting::RpcBroker::UnregisterMessageReceiverCallback, 87 rpc_broker_, rpc_handle_));
90 rpc_broker_, rpc_handle_));
91 } 88 }
92 89
93 int64_t RemoteDemuxerStreamAdapter::GetBytesWrittenAndReset() { 90 int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() {
94 DCHECK(media_task_runner_->BelongsToCurrentThread()); 91 DCHECK(media_task_runner_->BelongsToCurrentThread());
95 const int64_t current_count = bytes_written_to_pipe_; 92 const int64_t current_count = bytes_written_to_pipe_;
96 bytes_written_to_pipe_ = 0; 93 bytes_written_to_pipe_ = 0;
97 return current_count; 94 return current_count;
98 } 95 }
99 96
100 base::Optional<uint32_t> RemoteDemuxerStreamAdapter::SignalFlush( 97 base::Optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) {
101 bool flushing) {
102 DCHECK(media_task_runner_->BelongsToCurrentThread()); 98 DCHECK(media_task_runner_->BelongsToCurrentThread());
103 DEMUXER_VLOG(2) << "flushing=" << flushing; 99 DEMUXER_VLOG(2) << "flushing=" << flushing;
104 100
105 // Ignores if |pending_flush_| states is same. 101 // Ignores if |pending_flush_| states is same.
106 if (pending_flush_ == flushing) 102 if (pending_flush_ == flushing)
107 return base::nullopt; 103 return base::nullopt;
108 104
109 // Cleans up pending frame data. 105 // Cleans up pending frame data.
110 pending_frame_.clear(); 106 pending_frame_.clear();
111 current_pending_frame_offset_ = 0; 107 current_pending_frame_offset_ = 0;
112 pending_frame_is_eos_ = false; 108 pending_frame_is_eos_ = false;
113 // Invalidates pending Read() tasks. 109 // Invalidates pending Read() tasks.
114 request_buffer_weak_factory_.InvalidateWeakPtrs(); 110 request_buffer_weak_factory_.InvalidateWeakPtrs();
115 111
116 // Cancels in flight data in browser process. 112 // Cancels in flight data in browser process.
117 pending_flush_ = flushing; 113 pending_flush_ = flushing;
118 if (flushing) { 114 if (flushing) {
119 stream_sender_->CancelInFlightData(); 115 stream_sender_->CancelInFlightData();
120 } else { 116 } else {
121 // Sets callback handle invalid to abort ongoing read request. 117 // Sets callback handle invalid to abort ongoing read request.
122 read_until_callback_handle_ = kInvalidHandle; 118 read_until_callback_handle_ = RpcBroker::kInvalidHandle;
123 } 119 }
124 return last_count_; 120 return last_count_;
125 } 121 }
126 122
127 void RemoteDemuxerStreamAdapter::OnReceivedRpc( 123 void DemuxerStreamAdapter::OnReceivedRpc(
128 std::unique_ptr<remoting::pb::RpcMessage> message) { 124 std::unique_ptr<pb::RpcMessage> message) {
129 DCHECK(media_task_runner_->BelongsToCurrentThread()); 125 DCHECK(media_task_runner_->BelongsToCurrentThread());
130 DCHECK(message); 126 DCHECK(message);
131 DCHECK(rpc_handle_ == message->handle()); 127 DCHECK(rpc_handle_ == message->handle());
132 128
133 switch (message->proc()) { 129 switch (message->proc()) {
134 case remoting::pb::RpcMessage::RPC_DS_INITIALIZE: 130 case pb::RpcMessage::RPC_DS_INITIALIZE:
135 Initialize(message->integer_value()); 131 Initialize(message->integer_value());
136 break; 132 break;
137 case remoting::pb::RpcMessage::RPC_DS_READUNTIL: 133 case pb::RpcMessage::RPC_DS_READUNTIL:
138 ReadUntil(std::move(message)); 134 ReadUntil(std::move(message));
139 break; 135 break;
140 case remoting::pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: 136 case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
141 EnableBitstreamConverter(); 137 EnableBitstreamConverter();
142 break; 138 break;
143 139
144 default: 140 default:
145 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); 141 DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
146 } 142 }
147 } 143 }
148 144
149 void RemoteDemuxerStreamAdapter::Initialize(int remote_callback_handle) { 145 void DemuxerStreamAdapter::Initialize(int remote_callback_handle) {
150 DCHECK(media_task_runner_->BelongsToCurrentThread()); 146 DCHECK(media_task_runner_->BelongsToCurrentThread());
151 DCHECK(!pending_flush_); 147 DCHECK(!pending_flush_);
152 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle=" 148 DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
153 << remote_callback_handle; 149 << remote_callback_handle;
154 150
155 // Checks if initialization had been called or not. 151 // Checks if initialization had been called or not.
156 if (remote_callback_handle_ != kInvalidHandle) { 152 if (remote_callback_handle_ != RpcBroker::kInvalidHandle) {
157 DEMUXER_VLOG(1) << "Duplicated initialization. Have: " 153 DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
158 << remote_callback_handle_ 154 << remote_callback_handle_
159 << ", Given: " << remote_callback_handle; 155 << ", Given: " << remote_callback_handle;
160 // Shuts down data pipe if available if providing different remote callback 156 // Shuts down data pipe if available if providing different remote callback
161 // handle for initialization. Otherwise, just silently ignore the duplicated 157 // handle for initialization. Otherwise, just silently ignore the duplicated
162 // request. 158 // request.
163 if (remote_callback_handle_ != remote_callback_handle) { 159 if (remote_callback_handle_ != remote_callback_handle) {
164 OnFatalError(PEERS_OUT_OF_SYNC); 160 OnFatalError(PEERS_OUT_OF_SYNC);
165 } 161 }
166 return; 162 return;
167 } 163 }
168 remote_callback_handle_ = remote_callback_handle; 164 remote_callback_handle_ = remote_callback_handle;
169 165
170 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. 166 // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
171 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); 167 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
172 rpc->set_handle(remote_callback_handle_); 168 rpc->set_handle(remote_callback_handle_);
173 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); 169 rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
174 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); 170 auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
175 init_cb_message->set_type(type_); 171 init_cb_message->set_type(type_);
176 switch (type_) { 172 switch (type_) {
177 case ::media::DemuxerStream::Type::AUDIO: { 173 case DemuxerStream::Type::AUDIO: {
178 audio_config_ = demuxer_stream_->audio_decoder_config(); 174 audio_config_ = demuxer_stream_->audio_decoder_config();
179 pb::AudioDecoderConfig* audio_message = 175 pb::AudioDecoderConfig* audio_message =
180 init_cb_message->mutable_audio_decoder_config(); 176 init_cb_message->mutable_audio_decoder_config();
181 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); 177 ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
182 break; 178 break;
183 } 179 }
184 case ::media::DemuxerStream::Type::VIDEO: { 180 case DemuxerStream::Type::VIDEO: {
185 video_config_ = demuxer_stream_->video_decoder_config(); 181 video_config_ = demuxer_stream_->video_decoder_config();
186 pb::VideoDecoderConfig* video_message = 182 pb::VideoDecoderConfig* video_message =
187 init_cb_message->mutable_video_decoder_config(); 183 init_cb_message->mutable_video_decoder_config();
188 ConvertVideoDecoderConfigToProto(video_config_, video_message); 184 ConvertVideoDecoderConfigToProto(video_config_, video_message);
189 break; 185 break;
190 } 186 }
191 default: 187 default:
192 NOTREACHED(); 188 NOTREACHED();
193 } 189 }
194 190
195 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle() 191 DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
196 << " with decoder_config={" 192 << " with decoder_config={"
197 << (type_ == ::media::DemuxerStream::Type::AUDIO 193 << (type_ == DemuxerStream::Type::AUDIO
198 ? audio_config_.AsHumanReadableString() 194 ? audio_config_.AsHumanReadableString()
199 : video_config_.AsHumanReadableString()) 195 : video_config_.AsHumanReadableString())
200 << '}'; 196 << '}';
201 main_task_runner_->PostTask( 197 main_task_runner_->PostTask(
202 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, 198 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
203 rpc_broker_, base::Passed(&rpc))); 199 base::Passed(&rpc)));
204 200
205 // Starts Mojo watcher. 201 // Starts Mojo watcher.
206 if (!write_watcher_.IsWatching()) { 202 if (!write_watcher_.IsWatching()) {
207 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; 203 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
208 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, 204 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
209 base::Bind(&RemoteDemuxerStreamAdapter::TryWriteData, 205 base::Bind(&DemuxerStreamAdapter::TryWriteData,
210 weak_factory_.GetWeakPtr())); 206 weak_factory_.GetWeakPtr()));
211 } 207 }
212 } 208 }
213 209
214 void RemoteDemuxerStreamAdapter::ReadUntil( 210 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) {
215 std::unique_ptr<remoting::pb::RpcMessage> message) {
216 DCHECK(media_task_runner_->BelongsToCurrentThread()); 211 DCHECK(media_task_runner_->BelongsToCurrentThread());
217 DCHECK(message); 212 DCHECK(message);
218 if (!message->has_demuxerstream_readuntil_rpc()) { 213 if (!message->has_demuxerstream_readuntil_rpc()) {
219 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; 214 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
220 OnFatalError(RPC_INVALID); 215 OnFatalError(RPC_INVALID);
221 return; 216 return;
222 } 217 }
223 218
224 const pb::DemuxerStreamReadUntil& rpc_message = 219 const pb::DemuxerStreamReadUntil& rpc_message =
225 message->demuxerstream_readuntil_rpc(); 220 message->demuxerstream_readuntil_rpc();
(...skipping 15 matching lines...) Expand all
241 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to " 236 DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
242 "current frame count"; 237 "current frame count";
243 return; 238 return;
244 } 239 }
245 240
246 read_until_count_ = rpc_message.count(); 241 read_until_count_ = rpc_message.count();
247 read_until_callback_handle_ = rpc_message.callback_handle(); 242 read_until_callback_handle_ = rpc_message.callback_handle();
248 RequestBuffer(); 243 RequestBuffer();
249 } 244 }
250 245
251 void RemoteDemuxerStreamAdapter::EnableBitstreamConverter() { 246 void DemuxerStreamAdapter::EnableBitstreamConverter() {
252 DCHECK(media_task_runner_->BelongsToCurrentThread()); 247 DCHECK(media_task_runner_->BelongsToCurrentThread());
253 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER"; 248 DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
254 demuxer_stream_->EnableBitstreamConverter(); 249 demuxer_stream_->EnableBitstreamConverter();
255 } 250 }
256 251
257 void RemoteDemuxerStreamAdapter::RequestBuffer() { 252 void DemuxerStreamAdapter::RequestBuffer() {
258 DCHECK(media_task_runner_->BelongsToCurrentThread()); 253 DCHECK(media_task_runner_->BelongsToCurrentThread());
259 if (!IsProcessingReadRequest() || pending_flush_) { 254 if (!IsProcessingReadRequest() || pending_flush_) {
260 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; 255 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
261 return; 256 return;
262 } 257 }
263 demuxer_stream_->Read(base::Bind(&RemoteDemuxerStreamAdapter::OnNewBuffer, 258 demuxer_stream_->Read(base::Bind(&DemuxerStreamAdapter::OnNewBuffer,
264 request_buffer_weak_factory_.GetWeakPtr())); 259 request_buffer_weak_factory_.GetWeakPtr()));
265 } 260 }
266 261
267 void RemoteDemuxerStreamAdapter::OnNewBuffer( 262 void DemuxerStreamAdapter::OnNewBuffer(
268 ::media::DemuxerStream::Status status, 263 DemuxerStream::Status status,
269 const scoped_refptr<::media::DecoderBuffer>& input) { 264 const scoped_refptr<DecoderBuffer>& input) {
270 DEMUXER_VLOG(3) << "status=" << status; 265 DEMUXER_VLOG(3) << "status=" << status;
271 DCHECK(media_task_runner_->BelongsToCurrentThread()); 266 DCHECK(media_task_runner_->BelongsToCurrentThread());
272 if (!IsProcessingReadRequest() || pending_flush_) { 267 if (!IsProcessingReadRequest() || pending_flush_) {
273 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state"; 268 DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
274 return; 269 return;
275 } 270 }
276 271
277 switch (status) { 272 switch (status) {
278 case ::media::DemuxerStream::kAborted: 273 case DemuxerStream::kAborted:
279 DCHECK(!input); 274 DCHECK(!input);
280 SendReadAck(); 275 SendReadAck();
281 return; 276 return;
282 case ::media::DemuxerStream::kConfigChanged: 277 case DemuxerStream::kConfigChanged:
283 // TODO(erickung): consider sending updated Audio/Video decoder config to
miu 2017/01/20 23:14:41 Removed this TODO comment because we did this rece
xjz 2017/01/21 06:12:33 The controller now only get the update of natural_
miu 2017/01/23 20:57:36 Ok. I put this TODO back and explained further.
284 // RemotingRendererController.
285 // Stores available audio/video decoder config and issues 278 // Stores available audio/video decoder config and issues
286 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver. 279 // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
287 DCHECK(!input); 280 DCHECK(!input);
288 media_status_ = status; 281 media_status_ = status;
289 if (demuxer_stream_->type() == ::media::DemuxerStream::VIDEO) 282 if (demuxer_stream_->type() == DemuxerStream::VIDEO)
290 video_config_ = demuxer_stream_->video_decoder_config(); 283 video_config_ = demuxer_stream_->video_decoder_config();
291 if (demuxer_stream_->type() == ::media::DemuxerStream::AUDIO) 284 if (demuxer_stream_->type() == DemuxerStream::AUDIO)
292 audio_config_ = demuxer_stream_->audio_decoder_config(); 285 audio_config_ = demuxer_stream_->audio_decoder_config();
293 SendReadAck(); 286 SendReadAck();
294 return; 287 return;
295 case ::media::DemuxerStream::kOk: { 288 case DemuxerStream::kOk: {
296 media_status_ = status; 289 media_status_ = status;
297 DCHECK(pending_frame_.empty()); 290 DCHECK(pending_frame_.empty());
298 if (!producer_handle_.is_valid()) 291 if (!producer_handle_.is_valid())
299 return; // Do not start sending (due to previous fatal error). 292 return; // Do not start sending (due to previous fatal error).
300 pending_frame_ = DecoderBufferToByteArray(input); 293 pending_frame_ = DecoderBufferToByteArray(*input);
301 pending_frame_is_eos_ = input->end_of_stream(); 294 pending_frame_is_eos_ = input->end_of_stream();
302 TryWriteData(MOJO_RESULT_OK); 295 TryWriteData(MOJO_RESULT_OK);
303 } break; 296 } break;
304 } 297 }
305 } 298 }
306 299
307 void RemoteDemuxerStreamAdapter::TryWriteData(MojoResult result) { 300 void DemuxerStreamAdapter::TryWriteData(MojoResult result) {
308 DCHECK(media_task_runner_->BelongsToCurrentThread()); 301 DCHECK(media_task_runner_->BelongsToCurrentThread());
309 // The Mojo watcher will also call TryWriteData() sometimes as a notification 302 // The Mojo watcher will also call TryWriteData() sometimes as a notification
310 // that data pipe is ready. But that does not necessarily mean the data for a 303 // that data pipe is ready. But that does not necessarily mean the data for a
311 // read request is ready to be written into the pipe. 304 // read request is ready to be written into the pipe.
312 if (!IsProcessingReadRequest() || pending_flush_) { 305 if (!IsProcessingReadRequest() || pending_flush_) {
313 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state"; 306 DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state";
314 return; 307 return;
315 } 308 }
316 309
317 if (pending_frame_.empty()) { 310 if (pending_frame_.empty()) {
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
358 ResetPendingFrame(); 351 ResetPendingFrame();
359 352
360 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message. 353 // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
361 if (read_until_count_ == last_count_ || pending_frame_is_eos) { 354 if (read_until_count_ == last_count_ || pending_frame_is_eos) {
362 SendReadAck(); 355 SendReadAck();
363 return; 356 return;
364 } 357 }
365 358
366 // Contiune to read decoder buffer until reaching |read_until_count_| or 359 // Contiune to read decoder buffer until reaching |read_until_count_| or
367 // end of stream. 360 // end of stream.
368 media_task_runner_->PostTask( 361 media_task_runner_->PostTask(FROM_HERE,
369 FROM_HERE, base::Bind(&RemoteDemuxerStreamAdapter::RequestBuffer, 362 base::Bind(&DemuxerStreamAdapter::RequestBuffer,
370 weak_factory_.GetWeakPtr())); 363 weak_factory_.GetWeakPtr()));
371 } 364 }
372 365
373 void RemoteDemuxerStreamAdapter::SendReadAck() { 366 void DemuxerStreamAdapter::SendReadAck() {
374 DCHECK(media_task_runner_->BelongsToCurrentThread()); 367 DCHECK(media_task_runner_->BelongsToCurrentThread());
375 DEMUXER_VLOG(3) << "last_count_=" << last_count_ 368 DEMUXER_VLOG(3) << "last_count_=" << last_count_
376 << ", remote_read_callback_handle=" 369 << ", remote_read_callback_handle="
377 << read_until_callback_handle_ 370 << read_until_callback_handle_
378 << ", media_status=" << media_status_; 371 << ", media_status=" << media_status_;
379 // Issues RPC_DS_READUNTIL_CALLBACK RPC message. 372 // Issues RPC_DS_READUNTIL_CALLBACK RPC message.
380 std::unique_ptr<remoting::pb::RpcMessage> rpc(new remoting::pb::RpcMessage()); 373 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
381 rpc->set_handle(read_until_callback_handle_); 374 rpc->set_handle(read_until_callback_handle_);
382 rpc->set_proc(remoting::pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); 375 rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
383 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); 376 auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
384 message->set_count(last_count_); 377 message->set_count(last_count_);
385 message->set_status( 378 message->set_status(ToProtoDemuxerStreamStatus(media_status_).value());
386 remoting::ToProtoDemuxerStreamStatus(media_status_).value()); 379 if (media_status_ == DemuxerStream::kConfigChanged) {
387 if (media_status_ == ::media::DemuxerStream::kConfigChanged) {
388 if (audio_config_.IsValidConfig()) { 380 if (audio_config_.IsValidConfig()) {
389 pb::AudioDecoderConfig* audio_message = 381 pb::AudioDecoderConfig* audio_message =
390 message->mutable_audio_decoder_config(); 382 message->mutable_audio_decoder_config();
391 ConvertAudioDecoderConfigToProto(audio_config_, audio_message); 383 ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
392 } else if (video_config_.IsValidConfig()) { 384 } else if (video_config_.IsValidConfig()) {
393 pb::VideoDecoderConfig* video_message = 385 pb::VideoDecoderConfig* video_message =
394 message->mutable_video_decoder_config(); 386 message->mutable_video_decoder_config();
395 ConvertVideoDecoderConfigToProto(video_config_, video_message); 387 ConvertVideoDecoderConfigToProto(video_config_, video_message);
396 } else { 388 } else {
397 NOTREACHED(); 389 NOTREACHED();
398 } 390 }
399 } 391 }
400 392
401 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle() 393 DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
402 << " with count=" << message->count() 394 << " with count=" << message->count()
403 << ", status=" << message->status() << ", decoder_config={" 395 << ", status=" << message->status() << ", decoder_config={"
404 << (audio_config_.IsValidConfig() 396 << (audio_config_.IsValidConfig()
405 ? audio_config_.AsHumanReadableString() 397 ? audio_config_.AsHumanReadableString()
406 : video_config_.IsValidConfig() 398 : video_config_.IsValidConfig()
407 ? video_config_.AsHumanReadableString() 399 ? video_config_.AsHumanReadableString()
408 : "DID NOT CHANGE") 400 : "DID NOT CHANGE")
409 << '}'; 401 << '}';
410 main_task_runner_->PostTask( 402 main_task_runner_->PostTask(
411 FROM_HERE, base::Bind(&remoting::RpcBroker::SendMessageToRemote, 403 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
412 rpc_broker_, base::Passed(&rpc))); 404 base::Passed(&rpc)));
413 // Resets callback handle after completing the reading request. 405 // Resets callback handle after completing the reading request.
414 read_until_callback_handle_ = kInvalidHandle; 406 read_until_callback_handle_ = RpcBroker::kInvalidHandle;
415 407
416 // Resets audio/video decoder config since it only sends once. 408 // Resets audio/video decoder config since it only sends once.
417 if (audio_config_.IsValidConfig()) 409 if (audio_config_.IsValidConfig())
418 audio_config_ = ::media::AudioDecoderConfig(); 410 audio_config_ = AudioDecoderConfig();
419 if (video_config_.IsValidConfig()) 411 if (video_config_.IsValidConfig())
420 video_config_ = ::media::VideoDecoderConfig(); 412 video_config_ = VideoDecoderConfig();
421 } 413 }
422 414
423 void RemoteDemuxerStreamAdapter::ResetPendingFrame() { 415 void DemuxerStreamAdapter::ResetPendingFrame() {
424 DCHECK(media_task_runner_->BelongsToCurrentThread()); 416 DCHECK(media_task_runner_->BelongsToCurrentThread());
425 current_pending_frame_offset_ = 0; 417 current_pending_frame_offset_ = 0;
426 pending_frame_.clear(); 418 pending_frame_.clear();
427 pending_frame_is_eos_ = false; 419 pending_frame_is_eos_ = false;
428 } 420 }
429 421
430 void RemoteDemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) { 422 void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
431 DCHECK(media_task_runner_->BelongsToCurrentThread()); 423 DCHECK(media_task_runner_->BelongsToCurrentThread());
432 424
433 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger; 425 DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
434 426
435 if (error_callback_.is_null()) 427 if (error_callback_.is_null())
436 return; 428 return;
437 429
438 if (write_watcher_.IsWatching()) { 430 if (write_watcher_.IsWatching()) {
439 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; 431 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
440 write_watcher_.Cancel(); 432 write_watcher_.Cancel();
441 } 433 }
442 434
443 base::ResetAndReturn(&error_callback_).Run(stop_trigger); 435 base::ResetAndReturn(&error_callback_).Run(stop_trigger);
444 } 436 }
445 437
446 } // namespace remoting 438 } // namespace remoting
447 } // namespace media 439 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698