Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(287)

Unified Diff: services/media/framework/engine.cc

Issue 1678433002: Motown: Remove LPCM optimizations, fix prepare, add flush, add ActiveMultistreamSink model/stage (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « services/media/framework/engine.h ('k') | services/media/framework/formatting.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: services/media/framework/engine.cc
diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc
index 980440cdfeb95867d748853f71e8896c6ed76b5a..6c76e0b248147762535d7e86de0b05800745978f 100644
--- a/services/media/framework/engine.cc
+++ b/services/media/framework/engine.cc
@@ -7,228 +7,60 @@
namespace mojo {
namespace media {
-uint32_t Engine::Part::input_count() {
- DCHECK(stage_ != nullptr);
- return stage_->input_count();
-}
-
-Engine::Input Engine::Part::input(uint32_t index) {
- DCHECK(stage_ != nullptr && index < stage_->input_count());
- return Input(stage_, index);
-}
-
-Engine::Input Engine::Part::input() {
- DCHECK(stage_ != nullptr && stage_->input_count() == 1);
- return Input(stage_, 0);
-}
-
-uint32_t Engine::Part::output_count() {
- DCHECK(stage_ != nullptr);
- return stage_->output_count();
-}
-
-Engine::Output Engine::Part::output(uint32_t index) {
- DCHECK(stage_ != nullptr && index < stage_->output_count());
- return Output(stage_, index);
-}
-
-Engine::Output Engine::Part::output() {
- DCHECK(stage_ != nullptr && stage_->output_count() == 1);
- return Output(stage_, 0);
-}
-
-Engine::Part Engine::Part::upstream_part(uint32_t index) {
- DCHECK(stage_ != nullptr && index < stage_->input_count());
- return Part(stage_->input(index).upstream_stage());
-}
-
-Engine::Part Engine::Part::upstream_part() {
- DCHECK(stage_ != nullptr && stage_->input_count() == 1);
- return Part(stage_->input(0).upstream_stage());
-}
-
-Engine::Part Engine::Part::downstream_part(uint32_t index) {
- DCHECK(stage_ != nullptr && index < stage_->output_count());
- return Part(stage_->output(index).downstream_stage());
-}
-
-Engine::Part Engine::Part::downstream_part() {
- DCHECK(stage_ != nullptr && stage_->output_count() == 1);
- return Part(stage_->output(0).downstream_stage());
-}
-
-Engine::Engine() {
- update_function_ = [this](Stage* stage) {
- DCHECK(stage);
- base::AutoLock lock(lock_);
- UpdateUnsafe(stage);
- UpdateUnsafe();
- };
-}
+Engine::Engine() {}
Engine::~Engine() {
- Reset();
-}
-
-void Engine::RemovePart(Part part) {
- DCHECK(part);
- base::AutoLock lock(lock_);
- RemoveUnsafe(part.stage_);
-}
-
-Engine::Part Engine::Connect(Output output, Input input) {
- DCHECK(output);
- DCHECK(input);
-
- base::AutoLock lock(lock_);
-
- if (output.connected()) {
- DisconnectOutputUnsafe(output.stage_, output.index_);
- }
- if (input.connected()) {
- DisconnectInputUnsafe(input.stage_, input.index_);
- }
-
- output.stage_output().connect(input.stage_, input.index_);
- input.stage_input().connect(output.stage_, output.index_);
-
- return input.part();
-}
-
-Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
- DCHECK(upstream_part);
- DCHECK(downstream_part);
- Connect(upstream_part.output(), downstream_part.input());
- return downstream_part;
-}
-
-Engine::Part Engine::ConnectOutputToPart(
- Output output,
- Part downstream_part) {
- DCHECK(output);
- DCHECK(downstream_part);
- Connect(output, downstream_part.input());
- return downstream_part;
-}
-
-Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
- DCHECK(upstream_part);
- DCHECK(input);
- Connect(upstream_part.output(), input);
- return input.part();
-}
-
-void Engine::DisconnectOutput(Output output) {
- DCHECK(output);
-
base::AutoLock lock(lock_);
- DisconnectOutputUnsafe(output.stage_, output.index_);
}
-void Engine::DisconnectInput(Input input) {
- DCHECK(input);
-
- base::AutoLock lock(lock_);
- DisconnectInputUnsafe(input.stage_, input.index_);
-}
-
-void Engine::RemovePartsConnectedToPart(Part part) {
- DCHECK(part);
-
- base::AutoLock lock(lock_);
-
- std::deque<Part> to_remove { part };
-
- while (!to_remove.empty()) {
- Part part = to_remove.front();
- to_remove.pop_front();
-
- for (uint32_t i = 0; i < part.input_count(); ++i) {
- to_remove.push_back(part.upstream_part(i));
- }
-
- for (uint32_t i = 0; i < part.output_count(); ++i) {
- to_remove.push_back(part.downstream_part(i));
- }
-
- RemoveUnsafe(part.stage_);
- }
-}
-
-void Engine::RemovePartsConnectedToOutput(Output output) {
- DCHECK(output);
-
- if (!output.connected()) {
- return;
- }
-
- Part downstream_part = output.downstream_part();
- DisconnectOutput(output);
- RemovePartsConnectedToPart(downstream_part);
-}
-
-void Engine::RemovePartsConnectedToInput(Input input) {
- DCHECK(input);
-
- if (!input.connected()) {
- return;
- }
-
- Part upstream_part = input.upstream_part();
- DisconnectInput(input);
- RemovePartsConnectedToPart(upstream_part);
-}
-
-void Engine::Prepare() {
- base::AutoLock lock(lock_);
- for (Stage* sink : sinks_) {
- sink->Prepare(update_function_);
- sink->prepared_ = true;
- uint32_t input_count = sink->input_count();
- for (uint32_t input_index = 0; input_index < input_count; input_index++) {
- MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
- }
- }
-}
-
-void Engine::Prepare(Part part) {
- DCHECK(part);
+void Engine::PrepareInput(const InputRef& input) {
+ VisitUpstream(
+ input,
+ [] (const InputRef& input,
+ const OutputRef& output,
+ const Stage::UpstreamCallback& callback) {
+ DCHECK(!input.actual().prepared());
+ PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_);
+ input.actual().set_prepared(true);
+ output.stage_->PrepareOutput(output.index_, allocator, callback);
+ });
+}
+
+void Engine::UnprepareInput(const InputRef& input) {
+ VisitUpstream(
+ input,
+ [] (const InputRef& input,
+ const OutputRef& output,
+ const Stage::UpstreamCallback& callback) {
+ DCHECK(input.actual().prepared());
+ input.stage_->UnprepareInput(input.index_);
+ output.stage_->UnprepareOutput(output.index_, callback);
+ });
+}
+
+void Engine::FlushOutput(const OutputRef& output) {
+ VisitDownstream(
+ output,
+ [] (const OutputRef& output,
+ const InputRef& input,
+ const Stage::DownstreamCallback& callback) {
+ DCHECK(input.actual().prepared());
+ output.stage_->FlushOutput(output.index_);
+ input.stage_->FlushInput(input.index_, callback);
+ });
+}
+
+void Engine::RequestUpdate(Stage* stage) {
+ DCHECK(stage);
base::AutoLock lock(lock_);
- MaybePrepareUnsafe(part.stage_);
+ Update(stage);
+ Update();
}
-void Engine::PrimeSinks() {
- lock_.Acquire();
- std::list<Stage*> sinks(sinks_);
- lock_.Release();
-
- // TODO(dalesat): Threading issue: these sinks may go away during priming.
- for (Stage* sink : sinks) {
- sink->Prime();
- }
-}
-
-void Engine::Reset() {
- base::AutoLock lock(lock_);
- while (!supply_backlog_.empty()) {
- supply_backlog_.pop();
- }
- while (!demand_backlog_.empty()) {
- demand_backlog_.pop();
- }
- sources_.clear();
- sinks_.clear();
- while (!stages_.empty()) {
- Stage* stage = stages_.front();
- stages_.pop_front();
- delete stage;
- }
-}
-
-void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
+void Engine::PushToSupplyBacklog(Stage* stage) {
lock_.AssertAcquired();
-
DCHECK(stage);
+
packets_produced_ = true;
if (!stage->in_supply_backlog_) {
supply_backlog_.push(stage);
@@ -236,150 +68,81 @@ void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
}
}
-void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
+void Engine::PushToDemandBacklog(Stage* stage) {
lock_.AssertAcquired();
-
DCHECK(stage);
+
if (!stage->in_demand_backlog_) {
demand_backlog_.push(stage);
stage->in_demand_backlog_ = true;
}
}
-Engine::Part Engine::Add(Stage* stage) {
+void Engine::VisitUpstream(
+ const InputRef& input,
+ const UpstreamVisitor& vistor) {
base::AutoLock lock(lock_);
- stages_.push_back(stage);
- if (stage->input_count() == 0) {
- sources_.push_back(stage);
- }
- if (stage->output_count() == 0) {
- sinks_.push_back(stage);
- }
- return Part(stage);
-}
-
-void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
- DCHECK(stage);
- DCHECK(index < stage->output_count());
+ std::queue<InputRef> backlog;
+ backlog.push(input);
- lock_.AssertAcquired();
+ while (!backlog.empty()) {
+ InputRef input = backlog.front();
+ backlog.pop();
+ DCHECK(input.valid());
+ DCHECK(input.connected());
- StageOutput& stage_output = stage->output(index);
+ const OutputRef& output = input.mate();
+ Stage* output_stage = output.stage_;
- if (stage_output.downstream_stage() == nullptr) {
- return;
+ vistor(
+ input,
+ output,
+ [output_stage, &backlog](size_t input_index) {
+ backlog.push(InputRef(output_stage, input_index));
+ });
}
-
- stage_output.mate().disconnect();
- stage_output.disconnect();
}
-void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
- DCHECK(stage);
- DCHECK(index < stage->input_count());
-
- lock_.AssertAcquired();
-
- StageInput& stage_input = stage->input(index);
-
- if (stage_input.upstream_stage() == nullptr) {
- return;
- }
-
- stage_input.mate().disconnect();
- stage_input.disconnect();
-}
-
-void Engine::RemoveUnsafe(Stage* stage) {
- DCHECK(stage);
-
- lock_.AssertAcquired();
-
- uint32_t input_count = stage->input_count();
- for (uint32_t input_index = 0; input_index < input_count; input_index++) {
- if (stage->input(input_index).connected()) {
- DisconnectInputUnsafe(stage, input_index);
- }
- }
-
- uint32_t output_count = stage->output_count();
- for (uint32_t output_index = 0; output_index < output_count; output_index++) {
- if (stage->output(output_index).connected()) {
- DisconnectOutputUnsafe(stage, output_index);
- }
- }
-
- sources_.remove(stage);
- sinks_.remove(stage);
- stages_.remove(stage);
- delete stage;
-}
-
-// static
-Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
- return new DistributorStage(source);
-}
-
-// static
-Stage* Engine::CreateStage(PacketTransformPtr transform) {
- return new PacketTransformStage(transform);
-}
-
-// static
-Stage* Engine::CreateStage(ActiveSourcePtr source) {
- return new ActiveSourceStage(source);
-}
-
-// static
-Stage* Engine::CreateStage(ActiveSinkPtr sink) {
- return new ActiveSinkStage(sink);
-}
-
-// static
-Stage* Engine::CreateStage(LpcmTransformPtr transform) {
- return new LpcmTransformStage(transform);
-}
-
-void Engine::MaybePrepareUnsafe(Stage* stage) {
- lock_.AssertAcquired();
+void Engine::VisitDownstream(
+ const OutputRef& output,
+ const DownstreamVisitor& vistor) {
+ base::AutoLock lock(lock_);
- if (stage == nullptr || stage->prepared_) {
- return;
- }
+ std::queue<OutputRef> backlog;
+ backlog.push(output);
- // Make sure all downstream stages have been prepared.
- uint32_t output_count = stage->output_count();
- for (uint32_t output_index = 0; output_index < output_count; output_index++) {
- StageOutput& output = stage->output(output_index);
- if (output.connected() && !output.downstream_stage()->prepared()) {
- return;
- }
- }
+ while (!backlog.empty()) {
+ OutputRef output = backlog.front();
+ backlog.pop();
+ DCHECK(output.valid());
+ DCHECK(output.connected());
- stage->Prepare(update_function_);
- stage->prepared_ = true;
+ const InputRef& input = output.mate();
+ Stage* input_stage = input.stage_;
- // Prepare all upstream stages.
- uint32_t input_count = stage->input_count();
- for (uint32_t input_index = 0; input_index < input_count; input_index++) {
- MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
+ vistor(
+ output,
+ input,
+ [input_stage, &backlog](size_t output_index) {
+ backlog.push(OutputRef(input_stage, output_index));
+ });
}
}
-void Engine::UpdateUnsafe() {
+void Engine::Update() {
lock_.AssertAcquired();
while (true) {
- Stage* stage = PopFromSupplyBacklogUnsafe();
+ Stage* stage = PopFromSupplyBacklog();
if (stage != nullptr) {
- UpdateUnsafe(stage);
+ Update(stage);
continue;
}
- stage = PopFromDemandBacklogUnsafe();
+ stage = PopFromDemandBacklog();
if (stage != nullptr) {
- UpdateUnsafe(stage);
+ Update(stage);
continue;
}
@@ -387,7 +150,7 @@ void Engine::UpdateUnsafe() {
}
}
-void Engine::UpdateUnsafe(Stage *stage) {
+void Engine::Update(Stage *stage) {
lock_.AssertAcquired();
DCHECK(stage);
@@ -398,11 +161,11 @@ void Engine::UpdateUnsafe(Stage *stage) {
// If the stage produced packets, it may need to reevaluate demand later.
if (packets_produced_) {
- PushToDemandBacklogUnsafe(stage);
+ PushToDemandBacklog(stage);
}
}
-Stage* Engine::PopFromSupplyBacklogUnsafe() {
+Stage* Engine::PopFromSupplyBacklog() {
lock_.AssertAcquired();
if (supply_backlog_.empty()) {
@@ -416,7 +179,7 @@ Stage* Engine::PopFromSupplyBacklogUnsafe() {
return stage;
}
-Stage* Engine::PopFromDemandBacklogUnsafe() {
+Stage* Engine::PopFromDemandBacklog() {
lock_.AssertAcquired();
if (demand_backlog_.empty()) {
« no previous file with comments | « services/media/framework/engine.h ('k') | services/media/framework/formatting.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698