Chromium Code Reviews| Index: services/media/framework/stages/active_multistream_sink_stage.cc |
| diff --git a/services/media/framework/stages/active_multistream_sink_stage.cc b/services/media/framework/stages/active_multistream_sink_stage.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9cd13412573433a49b5e1b567f4bc2172c6336ad |
| --- /dev/null |
| +++ b/services/media/framework/stages/active_multistream_sink_stage.cc |
| @@ -0,0 +1,179 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "services/media/framework/stages/active_multistream_sink_stage.h" |
| + |
| +namespace mojo { |
| +namespace media { |
| + |
| +ActiveMultistreamSinkStage::ActiveMultistreamSinkStage( |
| + std::shared_ptr<ActiveMultistreamSink> sink) : sink_(sink) { |
| + DCHECK(sink_); |
| + sink_->SetHost(this); |
| +} |
| + |
| +ActiveMultistreamSinkStage::~ActiveMultistreamSinkStage() { |
| + base::AutoLock lock(lock_); |
| +} |
| + |
| +size_t ActiveMultistreamSinkStage::input_count() const { |
| + base::AutoLock lock(lock_); |
| + return inputs_.size(); |
| +}; |
| + |
// Returns a reference to the input with the given index. The lock is held
// only while reading inputs_; the referenced Input is owned by the
// StageInput, which outlives this call.
Input& ActiveMultistreamSinkStage::input(size_t index) {
  base::AutoLock lock(lock_);
  DCHECK_LT(index, inputs_.size());
  // TODO(dalesat): Decide whether accessing an unallocated input should be
  // permitted here. Review discussion suggested adding
  // DCHECK(inputs_[index]->allocated_); left out pending agreement.
  return inputs_[index]->input_;
}
| + |
| +size_t ActiveMultistreamSinkStage::output_count() const { |
| + return 0; |
| +} |
| + |
// Outputs are not supported on a sink stage, so this should never be
// called. Because the signature must return a reference, a function-local
// static dummy Output is returned after logging the error.
Output& ActiveMultistreamSinkStage::output(size_t index) {
  LOG(ERROR) << "output requested from sink";
  static Output result;
  return result;
}
| + |
| +PayloadAllocator* ActiveMultistreamSinkStage::PrepareInput(size_t index) { |
| + return nullptr; |
| +} |
| + |
// Part of the Stage interface. Sinks have no outputs, so reaching this is
// an error.
// NOTE(review): |callback| is never invoked here — confirm callers can
// never reach this path on a sink stage.
void ActiveMultistreamSinkStage::PrepareOutput(
    size_t index,
    PayloadAllocator* allocator,
    const PrepareCallback& callback) {
  LOG(ERROR) << "PrepareOutput called on sink";
}
| + |
| +void ActiveMultistreamSinkStage::Prime() { |
| + DCHECK(sink_); |
| + sink_->Prime(); |
| +} |
| + |
// Delivers queued packets from each pending input to the sink and refreshes
// upstream demand. Run by the engine after RequestUpdate has been issued.
void ActiveMultistreamSinkStage::Update(Engine* engine) {
  DCHECK(engine);
  DCHECK(sink_);

  base::AutoLock lock(lock_);

  for (auto iter = pending_inputs_.begin(); iter != pending_inputs_.end(); ) {
    StageInput* input = *iter;
    if (input->input_.packet_from_upstream()) {
      // Hand the packet to the sink; the sink returns its new demand for
      // this input.
      input->demand_ = sink_->SupplyPacket(
          input->index_,
          std::move(input->input_.packet_from_upstream()));

      if (input->demand_ == Demand::kNegative) {
        // The sink wants nothing further on this input: drop it from the
        // pending list (erase-while-iterating, so advance first). The
        // StageInput object itself stays alive in inputs_.
        auto remove_iter = iter;
        ++iter;
        pending_inputs_.erase(remove_iter);
      }
      // NOTE(review): when demand is not kNegative, |iter| is NOT advanced,
      // so this same input is re-examined on the next pass; its packet has
      // been moved away, so the else branch then advances, and SetDemand
      // below runs a second time for it. Confirm this is intended rather
      // than a missing ++iter.
    } else {
      ++iter;
    }

    // Propagate the (possibly updated) demand upstream — including for an
    // input that was just removed from pending_inputs_.
    input->input_.SetDemand(input->demand_, engine);
  }
}
| + |
// Flushes the input with the given index: flushes the sink, resets the
// input's demand, flushes its queued packet state, and removes it from the
// pending list.
// NOTE(review): |callback| is never invoked — confirm whether the caller
// expects a completion notification here.
void ActiveMultistreamSinkStage::FlushInput(
    size_t index,
    const FlushCallback& callback) {
  DCHECK(sink_);

  // The sink is flushed before the lock is taken — presumably so a
  // synchronous callback from the sink (e.g. UpdateDemand, which acquires
  // lock_) cannot deadlock. TODO: confirm.
  sink_->Flush();

  base::AutoLock lock(lock_);
  // NOTE(review): no bounds check on |index| here, unlike ReleaseInput —
  // consider DCHECK(index < inputs_.size()).
  inputs_[index]->demand_ = Demand::kNegative;
  inputs_[index]->input_.Flush();

  pending_inputs_.remove(inputs_[index]);
}
| + |
// Part of the Stage interface. Sinks have no outputs, so reaching this is
// an error.
void ActiveMultistreamSinkStage::FlushOutput(size_t index) {
  LOG(ERROR) << "FlushOutput called on sink";
}
| + |
| +size_t ActiveMultistreamSinkStage::AllocateInput() { |
| + base::AutoLock lock(lock_); |
| + |
| + StageInput* input; |
| + if (unallocated_inputs_.empty()) { |
| + input = new StageInput(inputs_.size()); |
| + inputs_.emplace_back(input); |
| + } else { |
| + input = unallocated_inputs_.front(); |
| + // Allocate lowest indices first. |
| + unallocated_inputs_.pop_front(); |
| + } |
| + |
| + input->allocated_ = true; |
| + |
| + return input->index_; |
| +} |
| + |
| +size_t ActiveMultistreamSinkStage::ReleaseInput(size_t index) { |
| + base::AutoLock lock(lock_); |
| + DCHECK(index < inputs_.size()); |
| + |
| + StageInput* input = inputs_[index]; |
| + DCHECK(input); |
| + DCHECK(input->allocated_); |
| + DCHECK(!input->input_.connected()); |
| + |
| + input->allocated_ = false; |
| + |
| + // Pop input if it's at the end of inputs_. Otherwise, insert it into |
| + // unallocated_inputs_ in index order. |
|
johngro
2016/02/08 22:33:37
ok, so I am a bit confused about the intent behind
dalesat
2016/02/09 00:34:11
Q1) Yes, oversight.
Q2) Yes, also oversight. I thi
|
| + if (index == inputs_.size() - 1) { |
| + while (!unallocated_inputs_.empty() && |
| + unallocated_inputs_.back()->index_ == inputs_.size() - 1) { |
| + unallocated_inputs_.pop_back(); |
| + inputs_.pop_back(); |
| + } |
| + } else { |
| + AddToUnallocatedUnsafe(input); |
| + } |
| + |
| + return inputs_.size(); |
| +} |
| + |
| +void ActiveMultistreamSinkStage::UpdateDemand( |
| + size_t input_index, |
| + Demand demand) { |
| + lock_.Acquire(); |
| + DCHECK(input_index < inputs_.size()); |
| + DCHECK(demand != Demand::kNegative); |
| + |
| + StageInput* input = inputs_[input_index]; |
| + DCHECK(input); |
| + input->demand_ = demand; |
| + pending_inputs_.push_back(input); |
| + lock_.Release(); |
| + RequestUpdate(); |
| +} |
| + |
| +void ActiveMultistreamSinkStage::AddToUnallocatedUnsafe(StageInput* input) { |
| + DCHECK(input); |
| + |
| + if (unallocated_inputs_.empty() || |
| + input->index_ > unallocated_inputs_.back()->index_) { |
| + unallocated_inputs_.push_back(input); |
| + } else { |
| + for (auto iter = unallocated_inputs_.begin(); |
| + iter != unallocated_inputs_.end(); |
| + ++iter) { |
|
johngro
2016/02/08 22:33:37
if you are going to keep this list<> and not turn
dalesat
2016/02/09 00:34:11
Acknowledged.
|
| + if ((*iter)->index_ > input->index_) { |
| + unallocated_inputs_.insert(iter, input); |
| + break; |
| + } |
| + } |
| + } |
| +} |
| + |
| +} // namespace media |
| +} // namespace mojo |