Index: services/media/framework/engine.cc |
diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc |
index 980440cdfeb95867d748853f71e8896c6ed76b5a..6c76e0b248147762535d7e86de0b05800745978f 100644 |
--- a/services/media/framework/engine.cc |
+++ b/services/media/framework/engine.cc |
@@ -7,228 +7,60 @@ |
namespace mojo { |
namespace media { |
-uint32_t Engine::Part::input_count() { |
- DCHECK(stage_ != nullptr); |
- return stage_->input_count(); |
-} |
- |
-Engine::Input Engine::Part::input(uint32_t index) { |
- DCHECK(stage_ != nullptr && index < stage_->input_count()); |
- return Input(stage_, index); |
-} |
- |
-Engine::Input Engine::Part::input() { |
- DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
- return Input(stage_, 0); |
-} |
- |
-uint32_t Engine::Part::output_count() { |
- DCHECK(stage_ != nullptr); |
- return stage_->output_count(); |
-} |
- |
-Engine::Output Engine::Part::output(uint32_t index) { |
- DCHECK(stage_ != nullptr && index < stage_->output_count()); |
- return Output(stage_, index); |
-} |
- |
-Engine::Output Engine::Part::output() { |
- DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
- return Output(stage_, 0); |
-} |
- |
-Engine::Part Engine::Part::upstream_part(uint32_t index) { |
- DCHECK(stage_ != nullptr && index < stage_->input_count()); |
- return Part(stage_->input(index).upstream_stage()); |
-} |
- |
-Engine::Part Engine::Part::upstream_part() { |
- DCHECK(stage_ != nullptr && stage_->input_count() == 1); |
- return Part(stage_->input(0).upstream_stage()); |
-} |
- |
-Engine::Part Engine::Part::downstream_part(uint32_t index) { |
- DCHECK(stage_ != nullptr && index < stage_->output_count()); |
- return Part(stage_->output(index).downstream_stage()); |
-} |
- |
-Engine::Part Engine::Part::downstream_part() { |
- DCHECK(stage_ != nullptr && stage_->output_count() == 1); |
- return Part(stage_->output(0).downstream_stage()); |
-} |
- |
-Engine::Engine() { |
- update_function_ = [this](Stage* stage) { |
- DCHECK(stage); |
- base::AutoLock lock(lock_); |
- UpdateUnsafe(stage); |
- UpdateUnsafe(); |
- }; |
-} |
+Engine::Engine() {} |
Engine::~Engine() { |
- Reset(); |
-} |
- |
-void Engine::RemovePart(Part part) { |
- DCHECK(part); |
- base::AutoLock lock(lock_); |
- RemoveUnsafe(part.stage_); |
-} |
- |
-Engine::Part Engine::Connect(Output output, Input input) { |
- DCHECK(output); |
- DCHECK(input); |
- |
- base::AutoLock lock(lock_); |
- |
- if (output.connected()) { |
- DisconnectOutputUnsafe(output.stage_, output.index_); |
- } |
- if (input.connected()) { |
- DisconnectInputUnsafe(input.stage_, input.index_); |
- } |
- |
- output.stage_output().connect(input.stage_, input.index_); |
- input.stage_input().connect(output.stage_, output.index_); |
- |
- return input.part(); |
-} |
- |
-Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) { |
- DCHECK(upstream_part); |
- DCHECK(downstream_part); |
- Connect(upstream_part.output(), downstream_part.input()); |
- return downstream_part; |
-} |
- |
-Engine::Part Engine::ConnectOutputToPart( |
- Output output, |
- Part downstream_part) { |
- DCHECK(output); |
- DCHECK(downstream_part); |
- Connect(output, downstream_part.input()); |
- return downstream_part; |
-} |
- |
-Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) { |
- DCHECK(upstream_part); |
- DCHECK(input); |
- Connect(upstream_part.output(), input); |
- return input.part(); |
-} |
- |
-void Engine::DisconnectOutput(Output output) { |
- DCHECK(output); |
- |
base::AutoLock lock(lock_); |
- DisconnectOutputUnsafe(output.stage_, output.index_); |
} |
-void Engine::DisconnectInput(Input input) { |
- DCHECK(input); |
- |
- base::AutoLock lock(lock_); |
- DisconnectInputUnsafe(input.stage_, input.index_); |
-} |
- |
-void Engine::RemovePartsConnectedToPart(Part part) { |
- DCHECK(part); |
- |
- base::AutoLock lock(lock_); |
- |
- std::deque<Part> to_remove { part }; |
- |
- while (!to_remove.empty()) { |
- Part part = to_remove.front(); |
- to_remove.pop_front(); |
- |
- for (uint32_t i = 0; i < part.input_count(); ++i) { |
- to_remove.push_back(part.upstream_part(i)); |
- } |
- |
- for (uint32_t i = 0; i < part.output_count(); ++i) { |
- to_remove.push_back(part.downstream_part(i)); |
- } |
- |
- RemoveUnsafe(part.stage_); |
- } |
-} |
- |
-void Engine::RemovePartsConnectedToOutput(Output output) { |
- DCHECK(output); |
- |
- if (!output.connected()) { |
- return; |
- } |
- |
- Part downstream_part = output.downstream_part(); |
- DisconnectOutput(output); |
- RemovePartsConnectedToPart(downstream_part); |
-} |
- |
-void Engine::RemovePartsConnectedToInput(Input input) { |
- DCHECK(input); |
- |
- if (!input.connected()) { |
- return; |
- } |
- |
- Part upstream_part = input.upstream_part(); |
- DisconnectInput(input); |
- RemovePartsConnectedToPart(upstream_part); |
-} |
- |
-void Engine::Prepare() { |
- base::AutoLock lock(lock_); |
- for (Stage* sink : sinks_) { |
- sink->Prepare(update_function_); |
- sink->prepared_ = true; |
- uint32_t input_count = sink->input_count(); |
- for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
- MaybePrepareUnsafe(sink->input(input_index).upstream_stage()); |
- } |
- } |
-} |
- |
-void Engine::Prepare(Part part) { |
- DCHECK(part); |
+void Engine::PrepareInput(const InputRef& input) { |
+ VisitUpstream( |
+ input, |
+ [] (const InputRef& input, |
+ const OutputRef& output, |
+ const Stage::UpstreamCallback& callback) { |
+ DCHECK(!input.actual().prepared()); |
+ PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_); |
+ input.actual().set_prepared(true); |
+ output.stage_->PrepareOutput(output.index_, allocator, callback); |
+ }); |
+} |
+ |
+void Engine::UnprepareInput(const InputRef& input) { |
+ VisitUpstream( |
+ input, |
+ [] (const InputRef& input, |
+ const OutputRef& output, |
+ const Stage::UpstreamCallback& callback) { |
+ DCHECK(input.actual().prepared()); |
+ input.stage_->UnprepareInput(input.index_); |
+ output.stage_->UnprepareOutput(output.index_, callback); |
+ }); |
+} |
+ |
+void Engine::FlushOutput(const OutputRef& output) { |
+ VisitDownstream( |
+ output, |
+ [] (const OutputRef& output, |
+ const InputRef& input, |
+ const Stage::DownstreamCallback& callback) { |
+ DCHECK(input.actual().prepared()); |
+ output.stage_->FlushOutput(output.index_); |
+ input.stage_->FlushInput(input.index_, callback); |
+ }); |
+} |
+ |
+void Engine::RequestUpdate(Stage* stage) { |
+ DCHECK(stage); |
base::AutoLock lock(lock_); |
- MaybePrepareUnsafe(part.stage_); |
+ Update(stage); |
+ Update(); |
} |
-void Engine::PrimeSinks() { |
- lock_.Acquire(); |
- std::list<Stage*> sinks(sinks_); |
- lock_.Release(); |
- |
- // TODO(dalesat): Threading issue: these sinks may go away during priming. |
- for (Stage* sink : sinks) { |
- sink->Prime(); |
- } |
-} |
- |
-void Engine::Reset() { |
- base::AutoLock lock(lock_); |
- while (!supply_backlog_.empty()) { |
- supply_backlog_.pop(); |
- } |
- while (!demand_backlog_.empty()) { |
- demand_backlog_.pop(); |
- } |
- sources_.clear(); |
- sinks_.clear(); |
- while (!stages_.empty()) { |
- Stage* stage = stages_.front(); |
- stages_.pop_front(); |
- delete stage; |
- } |
-} |
- |
-void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { |
+void Engine::PushToSupplyBacklog(Stage* stage) { |
lock_.AssertAcquired(); |
- |
DCHECK(stage); |
+ |
packets_produced_ = true; |
if (!stage->in_supply_backlog_) { |
supply_backlog_.push(stage); |
@@ -236,150 +68,81 @@ void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { |
} |
} |
-void Engine::PushToDemandBacklogUnsafe(Stage* stage) { |
+void Engine::PushToDemandBacklog(Stage* stage) { |
lock_.AssertAcquired(); |
- |
DCHECK(stage); |
+ |
if (!stage->in_demand_backlog_) { |
demand_backlog_.push(stage); |
stage->in_demand_backlog_ = true; |
} |
} |
-Engine::Part Engine::Add(Stage* stage) { |
+void Engine::VisitUpstream( |
+ const InputRef& input, |
+ const UpstreamVisitor& vistor) { |
base::AutoLock lock(lock_); |
- stages_.push_back(stage); |
- if (stage->input_count() == 0) { |
- sources_.push_back(stage); |
- } |
- if (stage->output_count() == 0) { |
- sinks_.push_back(stage); |
- } |
- return Part(stage); |
-} |
- |
-void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) { |
- DCHECK(stage); |
- DCHECK(index < stage->output_count()); |
+ std::queue<InputRef> backlog; |
+ backlog.push(input); |
- lock_.AssertAcquired(); |
+ while (!backlog.empty()) { |
+ InputRef input = backlog.front(); |
+ backlog.pop(); |
+ DCHECK(input.valid()); |
+ DCHECK(input.connected()); |
- StageOutput& stage_output = stage->output(index); |
+ const OutputRef& output = input.mate(); |
+ Stage* output_stage = output.stage_; |
- if (stage_output.downstream_stage() == nullptr) { |
- return; |
+ vistor( |
+ input, |
+ output, |
+ [output_stage, &backlog](size_t input_index) { |
+ backlog.push(InputRef(output_stage, input_index)); |
+ }); |
} |
- |
- stage_output.mate().disconnect(); |
- stage_output.disconnect(); |
} |
-void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) { |
- DCHECK(stage); |
- DCHECK(index < stage->input_count()); |
- |
- lock_.AssertAcquired(); |
- |
- StageInput& stage_input = stage->input(index); |
- |
- if (stage_input.upstream_stage() == nullptr) { |
- return; |
- } |
- |
- stage_input.mate().disconnect(); |
- stage_input.disconnect(); |
-} |
- |
-void Engine::RemoveUnsafe(Stage* stage) { |
- DCHECK(stage); |
- |
- lock_.AssertAcquired(); |
- |
- uint32_t input_count = stage->input_count(); |
- for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
- if (stage->input(input_index).connected()) { |
- DisconnectInputUnsafe(stage, input_index); |
- } |
- } |
- |
- uint32_t output_count = stage->output_count(); |
- for (uint32_t output_index = 0; output_index < output_count; output_index++) { |
- if (stage->output(output_index).connected()) { |
- DisconnectOutputUnsafe(stage, output_index); |
- } |
- } |
- |
- sources_.remove(stage); |
- sinks_.remove(stage); |
- stages_.remove(stage); |
- delete stage; |
-} |
- |
-// static |
-Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) { |
- return new DistributorStage(source); |
-} |
- |
-// static |
-Stage* Engine::CreateStage(PacketTransformPtr transform) { |
- return new PacketTransformStage(transform); |
-} |
- |
-// static |
-Stage* Engine::CreateStage(ActiveSourcePtr source) { |
- return new ActiveSourceStage(source); |
-} |
- |
-// static |
-Stage* Engine::CreateStage(ActiveSinkPtr sink) { |
- return new ActiveSinkStage(sink); |
-} |
- |
-// static |
-Stage* Engine::CreateStage(LpcmTransformPtr transform) { |
- return new LpcmTransformStage(transform); |
-} |
- |
-void Engine::MaybePrepareUnsafe(Stage* stage) { |
- lock_.AssertAcquired(); |
+void Engine::VisitDownstream( |
+ const OutputRef& output, |
+ const DownstreamVisitor& vistor) { |
+ base::AutoLock lock(lock_); |
- if (stage == nullptr || stage->prepared_) { |
- return; |
- } |
+ std::queue<OutputRef> backlog; |
+ backlog.push(output); |
- // Make sure all downstream stages have been prepared. |
- uint32_t output_count = stage->output_count(); |
- for (uint32_t output_index = 0; output_index < output_count; output_index++) { |
- StageOutput& output = stage->output(output_index); |
- if (output.connected() && !output.downstream_stage()->prepared()) { |
- return; |
- } |
- } |
+ while (!backlog.empty()) { |
+ OutputRef output = backlog.front(); |
+ backlog.pop(); |
+ DCHECK(output.valid()); |
+ DCHECK(output.connected()); |
- stage->Prepare(update_function_); |
- stage->prepared_ = true; |
+ const InputRef& input = output.mate(); |
+ Stage* input_stage = input.stage_; |
- // Prepare all upstream stages. |
- uint32_t input_count = stage->input_count(); |
- for (uint32_t input_index = 0; input_index < input_count; input_index++) { |
- MaybePrepareUnsafe(stage->input(input_index).upstream_stage()); |
+ vistor( |
+ output, |
+ input, |
+ [input_stage, &backlog](size_t output_index) { |
+ backlog.push(OutputRef(input_stage, output_index)); |
+ }); |
} |
} |
-void Engine::UpdateUnsafe() { |
+void Engine::Update() { |
lock_.AssertAcquired(); |
while (true) { |
- Stage* stage = PopFromSupplyBacklogUnsafe(); |
+ Stage* stage = PopFromSupplyBacklog(); |
if (stage != nullptr) { |
- UpdateUnsafe(stage); |
+ Update(stage); |
continue; |
} |
- stage = PopFromDemandBacklogUnsafe(); |
+ stage = PopFromDemandBacklog(); |
if (stage != nullptr) { |
- UpdateUnsafe(stage); |
+ Update(stage); |
continue; |
} |
@@ -387,7 +150,7 @@ void Engine::UpdateUnsafe() { |
} |
} |
-void Engine::UpdateUnsafe(Stage *stage) { |
+void Engine::Update(Stage *stage) { |
lock_.AssertAcquired(); |
DCHECK(stage); |
@@ -398,11 +161,11 @@ void Engine::UpdateUnsafe(Stage *stage) { |
// If the stage produced packets, it may need to reevaluate demand later. |
if (packets_produced_) { |
- PushToDemandBacklogUnsafe(stage); |
+ PushToDemandBacklog(stage); |
} |
} |
-Stage* Engine::PopFromSupplyBacklogUnsafe() { |
+Stage* Engine::PopFromSupplyBacklog() { |
lock_.AssertAcquired(); |
if (supply_backlog_.empty()) { |
@@ -416,7 +179,7 @@ Stage* Engine::PopFromSupplyBacklogUnsafe() { |
return stage; |
} |
-Stage* Engine::PopFromDemandBacklogUnsafe() { |
+Stage* Engine::PopFromDemandBacklog() { |
lock_.AssertAcquired(); |
if (demand_backlog_.empty()) { |