Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(497)

Side by Side Diff: media/remoting/demuxer_stream_adapter.cc

Issue 2750373002: Revert of Mojo: Armed Watchers (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/remoting/demuxer_stream_adapter.h" 5 #include "media/remoting/demuxer_stream_adapter.h"
6 6
7 #include "base/base64.h" 7 #include "base/base64.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "media/base/bind_to_current_loop.h" 10 #include "media/base/bind_to_current_loop.h"
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 demuxer_stream_(demuxer_stream), 50 demuxer_stream_(demuxer_stream),
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
52 error_callback_(error_callback), 52 error_callback_(error_callback),
53 remote_callback_handle_(RpcBroker::kInvalidHandle), 53 remote_callback_handle_(RpcBroker::kInvalidHandle),
54 read_until_callback_handle_(RpcBroker::kInvalidHandle), 54 read_until_callback_handle_(RpcBroker::kInvalidHandle),
55 read_until_count_(0), 55 read_until_count_(0),
56 last_count_(0), 56 last_count_(0),
57 pending_flush_(false), 57 pending_flush_(false),
58 current_pending_frame_offset_(0), 58 current_pending_frame_offset_(0),
59 pending_frame_is_eos_(false), 59 pending_frame_is_eos_(false),
60 write_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL), 60 write_watcher_(FROM_HERE),
61 media_status_(DemuxerStream::kOk), 61 media_status_(DemuxerStream::kOk),
62 producer_handle_(std::move(producer_handle)), 62 producer_handle_(std::move(producer_handle)),
63 bytes_written_to_pipe_(0), 63 bytes_written_to_pipe_(0),
64 request_buffer_weak_factory_(this), 64 request_buffer_weak_factory_(this),
65 weak_factory_(this) { 65 weak_factory_(this) {
66 DCHECK(main_task_runner_); 66 DCHECK(main_task_runner_);
67 DCHECK(media_task_runner_); 67 DCHECK(media_task_runner_);
68 DCHECK(media_task_runner_->BelongsToCurrentThread()); 68 DCHECK(media_task_runner_->BelongsToCurrentThread());
69 DCHECK(demuxer_stream); 69 DCHECK(demuxer_stream);
70 DCHECK(!error_callback.is_null()); 70 DCHECK(!error_callback.is_null());
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
195 ? audio_config_.AsHumanReadableString() 195 ? audio_config_.AsHumanReadableString()
196 : video_config_.AsHumanReadableString()) 196 : video_config_.AsHumanReadableString())
197 << '}'; 197 << '}';
198 main_task_runner_->PostTask( 198 main_task_runner_->PostTask(
199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, 199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
200 base::Passed(&rpc))); 200 base::Passed(&rpc)));
201 201
202 // Starts Mojo watcher. 202 // Starts Mojo watcher.
203 if (!write_watcher_.IsWatching()) { 203 if (!write_watcher_.IsWatching()) {
204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; 204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
205 write_watcher_.Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, 205 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
206 base::Bind(&DemuxerStreamAdapter::TryWriteData, 206 base::Bind(&DemuxerStreamAdapter::TryWriteData,
207 weak_factory_.GetWeakPtr())); 207 weak_factory_.GetWeakPtr()));
208 write_watcher_.ArmOrNotify();
209 } 208 }
210 } 209 }
211 210
212 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { 211 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) {
213 DCHECK(media_task_runner_->BelongsToCurrentThread()); 212 DCHECK(media_task_runner_->BelongsToCurrentThread());
214 DCHECK(message); 213 DCHECK(message);
215 if (!message->has_demuxerstream_readuntil_rpc()) { 214 if (!message->has_demuxerstream_readuntil_rpc()) {
216 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; 215 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
217 OnFatalError(RPC_INVALID); 216 OnFatalError(RPC_INVALID);
218 return; 217 return;
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
320 if (!stream_sender_ || !producer_handle_.is_valid()) { 319 if (!stream_sender_ || !producer_handle_.is_valid()) {
321 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; 320 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
322 return; 321 return;
323 } 322 }
324 323
325 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; 324 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
326 MojoResult mojo_result = 325 MojoResult mojo_result =
327 WriteDataRaw(producer_handle_.get(), 326 WriteDataRaw(producer_handle_.get(),
328 pending_frame_.data() + current_pending_frame_offset_, 327 pending_frame_.data() + current_pending_frame_offset_,
329 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); 328 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
330 if (mojo_result != MOJO_RESULT_OK && mojo_result != MOJO_RESULT_SHOULD_WAIT) { 329 if (mojo_result != MOJO_RESULT_OK) {
331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" 330 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) {
332 << mojo_result; 331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
333 OnFatalError(MOJO_PIPE_ERROR); 332 << mojo_result;
333 OnFatalError(MOJO_PIPE_ERROR);
334 }
334 return; 335 return;
335 } 336 }
336 337
337 write_watcher_.ArmOrNotify();
338 if (mojo_result != MOJO_RESULT_OK)
339 return;
340
341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, 338 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
342 pending_frame_.size()); 339 pending_frame_.size());
343 current_pending_frame_offset_ += num_bytes; 340 current_pending_frame_offset_ += num_bytes;
344 bytes_written_to_pipe_ += num_bytes; 341 bytes_written_to_pipe_ += num_bytes;
345 342
346 // Checks if all buffer was written to browser process. 343 // Checks if all buffer was written to browser process.
347 if (current_pending_frame_offset_ != pending_frame_.size()) { 344 if (current_pending_frame_offset_ != pending_frame_.size()) {
348 // Returns and wait for mojo watcher to notify to write more data. 345 // Returns and wait for mojo watcher to notify to write more data.
349 return; 346 return;
350 } 347 }
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
437 if (write_watcher_.IsWatching()) { 434 if (write_watcher_.IsWatching()) {
438 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; 435 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
439 write_watcher_.Cancel(); 436 write_watcher_.Cancel();
440 } 437 }
441 438
442 base::ResetAndReturn(&error_callback_).Run(stop_trigger); 439 base::ResetAndReturn(&error_callback_).Run(stop_trigger);
443 } 440 }
444 441
445 } // namespace remoting 442 } // namespace remoting
446 } // namespace media 443 } // namespace media
OLDNEW
« no previous file with comments | « media/remoting/demuxer_stream_adapter.h ('k') | mojo/android/javatests/src/org/chromium/mojo/system/impl/WatcherImplTest.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698