Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4017)

Unified Diff: services/media/framework/engine.cc

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync, updates based on feedback, some functions declared const. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: services/media/framework/engine.cc
diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc
new file mode 100644
index 0000000000000000000000000000000000000000..f3661cb86d4ada12a76843c06aad7d1ef381aeea
--- /dev/null
+++ b/services/media/framework/engine.cc
@@ -0,0 +1,434 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+
+namespace mojo {
+namespace media {
+
+uint32_t Engine::Part::input_count() {
+ DCHECK(stage_ != nullptr);
+ return stage_->input_count();
+}
+
+Engine::Input Engine::Part::input(uint32_t index) {
+ DCHECK(stage_ != nullptr && index < stage_->input_count());
+ return Input(stage_, index);
+}
+
+Engine::Input Engine::Part::input() {
+ DCHECK(stage_ != nullptr && stage_->input_count() == 1);
+ return Input(stage_, 0);
+}
+
+uint32_t Engine::Part::output_count() {
+ DCHECK(stage_ != nullptr);
+ return stage_->output_count();
+}
+
+Engine::Output Engine::Part::output(uint32_t index) {
+ DCHECK(stage_ != nullptr && index < stage_->output_count());
+ return Output(stage_, index);
+}
+
+Engine::Output Engine::Part::output() {
+ DCHECK(stage_ != nullptr && stage_->output_count() == 1);
+ return Output(stage_, 0);
+}
+
+Engine::Part Engine::Part::upstream_part(uint32_t index) {
+ DCHECK(stage_ != nullptr && index < stage_->input_count());
+ return Part(stage_->input(index).upstream_stage());
+}
+
+Engine::Part Engine::Part::upstream_part() {
+ DCHECK(stage_ != nullptr && stage_->input_count() == 1);
+ return Part(stage_->input(0).upstream_stage());
+}
+
+Engine::Part Engine::Part::downstream_part(uint32_t index) {
+ DCHECK(stage_ != nullptr && index < stage_->output_count());
+ return Part(stage_->output(index).downstream_stage());
+}
+
+Engine::Part Engine::Part::downstream_part() {
+ DCHECK(stage_ != nullptr && stage_->output_count() == 1);
+ return Part(stage_->output(0).downstream_stage());
+}
+
+Engine::Engine() {
+ update_function_ = [this](Stage* stage) {
+ DCHECK(stage);
+ base::AutoLock lock(lock_);
+ UpdateUnsafe(stage);
+ UpdateUnsafe();
+ };
+}
+
+Engine::~Engine() {
+ Reset();
+}
+
+void Engine::Remove(Part part) {
+ DCHECK(part);
+ base::AutoLock lock(lock_);
+ RemoveUnsafe(part.stage_);
+}
+
+Engine::Part Engine::Connect(Output output, Input input) {
+ DCHECK(output);
+ DCHECK(input);
+
+ base::AutoLock lock(lock_);
+
+ if (output.connected()) {
+ DisconnectOutputUnsafe(output.stage_, output.index_);
+ }
+ if (input.connected()) {
+ DisconnectInputUnsafe(input.stage_, input.index_);
+ }
+
+ output.stage_output().connect(input.stage_, input.index_);
+ input.stage_input().connect(output.stage_, output.index_);
+
+ return input.part();
+}
+
+Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) {
+ DCHECK(upstream_part);
+ DCHECK(downstream_part);
+ Connect(upstream_part.output(), downstream_part.input());
+ return downstream_part;
+}
+
+Engine::Part Engine::Connect(
+ Output output,
+ Part downstream_part) {
+ DCHECK(output);
+ DCHECK(downstream_part);
+ Connect(output, downstream_part.input());
+ return downstream_part;
+}
+
+Engine::Part Engine::Connect(Part upstream_part, Input input) {
+ DCHECK(upstream_part);
+ DCHECK(input);
+ Connect(upstream_part.output(), input);
+ return input.part();
+}
+
+void Engine::Disconnect(Output output) {
+ DCHECK(output);
+
+ base::AutoLock lock(lock_);
+ DisconnectOutputUnsafe(output.stage_, output.index_);
+}
+
+void Engine::Disconnect(Input input) {
+ DCHECK(input);
+
+ base::AutoLock lock(lock_);
+ DisconnectInputUnsafe(input.stage_, input.index_);
+}
+
+void Engine::RemoveAll(Part part) {
+ DCHECK(part);
+
+ base::AutoLock lock(lock_);
+
+ std::deque<Part> to_remove { part };
+
+ while (!to_remove.empty()) {
+ Part part = to_remove.front();
+ to_remove.pop_front();
+
+ for (uint32_t i = 0; i < part.input_count(); ++i) {
+ to_remove.push_back(part.upstream_part(i));
+ }
+
+ for (uint32_t i = 0; i < part.output_count(); ++i) {
+ to_remove.push_back(part.downstream_part(i));
+ }
+
+ RemoveUnsafe(part.stage_);
+ }
+}
+
+void Engine::RemoveAll(Output output) {
+ DCHECK(output);
+
+ if (!output.connected()) {
+ return;
+ }
+
+ Part downstream_part = output.downstream_part();
+ Disconnect(output);
+ RemoveAll(downstream_part);
+}
+
+void Engine::RemoveAll(Input input) {
+ DCHECK(input);
+
+ if (!input.connected()) {
+ return;
+ }
+
+ Part upstream_part = input.upstream_part();
+ Disconnect(input);
+ RemoveAll(upstream_part);
+}
+
+void Engine::Prepare() {
+ base::AutoLock lock(lock_);
+ for (Stage* sink : sinks_) {
+ sink->Prepare(update_function_);
+ sink->prepared_ = true;
+ uint32_t input_count = sink->input_count();
+ for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+ MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
+ }
+ }
+}
+
+void Engine::Prepare(Part part) {
+ DCHECK(part);
+ base::AutoLock lock(lock_);
+ MaybePrepareUnsafe(part.stage_);
+}
+
+void Engine::PrimeSinks() {
+ lock_.Acquire();
+ std::list<Stage*> sinks(sinks_);
+ lock_.Release();
+
+ // TODO(dalesat): Threading issue: these sinks may go away during priming.
+ for (Stage* sink : sinks) {
+ sink->Prime();
+ }
+}
+
+void Engine::Reset() {
+ base::AutoLock lock(lock_);
+ while (!supply_backlog_.empty()) {
+ supply_backlog_.pop();
+ }
+ while (!demand_backlog_.empty()) {
+ demand_backlog_.pop();
+ }
+ sources_.clear();
+ sinks_.clear();
+ while (!stages_.empty()) {
+ Stage* stage = stages_.front();
+ stages_.pop_front();
+ delete stage;
+ }
+}
+
+void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
+ lock_.AssertAcquired();
+
+ DCHECK(stage);
+ packets_produced_ = true;
+ if (!stage->in_supply_backlog_) {
+ supply_backlog_.push(stage);
+ stage->in_supply_backlog_ = true;
+ }
+}
+
+void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
+ lock_.AssertAcquired();
+
+ DCHECK(stage);
+ if (!stage->in_demand_backlog_) {
+ demand_backlog_.push(stage);
+ stage->in_demand_backlog_ = true;
+ }
+}
+
+Engine::Part Engine::Add(Stage* stage) {
+ base::AutoLock lock(lock_);
+
+ stages_.push_back(stage);
+ if (stage->input_count() == 0) {
+ sources_.push_back(stage);
+ }
+ if (stage->output_count() == 0) {
+ sinks_.push_back(stage);
+ }
+ return Part(stage);
+}
+
+void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
+ DCHECK(stage);
+ DCHECK(index < stage->output_count());
+
+ lock_.AssertAcquired();
+
+ StageOutput& stage_output = stage->output(index);
+
+ if (stage_output.downstream_stage() == nullptr) {
+ return;
+ }
+
+ stage_output.mate().disconnect();
+ stage_output.disconnect();
+}
+
+void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
+ DCHECK(stage);
+ DCHECK(index < stage->input_count());
+
+ lock_.AssertAcquired();
+
+ StageInput& stage_input = stage->input(index);
+
+ if (stage_input.upstream_stage() == nullptr) {
+ return;
+ }
+
+ stage_input.mate().disconnect();
+ stage_input.disconnect();
+}
+
+void Engine::RemoveUnsafe(Stage* stage) {
+ DCHECK(stage);
+
+ lock_.AssertAcquired();
+
+ uint32_t input_count = stage->input_count();
+ for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+ if (stage->input(input_index).connected()) {
+ DisconnectInputUnsafe(stage, input_index);
+ }
+ }
+
+ uint32_t output_count = stage->output_count();
+ for (uint32_t output_index = 0; output_index < output_count; output_index++) {
+ if (stage->output(output_index).connected()) {
+ DisconnectOutputUnsafe(stage, output_index);
+ }
+ }
+
+ sources_.remove(stage);
+ sinks_.remove(stage);
+ stages_.remove(stage);
+ delete stage;
+}
+
+// static
+Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
+ return new DistributorStage(source);
+}
+
+// static
+Stage* Engine::CreateStage(PacketTransformPtr transform) {
+ return new PacketTransformStage(transform);
+}
+
+// static
+Stage* Engine::CreateStage(ActiveSourcePtr source) {
+ return new ActiveSourceStage(source);
+}
+
+// static
+Stage* Engine::CreateStage(ActiveSinkPtr sink) {
+ return new ActiveSinkStage(sink);
+}
+
+// static
+Stage* Engine::CreateStage(LpcmTransformPtr transform) {
+ return new LpcmTransformStage(transform);
+}
+
+void Engine::MaybePrepareUnsafe(Stage* stage) {
+ lock_.AssertAcquired();
+
+ if (stage == nullptr || stage->prepared_) {
+ return;
+ }
+
+ // Make sure all downstream stages have been prepared.
+ uint32_t output_count = stage->output_count();
+ for (uint32_t output_index = 0; output_index < output_count; output_index++) {
+ StageOutput& output = stage->output(output_index);
+ if (output.connected() && !output.downstream_stage()->prepared()) {
+ return;
+ }
+ }
+
+ stage->Prepare(update_function_);
+ stage->prepared_ = true;
+
+ // Prepare all upstream stages.
+ uint32_t input_count = stage->input_count();
+ for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+ MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
+ }
+}
+
+void Engine::UpdateUnsafe() {
+ lock_.AssertAcquired();
+
+ while (true) {
+ Stage* stage = PopFromSupplyBacklogUnsafe();
+ if (stage != nullptr) {
+ UpdateUnsafe(stage);
+ continue;
+ }
+
+ stage = PopFromDemandBacklogUnsafe();
+ if (stage != nullptr) {
+ UpdateUnsafe(stage);
+ continue;
+ }
+
+ break;
+ }
+}
+
+void Engine::UpdateUnsafe(Stage *stage) {
+ lock_.AssertAcquired();
+
+ DCHECK(stage);
+
+ packets_produced_ = false;
+
+ stage->Update(this);
+
+ // If the stage produced packets, it may need to reevaluate demand later.
+ if (packets_produced_) {
+ PushToDemandBacklogUnsafe(stage);
+ }
+}
+
+Stage* Engine::PopFromSupplyBacklogUnsafe() {
+ lock_.AssertAcquired();
+
+ if (supply_backlog_.empty()) {
+ return nullptr;
+ }
+
+ Stage* stage = supply_backlog_.front();
+ supply_backlog_.pop();
+ DCHECK(stage->in_supply_backlog_);
+ stage->in_supply_backlog_ = false;
+ return stage;
+}
+
+Stage* Engine::PopFromDemandBacklogUnsafe() {
+ lock_.AssertAcquired();
+
+ if (demand_backlog_.empty()) {
+ return nullptr;
+ }
+
+ Stage* stage = demand_backlog_.top();
+ demand_backlog_.pop();
+ DCHECK(stage->in_demand_backlog_);
+ stage->in_demand_backlog_ = false;
+ return stage;
+}
+
+} // namespace media
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698