Chromium Code Reviews| Index: services/media/framework/engine.cc |
| diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc |
| index 980440cdfeb95867d748853f71e8896c6ed76b5a..8072d42ab2588fad5280f0f2bd94b7dc7970d45a 100644 |
| --- a/services/media/framework/engine.cc |
| +++ b/services/media/framework/engine.cc |
| @@ -7,228 +7,95 @@ |
| namespace mojo { |
| namespace media { |
| -uint32_t Engine::Part::input_count() { |
| - DCHECK(stage_ != nullptr); |
| - return stage_->input_count(); |
| -} |
| - |
| -Engine::Input Engine::Part::input(uint32_t index) { |
| - DCHECK(stage_ != nullptr && index < stage_->input_count()); |
| - return Input(stage_, index); |
| -} |
| - |
| -Engine::Input Engine::Part::input() { |
| - DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
| - return Input(stage_, 0); |
| -} |
| - |
| -uint32_t Engine::Part::output_count() { |
| - DCHECK(stage_ != nullptr); |
| - return stage_->output_count(); |
| -} |
| - |
| -Engine::Output Engine::Part::output(uint32_t index) { |
| - DCHECK(stage_ != nullptr && index < stage_->output_count()); |
| - return Output(stage_, index); |
| -} |
| - |
| -Engine::Output Engine::Part::output() { |
| - DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
| - return Output(stage_, 0); |
| -} |
| - |
| -Engine::Part Engine::Part::upstream_part(uint32_t index) { |
| - DCHECK(stage_ != nullptr && index < stage_->input_count()); |
| - return Part(stage_->input(index).upstream_stage()); |
| -} |
| - |
| -Engine::Part Engine::Part::upstream_part() { |
| - DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
| - return Part(stage_->input(0).upstream_stage()); |
| -} |
| - |
| -Engine::Part Engine::Part::downstream_part(uint32_t index) { |
| - DCHECK(stage_ != nullptr && index < stage_->output_count()); |
| - return Part(stage_->output(index).downstream_stage()); |
| -} |
| - |
| -Engine::Part Engine::Part::downstream_part() { |
| - DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
| - return Part(stage_->output(0).downstream_stage()); |
| -} |
| - |
| -Engine::Engine() { |
| - update_function_ = [this](Stage* stage) { |
| - DCHECK(stage); |
| - base::AutoLock lock(lock_); |
| - UpdateUnsafe(stage); |
| - UpdateUnsafe(); |
| - }; |
| -} |
| +Engine::Engine() {} |
| Engine::~Engine() { |
| - Reset(); |
| -} |
| - |
| -void Engine::RemovePart(Part part) { |
| - DCHECK(part); |
| base::AutoLock lock(lock_); |
| - RemoveUnsafe(part.stage_); |
| } |
| -Engine::Part Engine::Connect(Output output, Input input) { |
| - DCHECK(output); |
| - DCHECK(input); |
| - |
| +void Engine::PrepareInput(const InputRef& input_ref) { |
| base::AutoLock lock(lock_); |
| - if (output.connected()) { |
| - DisconnectOutputUnsafe(output.stage_, output.index_); |
| - } |
| - if (input.connected()) { |
| - DisconnectInputUnsafe(input.stage_, input.index_); |
| - } |
| - |
| - output.stage_output().connect(input.stage_, input.index_); |
| - input.stage_input().connect(output.stage_, output.index_); |
| - |
| - return input.part(); |
| -} |
| - |
| -Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) { |
| - DCHECK(upstream_part); |
| - DCHECK(downstream_part); |
| - Connect(upstream_part.output(), downstream_part.input()); |
| - return downstream_part; |
| -} |
| - |
| -Engine::Part Engine::ConnectOutputToPart( |
| - Output output, |
| - Part downstream_part) { |
| - DCHECK(output); |
| - DCHECK(downstream_part); |
| - Connect(output, downstream_part.input()); |
| - return downstream_part; |
| -} |
| - |
| -Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) { |
| - DCHECK(upstream_part); |
| - DCHECK(input); |
| - Connect(upstream_part.output(), input); |
| - return input.part(); |
| -} |
| + std::queue<InputRef> backlog; |
| + backlog.push(input_ref); |
| -void Engine::DisconnectOutput(Output output) { |
| - DCHECK(output); |
| + while (!backlog.empty()) { |
| + InputRef input = backlog.front(); |
| + backlog.pop(); |
| + DCHECK(input.valid()); |
| + DCHECK(input.connected()); |
| + DCHECK(!input.actual().prepared()); |
| - base::AutoLock lock(lock_); |
| - DisconnectOutputUnsafe(output.stage_, output.index_); |
| -} |
| + PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_); |
| + input.actual().set_prepared(true); |
| -void Engine::DisconnectInput(Input input) { |
| - DCHECK(input); |
| + const OutputRef& output = input.mate(); |
| - base::AutoLock lock(lock_); |
| - DisconnectInputUnsafe(input.stage_, input.index_); |
| + output.stage_->PrepareOutput( |
| + output.index_, |
| + allocator, |
| + [this, &output, &backlog](size_t input_index) { |
|
johngro
2016/02/08 22:33:36
I don't think that you are using your captured 'th
dalesat
2016/02/09 00:34:11
I've changed this a bit to use a standard visitor.
|
| + backlog.push(InputRef(output.stage_, input_index)); |
| + }); |
| + } |
| } |
| -void Engine::RemovePartsConnectedToPart(Part part) { |
| - DCHECK(part); |
| - |
| +void Engine::UnprepareInput(const InputRef& input_ref) { |
| base::AutoLock lock(lock_); |
| - std::deque<Part> to_remove { part }; |
| - |
| - while (!to_remove.empty()) { |
| - Part part = to_remove.front(); |
| - to_remove.pop_front(); |
| + std::queue<InputRef> backlog; |
|
johngro
2016/02/08 22:33:36
backlog seems to be essentially unused here. In P
dalesat
2016/02/09 00:34:11
Done.
|
| + backlog.push(input_ref); |
| - for (uint32_t i = 0; i < part.input_count(); ++i) { |
| - to_remove.push_back(part.upstream_part(i)); |
| - } |
| + while (!backlog.empty()) { |
| + InputRef input = backlog.front(); |
| + backlog.pop(); |
| + DCHECK(input.valid()); |
| + DCHECK(input.connected()); |
| + DCHECK(input.actual().prepared()); |
| - for (uint32_t i = 0; i < part.output_count(); ++i) { |
| - to_remove.push_back(part.downstream_part(i)); |
| - } |
| + input.stage_->UnprepareInput(input.index_); |
| + input.actual().set_prepared(false); |
| - RemoveUnsafe(part.stage_); |
| + input.mate().stage_->UnprepareOutput(input.mate().index_); |
| } |
| } |
| -void Engine::RemovePartsConnectedToOutput(Output output) { |
| - DCHECK(output); |
| - |
| - if (!output.connected()) { |
| - return; |
| - } |
| - |
| - Part downstream_part = output.downstream_part(); |
| - DisconnectOutput(output); |
| - RemovePartsConnectedToPart(downstream_part); |
| -} |
| +void Engine::FlushOutput(const OutputRef& output_ref) { |
| + base::AutoLock lock(lock_); |
| -void Engine::RemovePartsConnectedToInput(Input input) { |
| - DCHECK(input); |
| + std::queue<OutputRef> backlog; |
| + backlog.push(output_ref); |
| - if (!input.connected()) { |
| - return; |
| - } |
| + while (!backlog.empty()) { |
| + OutputRef output = backlog.front(); |
| + backlog.pop(); |
| + DCHECK(output.valid()); |
| + DCHECK(output.connected()); |
| + DCHECK(output.mate().actual().prepared()); |
| - Part upstream_part = input.upstream_part(); |
| - DisconnectInput(input); |
| - RemovePartsConnectedToPart(upstream_part); |
| -} |
| + output.stage_->FlushOutput(output.index_); |
| -void Engine::Prepare() { |
| - base::AutoLock lock(lock_); |
| - for (Stage* sink : sinks_) { |
| - sink->Prepare(update_function_); |
| - sink->prepared_ = true; |
| - uint32_t input_count = sink->input_count(); |
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| - MaybePrepareUnsafe(sink->input(input_index).upstream_stage()); |
| - } |
| - } |
| -} |
| + const InputRef& input = output.mate(); |
| -void Engine::Prepare(Part part) { |
| - DCHECK(part); |
| - base::AutoLock lock(lock_); |
| - MaybePrepareUnsafe(part.stage_); |
| -} |
| - |
| -void Engine::PrimeSinks() { |
| - lock_.Acquire(); |
| - std::list<Stage*> sinks(sinks_); |
| - lock_.Release(); |
| - |
| - // TODO(dalesat): Threading issue: these sinks may go away during priming. |
| - for (Stage* sink : sinks) { |
| - sink->Prime(); |
| + input.stage_->FlushInput( |
| + input.index_, |
| + [this, &input, &backlog](size_t output_index) { |
|
johngro
2016/02/08 22:33:36
same comment as before. 'this' is unused and it i
|
| + backlog.push(OutputRef(input.stage_, output_index)); |
| + }); |
| } |
| } |
| -void Engine::Reset() { |
| +void Engine::RequestUpdate(Stage* stage) { |
| + DCHECK(stage); |
| base::AutoLock lock(lock_); |
| - while (!supply_backlog_.empty()) { |
| - supply_backlog_.pop(); |
| - } |
| - while (!demand_backlog_.empty()) { |
| - demand_backlog_.pop(); |
| - } |
| - sources_.clear(); |
| - sinks_.clear(); |
| - while (!stages_.empty()) { |
| - Stage* stage = stages_.front(); |
| - stages_.pop_front(); |
| - delete stage; |
| - } |
| + Update(stage); |
| + Update(); |
| } |
| -void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { |
| +void Engine::PushToSupplyBacklog(Stage* stage) { |
| lock_.AssertAcquired(); |
| - |
| DCHECK(stage); |
| + |
| packets_produced_ = true; |
| if (!stage->in_supply_backlog_) { |
| supply_backlog_.push(stage); |
| @@ -236,150 +103,29 @@ void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { |
| } |
| } |
| -void Engine::PushToDemandBacklogUnsafe(Stage* stage) { |
| +void Engine::PushToDemandBacklog(Stage* stage) { |
| lock_.AssertAcquired(); |
| - |
| DCHECK(stage); |
| + |
| if (!stage->in_demand_backlog_) { |
| demand_backlog_.push(stage); |
| stage->in_demand_backlog_ = true; |
| } |
| } |
| -Engine::Part Engine::Add(Stage* stage) { |
| - base::AutoLock lock(lock_); |
| - |
| - stages_.push_back(stage); |
| - if (stage->input_count() == 0) { |
| - sources_.push_back(stage); |
| - } |
| - if (stage->output_count() == 0) { |
| - sinks_.push_back(stage); |
| - } |
| - return Part(stage); |
| -} |
| - |
| -void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) { |
| - DCHECK(stage); |
| - DCHECK(index < stage->output_count()); |
| - |
| - lock_.AssertAcquired(); |
| - |
| - StageOutput& stage_output = stage->output(index); |
| - |
| - if (stage_output.downstream_stage() == nullptr) { |
| - return; |
| - } |
| - |
| - stage_output.mate().disconnect(); |
| - stage_output.disconnect(); |
| -} |
| - |
| -void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) { |
| - DCHECK(stage); |
| - DCHECK(index < stage->input_count()); |
| - |
| - lock_.AssertAcquired(); |
| - |
| - StageInput& stage_input = stage->input(index); |
| - |
| - if (stage_input.upstream_stage() == nullptr) { |
| - return; |
| - } |
| - |
| - stage_input.mate().disconnect(); |
| - stage_input.disconnect(); |
| -} |
| - |
| -void Engine::RemoveUnsafe(Stage* stage) { |
| - DCHECK(stage); |
| - |
| - lock_.AssertAcquired(); |
| - |
| - uint32_t input_count = stage->input_count(); |
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| - if (stage->input(input_index).connected()) { |
| - DisconnectInputUnsafe(stage, input_index); |
| - } |
| - } |
| - |
| - uint32_t output_count = stage->output_count(); |
| - for (uint32_t output_index = 0; output_index < output_count; output_index++) { |
| - if (stage->output(output_index).connected()) { |
| - DisconnectOutputUnsafe(stage, output_index); |
| - } |
| - } |
| - |
| - sources_.remove(stage); |
| - sinks_.remove(stage); |
| - stages_.remove(stage); |
| - delete stage; |
| -} |
| - |
| -// static |
| -Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) { |
| - return new DistributorStage(source); |
| -} |
| - |
| -// static |
| -Stage* Engine::CreateStage(PacketTransformPtr transform) { |
| - return new PacketTransformStage(transform); |
| -} |
| - |
| -// static |
| -Stage* Engine::CreateStage(ActiveSourcePtr source) { |
| - return new ActiveSourceStage(source); |
| -} |
| - |
| -// static |
| -Stage* Engine::CreateStage(ActiveSinkPtr sink) { |
| - return new ActiveSinkStage(sink); |
| -} |
| - |
| -// static |
| -Stage* Engine::CreateStage(LpcmTransformPtr transform) { |
| - return new LpcmTransformStage(transform); |
| -} |
| - |
| -void Engine::MaybePrepareUnsafe(Stage* stage) { |
| - lock_.AssertAcquired(); |
| - |
| - if (stage == nullptr || stage->prepared_) { |
| - return; |
| - } |
| - |
| - // Make sure all downstream stages have been prepared. |
| - uint32_t output_count = stage->output_count(); |
| - for (uint32_t output_index = 0; output_index < output_count; output_index++) { |
| - StageOutput& output = stage->output(output_index); |
| - if (output.connected() && !output.downstream_stage()->prepared()) { |
| - return; |
| - } |
| - } |
| - |
| - stage->Prepare(update_function_); |
| - stage->prepared_ = true; |
| - |
| - // Prepare all upstream stages. |
| - uint32_t input_count = stage->input_count(); |
| - for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
| - MaybePrepareUnsafe(stage->input(input_index).upstream_stage()); |
| - } |
| -} |
| - |
| -void Engine::UpdateUnsafe() { |
| +void Engine::Update() { |
| lock_.AssertAcquired(); |
| while (true) { |
| - Stage* stage = PopFromSupplyBacklogUnsafe(); |
| + Stage* stage = PopFromSupplyBacklog(); |
| if (stage != nullptr) { |
| - UpdateUnsafe(stage); |
| + Update(stage); |
| continue; |
| } |
| - stage = PopFromDemandBacklogUnsafe(); |
| + stage = PopFromDemandBacklog(); |
| if (stage != nullptr) { |
| - UpdateUnsafe(stage); |
| + Update(stage); |
| continue; |
| } |
| @@ -387,7 +133,7 @@ void Engine::UpdateUnsafe() { |
| } |
| } |
| -void Engine::UpdateUnsafe(Stage *stage) { |
| +void Engine::Update(Stage *stage) { |
| lock_.AssertAcquired(); |
| DCHECK(stage); |
| @@ -398,11 +144,11 @@ void Engine::UpdateUnsafe(Stage *stage) { |
| // If the stage produced packets, it may need to reevaluate demand later. |
| if (packets_produced_) { |
| - PushToDemandBacklogUnsafe(stage); |
| + PushToDemandBacklog(stage); |
| } |
| } |
| -Stage* Engine::PopFromSupplyBacklogUnsafe() { |
| +Stage* Engine::PopFromSupplyBacklog() { |
| lock_.AssertAcquired(); |
| if (supply_backlog_.empty()) { |
| @@ -416,7 +162,7 @@ Stage* Engine::PopFromSupplyBacklogUnsafe() { |
| return stage; |
| } |
| -Stage* Engine::PopFromDemandBacklogUnsafe() { |
| +Stage* Engine::PopFromDemandBacklog() { |
| lock_.AssertAcquired(); |
| if (demand_backlog_.empty()) { |