| Index: services/media/framework/engine.cc
|
| diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc
|
| index 980440cdfeb95867d748853f71e8896c6ed76b5a..6c76e0b248147762535d7e86de0b05800745978f 100644
|
| --- a/services/media/framework/engine.cc
|
| +++ b/services/media/framework/engine.cc
|
| @@ -7,228 +7,60 @@
|
| namespace mojo {
|
| namespace media {
|
|
|
| -uint32_t Engine::Part::input_count() {
|
| - DCHECK(stage_ != nullptr);
|
| - return stage_->input_count();
|
| -}
|
| -
|
| -Engine::Input Engine::Part::input(uint32_t index) {
|
| - DCHECK(stage_ != nullptr && index < stage_->input_count());
|
| - return Input(stage_, index);
|
| -}
|
| -
|
| -Engine::Input Engine::Part::input() {
|
| - DCHECK(stage_ != nullptr && stage_->input_count() == 1);
|
| - return Input(stage_, 0);
|
| -}
|
| -
|
| -uint32_t Engine::Part::output_count() {
|
| - DCHECK(stage_ != nullptr);
|
| - return stage_->output_count();
|
| -}
|
| -
|
| -Engine::Output Engine::Part::output(uint32_t index) {
|
| - DCHECK(stage_ != nullptr && index < stage_->output_count());
|
| - return Output(stage_, index);
|
| -}
|
| -
|
| -Engine::Output Engine::Part::output() {
|
| - DCHECK(stage_ != nullptr && stage_->output_count() == 1);
|
| - return Output(stage_, 0);
|
| -}
|
| -
|
| -Engine::Part Engine::Part::upstream_part(uint32_t index) {
|
| - DCHECK(stage_ != nullptr && index < stage_->input_count());
|
| - return Part(stage_->input(index).upstream_stage());
|
| -}
|
| -
|
| -Engine::Part Engine::Part::upstream_part() {
|
| - DCHECK(stage_ != nullptr && stage_->input_count() == 1);
|
| - return Part(stage_->input(0).upstream_stage());
|
| -}
|
| -
|
| -Engine::Part Engine::Part::downstream_part(uint32_t index) {
|
| - DCHECK(stage_ != nullptr && index < stage_->output_count());
|
| - return Part(stage_->output(index).downstream_stage());
|
| -}
|
| -
|
| -Engine::Part Engine::Part::downstream_part() {
|
| - DCHECK(stage_ != nullptr && stage_->output_count() == 1);
|
| - return Part(stage_->output(0).downstream_stage());
|
| -}
|
| -
|
| -Engine::Engine() {
|
| - update_function_ = [this](Stage* stage) {
|
| - DCHECK(stage);
|
| - base::AutoLock lock(lock_);
|
| - UpdateUnsafe(stage);
|
| - UpdateUnsafe();
|
| - };
|
| -}
|
| +Engine::Engine() {}
|
|
|
| Engine::~Engine() {
|
| - Reset();
|
| -}
|
| -
|
| -void Engine::RemovePart(Part part) {
|
| - DCHECK(part);
|
| - base::AutoLock lock(lock_);
|
| - RemoveUnsafe(part.stage_);
|
| -}
|
| -
|
| -Engine::Part Engine::Connect(Output output, Input input) {
|
| - DCHECK(output);
|
| - DCHECK(input);
|
| -
|
| - base::AutoLock lock(lock_);
|
| -
|
| - if (output.connected()) {
|
| - DisconnectOutputUnsafe(output.stage_, output.index_);
|
| - }
|
| - if (input.connected()) {
|
| - DisconnectInputUnsafe(input.stage_, input.index_);
|
| - }
|
| -
|
| - output.stage_output().connect(input.stage_, input.index_);
|
| - input.stage_input().connect(output.stage_, output.index_);
|
| -
|
| - return input.part();
|
| -}
|
| -
|
| -Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
|
| - DCHECK(upstream_part);
|
| - DCHECK(downstream_part);
|
| - Connect(upstream_part.output(), downstream_part.input());
|
| - return downstream_part;
|
| -}
|
| -
|
| -Engine::Part Engine::ConnectOutputToPart(
|
| - Output output,
|
| - Part downstream_part) {
|
| - DCHECK(output);
|
| - DCHECK(downstream_part);
|
| - Connect(output, downstream_part.input());
|
| - return downstream_part;
|
| -}
|
| -
|
| -Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
|
| - DCHECK(upstream_part);
|
| - DCHECK(input);
|
| - Connect(upstream_part.output(), input);
|
| - return input.part();
|
| -}
|
| -
|
| -void Engine::DisconnectOutput(Output output) {
|
| - DCHECK(output);
|
| -
|
| base::AutoLock lock(lock_);
|
| - DisconnectOutputUnsafe(output.stage_, output.index_);
|
| }
|
|
|
| -void Engine::DisconnectInput(Input input) {
|
| - DCHECK(input);
|
| -
|
| - base::AutoLock lock(lock_);
|
| - DisconnectInputUnsafe(input.stage_, input.index_);
|
| -}
|
| -
|
| -void Engine::RemovePartsConnectedToPart(Part part) {
|
| - DCHECK(part);
|
| -
|
| - base::AutoLock lock(lock_);
|
| -
|
| - std::deque<Part> to_remove { part };
|
| -
|
| - while (!to_remove.empty()) {
|
| - Part part = to_remove.front();
|
| - to_remove.pop_front();
|
| -
|
| - for (uint32_t i = 0; i < part.input_count(); ++i) {
|
| - to_remove.push_back(part.upstream_part(i));
|
| - }
|
| -
|
| - for (uint32_t i = 0; i < part.output_count(); ++i) {
|
| - to_remove.push_back(part.downstream_part(i));
|
| - }
|
| -
|
| - RemoveUnsafe(part.stage_);
|
| - }
|
| -}
|
| -
|
| -void Engine::RemovePartsConnectedToOutput(Output output) {
|
| - DCHECK(output);
|
| -
|
| - if (!output.connected()) {
|
| - return;
|
| - }
|
| -
|
| - Part downstream_part = output.downstream_part();
|
| - DisconnectOutput(output);
|
| - RemovePartsConnectedToPart(downstream_part);
|
| -}
|
| -
|
| -void Engine::RemovePartsConnectedToInput(Input input) {
|
| - DCHECK(input);
|
| -
|
| - if (!input.connected()) {
|
| - return;
|
| - }
|
| -
|
| - Part upstream_part = input.upstream_part();
|
| - DisconnectInput(input);
|
| - RemovePartsConnectedToPart(upstream_part);
|
| -}
|
| -
|
| -void Engine::Prepare() {
|
| - base::AutoLock lock(lock_);
|
| - for (Stage* sink : sinks_) {
|
| - sink->Prepare(update_function_);
|
| - sink->prepared_ = true;
|
| - uint32_t input_count = sink->input_count();
|
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) {
|
| - MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
|
| - }
|
| - }
|
| -}
|
| -
|
| -void Engine::Prepare(Part part) {
|
| - DCHECK(part);
|
| +void Engine::PrepareInput(const InputRef& input) {
|
| + VisitUpstream(
|
| + input,
|
| + [] (const InputRef& input,
|
| + const OutputRef& output,
|
| + const Stage::UpstreamCallback& callback) {
|
| + DCHECK(!input.actual().prepared());
|
| + PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_);
|
| + input.actual().set_prepared(true);
|
| + output.stage_->PrepareOutput(output.index_, allocator, callback);
|
| + });
|
| +}
|
| +
|
| +void Engine::UnprepareInput(const InputRef& input) {
|
| + VisitUpstream(
|
| + input,
|
| + [] (const InputRef& input,
|
| + const OutputRef& output,
|
| + const Stage::UpstreamCallback& callback) {
|
| + DCHECK(input.actual().prepared());
|
| + input.stage_->UnprepareInput(input.index_);
|
| + output.stage_->UnprepareOutput(output.index_, callback);
|
| + });
|
| +}
|
| +
|
| +void Engine::FlushOutput(const OutputRef& output) {
|
| + VisitDownstream(
|
| + output,
|
| + [] (const OutputRef& output,
|
| + const InputRef& input,
|
| + const Stage::DownstreamCallback& callback) {
|
| + DCHECK(input.actual().prepared());
|
| + output.stage_->FlushOutput(output.index_);
|
| + input.stage_->FlushInput(input.index_, callback);
|
| + });
|
| +}
|
| +
|
| +void Engine::RequestUpdate(Stage* stage) {
|
| + DCHECK(stage);
|
| base::AutoLock lock(lock_);
|
| - MaybePrepareUnsafe(part.stage_);
|
| + Update(stage);
|
| + Update();
|
| }
|
|
|
| -void Engine::PrimeSinks() {
|
| - lock_.Acquire();
|
| - std::list<Stage*> sinks(sinks_);
|
| - lock_.Release();
|
| -
|
| - // TODO(dalesat): Threading issue: these sinks may go away during priming.
|
| - for (Stage* sink : sinks) {
|
| - sink->Prime();
|
| - }
|
| -}
|
| -
|
| -void Engine::Reset() {
|
| - base::AutoLock lock(lock_);
|
| - while (!supply_backlog_.empty()) {
|
| - supply_backlog_.pop();
|
| - }
|
| - while (!demand_backlog_.empty()) {
|
| - demand_backlog_.pop();
|
| - }
|
| - sources_.clear();
|
| - sinks_.clear();
|
| - while (!stages_.empty()) {
|
| - Stage* stage = stages_.front();
|
| - stages_.pop_front();
|
| - delete stage;
|
| - }
|
| -}
|
| -
|
| -void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
|
| +void Engine::PushToSupplyBacklog(Stage* stage) {
|
| lock_.AssertAcquired();
|
| -
|
| DCHECK(stage);
|
| +
|
| packets_produced_ = true;
|
| if (!stage->in_supply_backlog_) {
|
| supply_backlog_.push(stage);
|
| @@ -236,150 +68,81 @@ void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
|
| }
|
| }
|
|
|
| -void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
|
| +void Engine::PushToDemandBacklog(Stage* stage) {
|
| lock_.AssertAcquired();
|
| -
|
| DCHECK(stage);
|
| +
|
| if (!stage->in_demand_backlog_) {
|
| demand_backlog_.push(stage);
|
| stage->in_demand_backlog_ = true;
|
| }
|
| }
|
|
|
| -Engine::Part Engine::Add(Stage* stage) {
|
| +void Engine::VisitUpstream(
|
| + const InputRef& input,
|
| + const UpstreamVisitor& vistor) {
|
| base::AutoLock lock(lock_);
|
|
|
| - stages_.push_back(stage);
|
| - if (stage->input_count() == 0) {
|
| - sources_.push_back(stage);
|
| - }
|
| - if (stage->output_count() == 0) {
|
| - sinks_.push_back(stage);
|
| - }
|
| - return Part(stage);
|
| -}
|
| -
|
| -void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
|
| - DCHECK(stage);
|
| - DCHECK(index < stage->output_count());
|
| + std::queue<InputRef> backlog;
|
| + backlog.push(input);
|
|
|
| - lock_.AssertAcquired();
|
| + while (!backlog.empty()) {
|
| + InputRef input = backlog.front();
|
| + backlog.pop();
|
| + DCHECK(input.valid());
|
| + DCHECK(input.connected());
|
|
|
| - StageOutput& stage_output = stage->output(index);
|
| + const OutputRef& output = input.mate();
|
| + Stage* output_stage = output.stage_;
|
|
|
| - if (stage_output.downstream_stage() == nullptr) {
|
| - return;
|
| + vistor(
|
| + input,
|
| + output,
|
| + [output_stage, &backlog](size_t input_index) {
|
| + backlog.push(InputRef(output_stage, input_index));
|
| + });
|
| }
|
| -
|
| - stage_output.mate().disconnect();
|
| - stage_output.disconnect();
|
| }
|
|
|
| -void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
|
| - DCHECK(stage);
|
| - DCHECK(index < stage->input_count());
|
| -
|
| - lock_.AssertAcquired();
|
| -
|
| - StageInput& stage_input = stage->input(index);
|
| -
|
| - if (stage_input.upstream_stage() == nullptr) {
|
| - return;
|
| - }
|
| -
|
| - stage_input.mate().disconnect();
|
| - stage_input.disconnect();
|
| -}
|
| -
|
| -void Engine::RemoveUnsafe(Stage* stage) {
|
| - DCHECK(stage);
|
| -
|
| - lock_.AssertAcquired();
|
| -
|
| - uint32_t input_count = stage->input_count();
|
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) {
|
| - if (stage->input(input_index).connected()) {
|
| - DisconnectInputUnsafe(stage, input_index);
|
| - }
|
| - }
|
| -
|
| - uint32_t output_count = stage->output_count();
|
| - for (uint32_t output_index = 0; output_index < output_count; output_index++) {
|
| - if (stage->output(output_index).connected()) {
|
| - DisconnectOutputUnsafe(stage, output_index);
|
| - }
|
| - }
|
| -
|
| - sources_.remove(stage);
|
| - sinks_.remove(stage);
|
| - stages_.remove(stage);
|
| - delete stage;
|
| -}
|
| -
|
| -// static
|
| -Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
|
| - return new DistributorStage(source);
|
| -}
|
| -
|
| -// static
|
| -Stage* Engine::CreateStage(PacketTransformPtr transform) {
|
| - return new PacketTransformStage(transform);
|
| -}
|
| -
|
| -// static
|
| -Stage* Engine::CreateStage(ActiveSourcePtr source) {
|
| - return new ActiveSourceStage(source);
|
| -}
|
| -
|
| -// static
|
| -Stage* Engine::CreateStage(ActiveSinkPtr sink) {
|
| - return new ActiveSinkStage(sink);
|
| -}
|
| -
|
| -// static
|
| -Stage* Engine::CreateStage(LpcmTransformPtr transform) {
|
| - return new LpcmTransformStage(transform);
|
| -}
|
| -
|
| -void Engine::MaybePrepareUnsafe(Stage* stage) {
|
| - lock_.AssertAcquired();
|
| +void Engine::VisitDownstream(
|
| + const OutputRef& output,
|
| + const DownstreamVisitor& vistor) {
|
| + base::AutoLock lock(lock_);
|
|
|
| - if (stage == nullptr || stage->prepared_) {
|
| - return;
|
| - }
|
| + std::queue<OutputRef> backlog;
|
| + backlog.push(output);
|
|
|
| - // Make sure all downstream stages have been prepared.
|
| - uint32_t output_count = stage->output_count();
|
| - for (uint32_t output_index = 0; output_index < output_count; output_index++) {
|
| - StageOutput& output = stage->output(output_index);
|
| - if (output.connected() && !output.downstream_stage()->prepared()) {
|
| - return;
|
| - }
|
| - }
|
| + while (!backlog.empty()) {
|
| + OutputRef output = backlog.front();
|
| + backlog.pop();
|
| + DCHECK(output.valid());
|
| + DCHECK(output.connected());
|
|
|
| - stage->Prepare(update_function_);
|
| - stage->prepared_ = true;
|
| + const InputRef& input = output.mate();
|
| + Stage* input_stage = input.stage_;
|
|
|
| - // Prepare all upstream stages.
|
| - uint32_t input_count = stage->input_count();
|
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) {
|
| - MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
|
| + vistor(
|
| + output,
|
| + input,
|
| + [input_stage, &backlog](size_t output_index) {
|
| + backlog.push(OutputRef(input_stage, output_index));
|
| + });
|
| }
|
| }
|
|
|
| -void Engine::UpdateUnsafe() {
|
| +void Engine::Update() {
|
| lock_.AssertAcquired();
|
|
|
| while (true) {
|
| - Stage* stage = PopFromSupplyBacklogUnsafe();
|
| + Stage* stage = PopFromSupplyBacklog();
|
| if (stage != nullptr) {
|
| - UpdateUnsafe(stage);
|
| + Update(stage);
|
| continue;
|
| }
|
|
|
| - stage = PopFromDemandBacklogUnsafe();
|
| + stage = PopFromDemandBacklog();
|
| if (stage != nullptr) {
|
| - UpdateUnsafe(stage);
|
| + Update(stage);
|
| continue;
|
| }
|
|
|
| @@ -387,7 +150,7 @@ void Engine::UpdateUnsafe() {
|
| }
|
| }
|
|
|
| -void Engine::UpdateUnsafe(Stage *stage) {
|
| +void Engine::Update(Stage *stage) {
|
| lock_.AssertAcquired();
|
|
|
| DCHECK(stage);
|
| @@ -398,11 +161,11 @@ void Engine::UpdateUnsafe(Stage *stage) {
|
|
|
| // If the stage produced packets, it may need to reevaluate demand later.
|
| if (packets_produced_) {
|
| - PushToDemandBacklogUnsafe(stage);
|
| + PushToDemandBacklog(stage);
|
| }
|
| }
|
|
|
| -Stage* Engine::PopFromSupplyBacklogUnsafe() {
|
| +Stage* Engine::PopFromSupplyBacklog() {
|
| lock_.AssertAcquired();
|
|
|
| if (supply_backlog_.empty()) {
|
| @@ -416,7 +179,7 @@ Stage* Engine::PopFromSupplyBacklogUnsafe() {
|
| return stage;
|
| }
|
|
|
| -Stage* Engine::PopFromDemandBacklogUnsafe() {
|
| +Stage* Engine::PopFromDemandBacklog() {
|
| lock_.AssertAcquired();
|
|
|
| if (demand_backlog_.empty()) {
|
|
|