Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(257)

Side by Side Diff: media/remoting/demuxer_stream_adapter.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/remoting/demuxer_stream_adapter.h" 5 #include "media/remoting/demuxer_stream_adapter.h"
6 6
7 #include "base/base64.h" 7 #include "base/base64.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "media/base/bind_to_current_loop.h" 10 #include "media/base/bind_to_current_loop.h"
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 demuxer_stream_(demuxer_stream), 50 demuxer_stream_(demuxer_stream),
51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN), 51 type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
52 error_callback_(error_callback), 52 error_callback_(error_callback),
53 remote_callback_handle_(RpcBroker::kInvalidHandle), 53 remote_callback_handle_(RpcBroker::kInvalidHandle),
54 read_until_callback_handle_(RpcBroker::kInvalidHandle), 54 read_until_callback_handle_(RpcBroker::kInvalidHandle),
55 read_until_count_(0), 55 read_until_count_(0),
56 last_count_(0), 56 last_count_(0),
57 pending_flush_(false), 57 pending_flush_(false),
58 current_pending_frame_offset_(0), 58 current_pending_frame_offset_(0),
59 pending_frame_is_eos_(false), 59 pending_frame_is_eos_(false),
60 write_watcher_(FROM_HERE), 60 write_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
61 media_status_(DemuxerStream::kOk), 61 media_status_(DemuxerStream::kOk),
62 producer_handle_(std::move(producer_handle)), 62 producer_handle_(std::move(producer_handle)),
63 bytes_written_to_pipe_(0), 63 bytes_written_to_pipe_(0),
64 request_buffer_weak_factory_(this), 64 request_buffer_weak_factory_(this),
65 weak_factory_(this) { 65 weak_factory_(this) {
66 DCHECK(main_task_runner_); 66 DCHECK(main_task_runner_);
67 DCHECK(media_task_runner_); 67 DCHECK(media_task_runner_);
68 DCHECK(media_task_runner_->BelongsToCurrentThread()); 68 DCHECK(media_task_runner_->BelongsToCurrentThread());
69 DCHECK(demuxer_stream); 69 DCHECK(demuxer_stream);
70 DCHECK(!error_callback.is_null()); 70 DCHECK(!error_callback.is_null());
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
195 ? audio_config_.AsHumanReadableString() 195 ? audio_config_.AsHumanReadableString()
196 : video_config_.AsHumanReadableString()) 196 : video_config_.AsHumanReadableString())
197 << '}'; 197 << '}';
198 main_task_runner_->PostTask( 198 main_task_runner_->PostTask(
199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_, 199 FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
200 base::Passed(&rpc))); 200 base::Passed(&rpc)));
201 201
202 // Starts Mojo watcher. 202 // Starts Mojo watcher.
203 if (!write_watcher_.IsWatching()) { 203 if (!write_watcher_.IsWatching()) {
204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher"; 204 DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
205 write_watcher_.Start(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, 205 write_watcher_.Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
206 base::Bind(&DemuxerStreamAdapter::TryWriteData, 206 base::Bind(&DemuxerStreamAdapter::TryWriteData,
207 weak_factory_.GetWeakPtr())); 207 weak_factory_.GetWeakPtr()));
208 write_watcher_.ArmOrNotify();
208 } 209 }
209 } 210 }
210 211
211 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) { 212 void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) {
212 DCHECK(media_task_runner_->BelongsToCurrentThread()); 213 DCHECK(media_task_runner_->BelongsToCurrentThread());
213 DCHECK(message); 214 DCHECK(message);
214 if (!message->has_demuxerstream_readuntil_rpc()) { 215 if (!message->has_demuxerstream_readuntil_rpc()) {
215 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC"; 216 DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
216 OnFatalError(RPC_INVALID); 217 OnFatalError(RPC_INVALID);
217 return; 218 return;
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 if (!stream_sender_ || !producer_handle_.is_valid()) { 320 if (!stream_sender_ || !producer_handle_.is_valid()) {
320 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid"; 321 DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
321 return; 322 return;
322 } 323 }
323 324
324 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_; 325 uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
325 MojoResult mojo_result = 326 MojoResult mojo_result =
326 WriteDataRaw(producer_handle_.get(), 327 WriteDataRaw(producer_handle_.get(),
327 pending_frame_.data() + current_pending_frame_offset_, 328 pending_frame_.data() + current_pending_frame_offset_,
328 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); 329 &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
329 if (mojo_result != MOJO_RESULT_OK) { 330 if (mojo_result != MOJO_RESULT_OK && mojo_result != MOJO_RESULT_SHOULD_WAIT) {
330 if (mojo_result != MOJO_RESULT_SHOULD_WAIT) { 331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
331 DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:" 332 << mojo_result;
332 << mojo_result; 333 OnFatalError(MOJO_PIPE_ERROR);
333 OnFatalError(MOJO_PIPE_ERROR);
334 }
335 return; 334 return;
336 } 335 }
337 336
337 write_watcher_.ArmOrNotify();
338 if (mojo_result != MOJO_RESULT_OK)
339 return;
340
338 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes, 341 stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
339 pending_frame_.size()); 342 pending_frame_.size());
340 current_pending_frame_offset_ += num_bytes; 343 current_pending_frame_offset_ += num_bytes;
341 bytes_written_to_pipe_ += num_bytes; 344 bytes_written_to_pipe_ += num_bytes;
342 345
343 // Checks if all buffer was written to browser process. 346 // Checks if all buffer was written to browser process.
344 if (current_pending_frame_offset_ != pending_frame_.size()) { 347 if (current_pending_frame_offset_ != pending_frame_.size()) {
345 // Returns and wait for mojo watcher to notify to write more data. 348 // Returns and wait for mojo watcher to notify to write more data.
346 return; 349 return;
347 } 350 }
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
434 if (write_watcher_.IsWatching()) { 437 if (write_watcher_.IsWatching()) {
435 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher"; 438 DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
436 write_watcher_.Cancel(); 439 write_watcher_.Cancel();
437 } 440 }
438 441
439 base::ResetAndReturn(&error_callback_).Run(stop_trigger); 442 base::ResetAndReturn(&error_callback_).Run(stop_trigger);
440 } 443 }
441 444
442 } // namespace remoting 445 } // namespace remoting
443 } // namespace media 446 } // namespace media
OLDNEW
« no previous file with comments | « media/remoting/demuxer_stream_adapter.h ('k') | mojo/android/javatests/src/org/chromium/mojo/system/impl/WatcherImplTest.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698