Index: services/media/framework/stages/active_multistream_sink_stage.cc |
diff --git a/services/media/framework/stages/active_multistream_sink_stage.cc b/services/media/framework/stages/active_multistream_sink_stage.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9cd13412573433a49b5e1b567f4bc2172c6336ad |
--- /dev/null |
+++ b/services/media/framework/stages/active_multistream_sink_stage.cc |
@@ -0,0 +1,179 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "services/media/framework/stages/active_multistream_sink_stage.h" |
+ |
+namespace mojo { |
+namespace media { |
+ |
+ActiveMultistreamSinkStage::ActiveMultistreamSinkStage( |
+ std::shared_ptr<ActiveMultistreamSink> sink) : sink_(sink) { |
+ DCHECK(sink_); |
+ sink_->SetHost(this); |
+} |
+ |
+ActiveMultistreamSinkStage::~ActiveMultistreamSinkStage() { |
+ base::AutoLock lock(lock_); |
+} |
+ |
+size_t ActiveMultistreamSinkStage::input_count() const { |
+ base::AutoLock lock(lock_); |
+ return inputs_.size(); |
+}; |
+ |
+Input& ActiveMultistreamSinkStage::input(size_t index) { |
+ base::AutoLock lock(lock_); |
+ DCHECK_LT(index, inputs_.size()); |
johngro
2016/02/08 22:33:37
DCHECK(inputs_[index].allocated_);
?
dalesat
2016/02/09 00:34:11
Hmmm. Seems like we should allow that. Some code m
johngro
2016/02/09 21:00:36
I'm really not comfortable with this. When an inp
dalesat
2016/02/10 21:37:24
Understood. We should discuss.
Adding a TODO
|
+ return inputs_[index]->input_; |
+} |
+ |
+size_t ActiveMultistreamSinkStage::output_count() const { |
+ return 0; |
+} |
+ |
+Output& ActiveMultistreamSinkStage::output(size_t index) { |
+ LOG(ERROR) << "output requested from sink"; |
+ static Output result; |
+ return result; |
+} |
+ |
+PayloadAllocator* ActiveMultistreamSinkStage::PrepareInput(size_t index) { |
+ return nullptr; |
+} |
+ |
+void ActiveMultistreamSinkStage::PrepareOutput( |
+ size_t index, |
+ PayloadAllocator* allocator, |
+ const PrepareCallback& callback) { |
+ LOG(ERROR) << "PrepareOutput called on sink"; |
+} |
+ |
+void ActiveMultistreamSinkStage::Prime() { |
+ DCHECK(sink_); |
+ sink_->Prime(); |
+} |
+ |
+void ActiveMultistreamSinkStage::Update(Engine* engine) { |
+ DCHECK(engine); |
+ DCHECK(sink_); |
+ |
+ base::AutoLock lock(lock_); |
+ |
+ for (auto iter = pending_inputs_.begin(); iter != pending_inputs_.end(); ) { |
+ StageInput* input = *iter; |
+ if (input->input_.packet_from_upstream()) { |
+ input->demand_ = sink_->SupplyPacket( |
+ input->index_, |
+ std::move(input->input_.packet_from_upstream())); |
+ |
+ if (input->demand_ == Demand::kNegative) { |
+ auto remove_iter = iter; |
+ ++iter; |
+ pending_inputs_.erase(remove_iter); |
+ } |
+ } else { |
+ ++iter; |
+ } |
+ |
+ input->input_.SetDemand(input->demand_, engine); |
+ } |
+} |
+ |
+void ActiveMultistreamSinkStage::FlushInput( |
+ size_t index, |
+ const FlushCallback& callback) { |
+ DCHECK(sink_); |
+ |
+ sink_->Flush(); |
+ |
+ base::AutoLock lock(lock_); |
+ inputs_[index]->demand_ = Demand::kNegative; |
+ inputs_[index]->input_.Flush(); |
+ |
+ pending_inputs_.remove(inputs_[index]); |
+} |
+ |
+void ActiveMultistreamSinkStage::FlushOutput(size_t index) { |
+ LOG(ERROR) << "FlushOutput called on sink"; |
+} |
+ |
+size_t ActiveMultistreamSinkStage::AllocateInput() { |
+ base::AutoLock lock(lock_); |
+ |
+ StageInput* input; |
+ if (unallocated_inputs_.empty()) { |
+ input = new StageInput(inputs_.size()); |
+ inputs_.emplace_back(input); |
+ } else { |
+ input = unallocated_inputs_.front(); |
+ // Allocate lowest indices first. |
+ unallocated_inputs_.pop_front(); |
+ } |
+ |
+ input->allocated_ = true; |
+ |
+ return input->index_; |
+} |
+ |
+size_t ActiveMultistreamSinkStage::ReleaseInput(size_t index) { |
+ base::AutoLock lock(lock_); |
+ DCHECK(index < inputs_.size()); |
+ |
+ StageInput* input = inputs_[index]; |
+ DCHECK(input); |
+ DCHECK(input->allocated_); |
+ DCHECK(!input->input_.connected()); |
+ |
+ input->allocated_ = false; |
+ |
+ // Pop input if it's at the end of inputs_. Otherwise, insert it into |
+ // unallocated_inputs_ in index order. |
johngro
2016/02/08 22:33:37
ok, so I am a bit confused about the intent behind
dalesat
2016/02/09 00:34:11
Q1) Yes, oversight.
Q2) Yes, also oversight. I thi
|
+ if (index == inputs_.size() - 1) { |
+ while (!unallocated_inputs_.empty() && |
+ unallocated_inputs_.back()->index_ == inputs_.size() - 1) { |
+ unallocated_inputs_.pop_back(); |
+ inputs_.pop_back(); |
+ } |
+ } else { |
+ AddToUnallocatedUnsafe(input); |
+ } |
+ |
+ return inputs_.size(); |
+} |
+ |
+void ActiveMultistreamSinkStage::UpdateDemand( |
+ size_t input_index, |
+ Demand demand) { |
+ lock_.Acquire(); |
+ DCHECK(input_index < inputs_.size()); |
+ DCHECK(demand != Demand::kNegative); |
+ |
+ StageInput* input = inputs_[input_index]; |
+ DCHECK(input); |
+ input->demand_ = demand; |
+ pending_inputs_.push_back(input); |
+ lock_.Release(); |
+ RequestUpdate(); |
+} |
+ |
+void ActiveMultistreamSinkStage::AddToUnallocatedUnsafe(StageInput* input) { |
+ DCHECK(input); |
+ |
+ if (unallocated_inputs_.empty() || |
+ input->index_ > unallocated_inputs_.back()->index_) { |
+ unallocated_inputs_.push_back(input); |
+ } else { |
+ for (auto iter = unallocated_inputs_.begin(); |
+ iter != unallocated_inputs_.end(); |
+ ++iter) { |
johngro
2016/02/08 22:33:37
if you are going to keep this list<> and not turn
dalesat
2016/02/09 00:34:11
Acknowledged.
|
+ if ((*iter)->index_ > input->index_) { |
+ unallocated_inputs_.insert(iter, input); |
+ break; |
+ } |
+ } |
+ } |
+} |
+ |
+} // namespace media |
+} // namespace mojo |