Chromium Code Reviews| Index: services/media/framework/engine.cc |
| diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..96bbc7c62699e4be400c691f8d25a9fb71eef051 |
| --- /dev/null |
| +++ b/services/media/framework/engine.cc |
| @@ -0,0 +1,426 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "services/media/framework/engine.h" |
| + |
| +namespace mojo { |
| +namespace media { |
| + |
| +uint32_t Engine::Part::input_count() { |
| + DCHECK(stage_ != nullptr); |
| + return stage_->input_count(); |
| +} |
| + |
| +Engine::Input Engine::Part::input(uint32_t index) { |
| + DCHECK(stage_ != nullptr && index < stage_->input_count()); |
| + return Input(stage_, index); |
| +} |
| + |
| +Engine::Input Engine::Part::input() { |
| + DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
| + return Input(stage_, 0); |
| +} |
| + |
| +uint32_t Engine::Part::output_count() { |
| + DCHECK(stage_ != nullptr); |
| + return stage_->output_count(); |
| +} |
| + |
| +Engine::Output Engine::Part::output(uint32_t index) { |
| + DCHECK(stage_ != nullptr && index < stage_->output_count()); |
| + return Output(stage_, index); |
| +} |
| + |
| +Engine::Output Engine::Part::output() { |
| + DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
| + return Output(stage_, 0); |
| +} |
| + |
| +Engine::Part Engine::Part::upstream_part(uint32_t index) { |
| + DCHECK(stage_ != nullptr && index < stage_->input_count()); |
| + return Part(stage_->input(index).upstream_stage()); |
| +} |
| + |
| +Engine::Part Engine::Part::upstream_part() { |
| + DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
| + return Part(stage_->input(0).upstream_stage()); |
| +} |
| + |
| +Engine::Part Engine::Part::downstream_part(uint32_t index) { |
| + DCHECK(stage_ != nullptr && index < stage_->output_count()); |
| + return Part(stage_->output(index).downstream_stage()); |
| +} |
| + |
| +Engine::Part Engine::Part::downstream_part() { |
| + DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
| + return Part(stage_->output(0).downstream_stage()); |
| +} |
| + |
| +Engine::Engine() { |
| + update_function_ = [this](Stage* stage) { |
| + DCHECK(stage); |
| + base::AutoLock lock(lock_); |
| + UpdateUnsafe(stage); |
| + UpdateUnsafe(); |
| + }; |
| +} |
| + |
| +Engine::~Engine() { |
| + Reset(); |
| +} |
| + |
| +void Engine::Remove(Part part) { |
| + DCHECK(part); |
| + base::AutoLock lock(lock_); |
| + RemoveUnsafe(part.stage_); |
| +} |
| + |
| +Engine::Part Engine::Connect(Output output, Input input) { |
| + DCHECK(output); |
| + DCHECK(input); |
| + |
| + base::AutoLock lock(lock_); |
| + |
| + if (output.connected()) { |
| + DisconnectOutputUnsafe(output.stage_, output.index_); |
| + } |
| + if (input.connected()) { |
| + DisconnectInputUnsafe(input.stage_, input.index_); |
| + } |
| + |
| + output.stage_output().connect( |
| + input.stage_, |
| + input.index_); |
| + |
| + input.stage_input().connect( |
| + output.stage_, |
| + output.index_); |
| + |
| + return input.part(); |
| +} |
| + |
| +Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) { |
| + DCHECK(upstream_part); |
| + DCHECK(downstream_part); |
| + Connect(upstream_part.output(), downstream_part.input()); |
| + return downstream_part; |
| +} |
| + |
| +Engine::Part Engine::Connect( |
| + Output output, |
| + Part downstream_part) { |
| + DCHECK(output); |
| + DCHECK(downstream_part); |
| + Connect(output, downstream_part.input()); |
| + return downstream_part; |
| +} |
| + |
| +Engine::Part Engine::Connect(Part upstream_part, Input input) { |
| + DCHECK(upstream_part); |
| + DCHECK(input); |
| + Connect(upstream_part.output(), input); |
| + return input.part(); |
| +} |
| + |
| +void Engine::Disconnect(Output output) { |
| + DCHECK(output); |
| + |
| + base::AutoLock lock(lock_); |
| + DisconnectOutputUnsafe(output.stage_, output.index_); |
| +} |
| + |
| +void Engine::Disconnect(Input input) { |
| + DCHECK(input); |
| + |
| + base::AutoLock lock(lock_); |
| + DisconnectInputUnsafe(input.stage_, input.index_); |
| +} |
| + |
| +void Engine::RemoveAll(Part part) { |
| + DCHECK(part); |
| + |
| + base::AutoLock lock(lock_); |
| + |
| + std::deque<Part> to_remove { part }; |
| + |
| + while (!to_remove.empty()) { |
| + Part part = to_remove.front(); |
| + to_remove.pop_front(); |
| + |
| + for (uint32_t i = 0; i < part.input_count(); ++i) { |
| + to_remove.push_back(part.upstream_part(i)); |
| + } |
| + |
| + for (uint32_t i = 0; i < part.output_count(); ++i) { |
| + to_remove.push_back(part.downstream_part(i)); |
| + } |
| + |
| + RemoveUnsafe(part.stage_); |
| + } |
| +} |
| + |
| +void Engine::RemoveAll(Output output) { |
| + DCHECK(output); |
| + |
| + if (!output.connected()) { |
| + return; |
| + } |
| + |
| + Part downstream_part = output.downstream_part(); |
| + Disconnect(output); |
| + RemoveAll(downstream_part); |
| +} |
| + |
| +void Engine::RemoveAll(Input input) { |
| + DCHECK(input); |
| + |
| + if (!input.connected()) { |
| + return; |
| + } |
| + |
| + Part upstream_part = input.upstream_part(); |
| + Disconnect(input); |
| + RemoveAll(upstream_part); |
| +} |
| + |
| +void Engine::Prepare() { |
| + base::AutoLock lock(lock_); |
| + for (Stage* sink : sinks_) { |
| + sink->Prepare(update_function_); |
| + sink->prepared_ = true; |
| + uint32_t input_count = sink->input_count(); |
| + for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| + PrepareUnsafe(sink->input(input_index).upstream_stage()); |
| + } |
| + } |
| +} |
| + |
| +void Engine::Prepare(Part part) { |
| + DCHECK(part); |
| + base::AutoLock lock(lock_); |
| + PrepareUnsafe(part.stage_); |
| +} |
| + |
| +void Engine::PrimeSinks() { |
| + lock_.Acquire(); |
| + std::list<Stage*> sinks(sinks_); |
| + lock_.Release(); |
|
johngro
2016/01/28 19:14:55
so, you made a copy of the sinks_ list which is ho
dalesat
2016/01/29 01:08:29
At this point, we are depending on the kindness of
johngro
2016/02/01 22:38:17
Acknowledged.
|
| + |
| + for (Stage* sink : sinks) { |
| + sink->Prime(); |
| + } |
| +} |
| + |
| +void Engine::Reset() { |
| + base::AutoLock lock(lock_); |
| + supply_backlog_.clear(); |
| + demand_backlog_.clear(); |
| + sources_.clear(); |
| + sinks_.clear(); |
| + while (!stages_.empty()) { |
| + Stage* stage = stages_.front(); |
| + stages_.pop_front(); |
| + delete stage; |
| + } |
| +} |
| + |
| +void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { |
| + lock_.AssertAcquired(); |
| + |
| + DCHECK(stage); |
| + packets_produced_ = true; |
| + if (!stage->in_supply_backlog_) { |
| + // LIFO |
|
johngro
2016/01/27 22:35:22
These comments still seem backwards. Both Pop imp
dalesat
2016/01/28 18:49:17
Thanks for the tip on queue/stack. They don't have
johngro
2016/02/01 22:38:17
Acknowledged.
Re: ordering, this basically boils
dalesat
2016/02/01 23:01:28
Nice trick, thanks!
|
| + supply_backlog_.push_back(stage); |
| + stage->in_supply_backlog_ = true; |
| + } |
| +} |
| + |
| +void Engine::PushToDemandBacklogUnsafe(Stage* stage) { |
| + lock_.AssertAcquired(); |
| + |
| + DCHECK(stage); |
| + if (!stage->in_demand_backlog_) { |
| + // FIFO |
| + demand_backlog_.push_front(stage); |
| + stage->in_demand_backlog_ = true; |
| + } |
| +} |
| + |
| +Engine::Part Engine::Add(Stage* stage) { |
| + base::AutoLock lock(lock_); |
| + |
| + stages_.push_back(stage); |
| + if (stage->input_count() == 0) { |
| + sources_.push_back(stage); |
| + } |
| + if (stage->output_count() == 0) { |
| + sinks_.push_back(stage); |
| + } |
| + return Part(stage); |
| +} |
| + |
| +void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) { |
| + DCHECK(stage); |
| + DCHECK(index < stage->output_count()); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + StageOutput& stage_output = stage->output(index); |
| + |
| + if (stage_output.downstream_stage() == nullptr) { |
| + return; |
| + } |
| + |
| + stage_output.mate().connect(nullptr, 0); |
| + stage_output.connect(nullptr, 0); |
| +} |
| + |
| +void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) { |
| + DCHECK(stage); |
| + DCHECK(index < stage->input_count()); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + StageInput& stage_input = stage->input(index); |
| + |
| + if (stage_input.upstream_stage() == nullptr) { |
| + return; |
| + } |
| + |
| + stage_input.mate().connect(nullptr, 0); |
| + stage_input.connect(nullptr, 0); |
| +} |
| + |
| +void Engine::RemoveUnsafe(Stage* stage) { |
| + DCHECK(stage); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + uint32_t input_count = stage->input_count(); |
| + for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| + if (stage->input(input_index).connected()) { |
| + DisconnectInputUnsafe(stage, input_index); |
| + } |
| + } |
| + |
| + uint32_t output_count = stage->output_count(); |
| + for (uint32_t output_index = 0; output_index < output_count; output_index++) { |
| + if (stage->output(output_index).connected()) { |
| + DisconnectOutputUnsafe(stage, output_index); |
| + } |
| + } |
| + |
| + sources_.remove(stage); |
| + sinks_.remove(stage); |
| + stages_.remove(stage); |
| + delete stage; |
| +} |
| + |
| +// static |
| +Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) { |
| + return new DistributorStage(source); |
| +} |
| + |
| +// static |
| +Stage* Engine::CreateStage(PacketTransformPtr transform) { |
| + return new PacketTransformStage(transform); |
| +} |
| + |
| +// static |
| +Stage* Engine::CreateStage(ActiveSourcePtr source) { |
| + return new ActiveSourceStage(source); |
| +} |
| + |
| +// static |
| +Stage* Engine::CreateStage(ActiveSinkPtr sink) { |
| + return new ActiveSinkStage(sink); |
| +} |
| + |
| +// static |
| +Stage* Engine::CreateStage(LpcmTransformPtr transform) { |
| + return new LpcmTransformStage(transform); |
| +} |
| + |
| +void Engine::PrepareUnsafe(Stage* stage) { |
| + lock_.AssertAcquired(); |
| + |
| + if (stage == nullptr || stage->prepared_) { |
| + return; |
| + } |
| + |
| + stage->Prepare(update_function_); |
|
johngro
2016/01/28 19:14:55
Following comment applies to UpdateUnsafe as well
dalesat
2016/01/29 01:08:30
The threading model will change dramatically as ti
johngro
2016/02/01 22:38:17
Acknowledged.
|
| + stage->prepared_ = true; |
| + |
| + uint32_t input_count = stage->input_count(); |
| + for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| + PrepareUnsafe(stage->input(input_index).upstream_stage()); |
| + } |
| +} |
| + |
| +void Engine::UpdateUnsafe() { |
| + lock_.AssertAcquired(); |
| + |
| + while (true) { |
| + Stage* stage = PopFromSupplyBacklogUnsafe(); |
| + if (stage != nullptr) { |
| + UpdateUnsafe(stage); |
| + continue; |
| + } |
| + |
| + stage = PopFromDemandBacklogUnsafe(); |
| + if (stage != nullptr) { |
| + UpdateUnsafe(stage); |
| + continue; |
| + } |
| + |
| + break; |
| + } |
| +} |
| + |
| +void Engine::UpdateUnsafe(Stage *stage) { |
| + lock_.AssertAcquired(); |
| + |
| + DCHECK(stage); |
| + |
| + packets_produced_ = false; |
| + |
| + stage->Update(this); |
| + |
| + // If the stage produced packets, it may need to reevaluate demand later. |
| + if (packets_produced_) { |
| + PushToDemandBacklogUnsafe(stage); |
| + } |
| +} |
| + |
| +Stage* Engine::PopFromSupplyBacklogUnsafe() { |
| + lock_.AssertAcquired(); |
| + |
| + if (supply_backlog_.empty()) { |
| + return nullptr; |
| + } |
| + |
| + Stage* stage = supply_backlog_.front(); |
| + supply_backlog_.pop_front(); |
| + DCHECK(stage->in_supply_backlog_); |
| + stage->in_supply_backlog_ = false; |
| + return stage; |
| +} |
| + |
| +Stage* Engine::PopFromDemandBacklogUnsafe() { |
| + lock_.AssertAcquired(); |
| + |
| + if (demand_backlog_.empty()) { |
| + return nullptr; |
| + } |
| + |
| + Stage* stage = demand_backlog_.front(); |
| + demand_backlog_.pop_front(); |
| + DCHECK(stage->in_demand_backlog_); |
| + stage->in_demand_backlog_ = false; |
| + return stage; |
| +} |
| + |
| +} // namespace media |
| +} // namespace mojo |