Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(853)

Unified Diff: services/media/framework/engine.h

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync, updates based on feedback, some functions declared const. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: services/media/framework/engine.h
diff --git a/services/media/framework/engine.h b/services/media/framework/engine.h
new file mode 100644
index 0000000000000000000000000000000000000000..90b051935622a1512ebfb761dbc3eef1423cf938
--- /dev/null
+++ b/services/media/framework/engine.h
@@ -0,0 +1,428 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
+
+#include <list>
+#include <queue>
+#include <stack>
+
+#include "base/synchronization/lock.h"
+#include "services/media/framework/stages/active_sink_stage.h"
+#include "services/media/framework/stages/active_source_stage.h"
+#include "services/media/framework/stages/distributor_stage.h"
+#include "services/media/framework/stages/lpcm_transform_stage.h"
+#include "services/media/framework/stages/packet_transform_stage.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+//
+// USAGE
+//
+// Engine is a container for sources, sinks and transforms ('parts') connected
+// in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
+// references to parts and their inputs and outputs. Engine provides a variety
+// of methods for adding and removing parts and for connecting inputs and
+// outputs to form a graph.
+//
+// In addition to containing parts and representing their interconnection,
+// Engine manages the coordinated operation of its constituent parts and
+// transports media from part to part. The Prepare method prepares the graph
+// for operation, and the PrimeSinks method tells the sinks in the graph to
+// prime themselves. Any additional actions required to make the graph operate
+// (such as manipulating a rate control interface) is out of scope.
+//
+// Parts added to the engine are referenced using shared pointers. The engine
+// holds pointers to the parts it contains, and the application, in many cases,
+// also holds pointers to the parts so it can call methods that are outside the
+// engine's scope. When a part is added the Engine returns an Engine::Part
+// object, which can be used to reference the part when the graph is modified.
+// Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
+// objects) and outputs (as Engine::Output objects).
+//
+// Some support is provided for modifying graphs that are operating. This
+// capability isn't fully developed at the moment. Prepare(Part) is an example
+// of a method provided for this purpose.
+//
+// Parts come in various flavors, defined by 'model' abstract classes. The
+// current list of supported models is:
+//
+// ActiveSink - a sink that consumes packets asynchronously
+// ActiveSource - a source that produces packets asynchronously
+// LpcmMixer - a transform that mixes LPCM frames from multiple
+// inputs and produces a single stream of LPCM frames
+// via one output
+// LpcmSource - a source that produces LPCM frames synchronously
+// LpcmTransform - a synchronous transform with one LPCM input and
+// one LPCM output
+// MultiStreamPacketSource - a source that produces multiple streams of packets
+// synchronously
+// PacketTransform - a synchronous transform that consumes and produces
+// packets via one input and one output
+//
+// Other models will be defined in the future as needed.
+//
+
+//
+// DESIGN
+//
+// The Engine is implemented as a system of cooperating objects. Of those
+// objects, only the engine itself is of relevance to code that uses Engine and
+// to part implementations. The other objects are:
+//
+// Stage
+// A stage hosts a single part. There are many subclasses of Stage, one for
+// each supported part model. The stage's job is to implement the contract
+// represented by the model so the parts that conform to the model can
+// participate in the operation of the engine. Stages are uniform with respect
+// to how they interact with engine. Engine::Part references a stage.
+//
+// StageInput
+// A stage possesses zero or more StageInput instances. StageInput objects
+// implement the supply of media into the stage and demand for media signalled
+// upstream. StageInputs recieve media from StageOutputs in the form of packets
+// (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
+// with LpcmStageInputs in a way that provides optimizations relavant to LPCM
+// audio media. Engine::Input references a StageInput.
+//
+// StageOutput
+// A stage possesses zero or more StageOutput instances. StageOutput objects
+// implement the supply of media output of the stage to a downstream input and
+// demand for media signalled from that input. LpcmStageOutput implements
+// optimized LPCM flow. Engine::Output references a StageOutput.
+//
+// Engine uses a 'work list' algorithm to operate the contained graph. The
+// engine has a backlog of stages that need to be updated. To advance the
+// operation of the graph, the engine removes a stage from the backlog and calls
+// the stage's Update method. The Stage::Update may cause stages to be added
+// synchronously to the the backlog. This procedure continues until the backlog
+// is empty.
+//
+// Stage::Update is the stage's opportunity to react to the supply of new media
+// via its inputs and the signalling of new demand via its outputs. During
+// Update, the stage does whatever work it can with the current supply and
+// demand, possibly supplying media downstream through its outputs and/or
+// signalling new demand via its inputs. When a stage supplies media through
+// an output, the downstream stage is added to the backlog. When a stage updates
+// its dwmand through an input, the upstream stage is added to the backlog.
johngro 2016/02/01 22:38:17 s/dwmand/demand
dalesat 2016/02/01 23:01:28 Done.
+//
+// The process starts when a stage invokes an update callback supplied by the
+// engine. Stages that implement synchronous models never do this. Other stages
+// do this as directed by the parts they host in accordance with their
+// respective models. When a stage is ready to supply media or update demand
+// due to external events, it calls the update callback. The engine responds by
+// adding the stage to the backlog and then burning down the backlog. The stage
+// that called back is updated first, and then all the work that can be done
+// synchronously as a result of the external event is completed. In this way,
+// the operation of the graph is driven by external events signalled through
+// update callbacks.
+//
+// Currently, Engine uses an opportunistic threading model that only allows
+// one thread to drive the backlog processing at any given time. The engine
+// runs the processing on whatever thread enters it via an update callback.
+// An engine employs a single lock that protects manipulation of the graph and
+// processing of the backlog. Stage update methods are invoked with that lock
+// taken. This arrangement implies the following constraints:
+//
+// 1) An update callback cannot be called synchronously with a State::Update
+// call, because the lock is taken for the duration of Update, and the
+// callback will take the lock. Update callbacks may occur during Engine::
+// PrimeSinks, and they generally will.
+// 2) A stage cannot update supply/demand on its inputs/outputs except during
+// Update. When an external event occurs, the stage and/or its hosted part
+// should update its internal state as required and invoke the callback.
+// During the subsequent Update, the stage and/or part can then update
+// supply and/or demand.
+// 3) Threads used to call update callbacks must be suitable for operating the
+// engine. There is currently no affordance for processing other tasks on
+// a thread while the callback is running. A callback may run for a long
+// time, depending on how much work needs to be done.
+// 4) Parts cannot rely on being called back on the same thread on which they
+// invoke update callbacks. This may require additional synchronization and
+// thread transitions inside the part.
+// 5) If a part takes a lock of its own during Update, it should not also hold
+// that lock when calling the update callback. Doing so will result in
+// deadlock.
+//
+// NOTE: Allocators, not otherwise discussed here, are required to be thread-
+// safe so that packets may be cleaned up on any thread.
+//
+// In the future, the threading model will be enhanced. Intended features
+// include:
+// 1) Support for multiple threads.
+// 2) Marshalling update callbacks to a different thread.
+//
+
+// Host for a source, sink or transform.
+class Engine {
+ public:
+ class Input;
+ class Output;
+
+ // Opaque Stage pointer used for graph building.
+ class Part {
+ public:
+ Part() : stage_(nullptr) {}
+
+ uint32_t input_count();
+ Input input(uint32_t index);
+ Input input();
+ uint32_t output_count();
+ Output output(uint32_t index);
+ Output output();
+ Part upstream_part(uint32_t index);
+ Part upstream_part();
+ Part downstream_part(uint32_t index);
+ Part downstream_part();
+
+ private:
+ explicit Part(Stage* stage) : stage_(stage) {}
+
+ explicit operator bool() const { return stage_ != nullptr; }
+
+ Stage* stage_;
+
+ friend Engine;
+ friend Input;
+ friend Output;
+ };
+
+ // Opaque StageInput pointer used for graph building.
+ class Input {
+ public:
+ Input() : stage_(nullptr), index_(0) {}
+
+ explicit operator bool() const { return stage_ != nullptr; }
+
+ Part part() { return Part(stage_); }
+
+ bool connected() {
+ DCHECK(stage_);
+ return stage_input().upstream_stage() != nullptr;
+ }
+
+ Part upstream_part() {
+ DCHECK(connected());
+ return Part(stage_input().upstream_stage());
+ }
+
+ private:
+ Input(Stage* stage, uint32_t index) :
+ stage_(stage), index_(index) {
+ DCHECK(stage_);
+ DCHECK(index_ < stage_->input_count());
+ }
+
+ StageInput& stage_input() {
+ DCHECK(stage_);
+ return stage_->input(index_);
+ }
+
+ Stage* stage_;
+ uint32_t index_;
+
+ friend Engine;
+ friend Part;
+ friend Output;
+ };
+
+ // Opaque StageOutput pointer used for graph building.
+ class Output {
+ public:
+ Output() : stage_(nullptr), index_(0) {}
+
+ explicit operator bool() const { return stage_ != nullptr; }
+
+ Part part() { return Part(stage_); }
+
+ bool connected() {
+ DCHECK(stage_);
+ return stage_output().downstream_stage() != nullptr;
+ }
+
+ Part downstream_part() {
+ DCHECK(connected());
+ return Part(stage_output().downstream_stage());
+ }
+
+ private:
+ Output(Stage* stage, uint32_t index) :
+ stage_(stage), index_(index) {
+ DCHECK(stage_);
+ DCHECK(index_ < stage_->output_count());
+ }
+
+ StageOutput& stage_output() {
+ DCHECK(stage_);
+ return stage_->output(index_);
+ }
+
+ Stage* stage_;
+ uint32_t index_;
+
+ friend Engine;
+ friend Part;
+ friend Input;
+ };
+
+ Engine();
+
+ ~Engine();
+
+ // Adds a part to the engine.
+ template<typename T, typename TBase>
+ Part Add(SharedPtr<T, TBase> t) {
+ DCHECK(t);
+ return Add(CreateStage(std::shared_ptr<TBase>(t)));
+ }
+
+ // Removes a part from the engine after disconnecting it from other parts.
+ void Remove(Part part);
+
+ // Connects an output connector to an input connector. Returns the dowstream
+ // part.
+ Part Connect(Output output, Input input);
+
+ // Connects a part with exactly one output to a part with exactly one input.
+ // Returns the downstream part.
+ Part Connect(Part upstream_part, Part downstream_part);
+
+ // Connects an output connector to a part that has exactly one input. Returns
+ // the downstream part.
+ Part Connect(Output output, Part downstream_part);
+
+ // Connects a part with exactly one output to an input connector. Returns the
+ // downstream part.
+ Part Connect(Part upstream_part, Input input);
+
+ // Disconnects an output connector and the input connector to which it's
+ // connected.
+ void Disconnect(Output output);
+
+ // Disconnects an input connector and the output connector to which it's
+ // connected.
+ void Disconnect(Input input);
+
+ // Disconnects and removes part and everything connected to it.
+ void RemoveAll(Part part);
+
+ // Disconnects and removes everything connected to output.
+ void RemoveAll(Output output);
+
+ // Disconnects and removes everything connected to input.
+ void RemoveAll(Input input);
+
+ // Adds all the parts in t (which must all have one input and one output) and
+ // connects them in sequence to the output connector. Returns the output
+ // connector of the last part or the output parameter if it is empty.
+ template<typename T>
+ Output AddAndConnectAll(
+ Output output,
+ const T& t) {
+ for (auto& element : t) {
+ Part part = Add(CreateStage(element));
+ Connect(output, part.input());
+ output = part.output();
+ }
+ return output;
+ }
+
+ // Prepares the engine.
+ void Prepare();
+
+ // Prepares the part and everything upstream of it. This method is used to
+ // prepare subgraphs added when the rest of the graph is already prepared.
+ void Prepare(Part part);
+
+ // Primes all the sinks in the graph.
+ void PrimeSinks();
+
+ // Removes all parts from the engine.
+ void Reset();
+
+ private:
+ // Adds a stage to the engine.
+ Part Add(Stage* stage);
+
+ // Disconnects an output.
+ void DisconnectOutputUnsafe(Stage* stage, uint32_t index);
+
+ // Disconnects an input.
+ void DisconnectInputUnsafe(Stage* stage, uint32_t index);
+
+ // Removes a stage.
+ void RemoveUnsafe(Stage* stage);
+
+ // Creates a stage from a source, sink or transform. A specialization of this
+ // template is defined for each type of source, sink or transform that can be
+ // added to the engine.
+ template<typename T>
+ static Stage* CreateStage(std::shared_ptr<T> t);
+
+ // CreateStage template specialization for MultiStreamPacketSource.
+ static Stage* CreateStage(MultiStreamPacketSourcePtr source);
+
+ // CreateStage template specialization for PacketTransform.
+ static Stage* CreateStage(PacketTransformPtr transform);
+
+ // CreateStage template specialization for ActiveSource.
+ static Stage* CreateStage(ActiveSourcePtr source);
+
+ // CreateStage template specialization for ActiveSink.
+ static Stage* CreateStage(ActiveSinkPtr sink);
+
+ // CreateStage template specialization for LpcmTransform.
+ static Stage* CreateStage(LpcmTransformPtr transform);
+
+ // Prepares a stage if all its downstream stages are prepared.
+ void MaybePrepareUnsafe(Stage* stage);
+
+ // Processes the entire backlog.
+ void UpdateUnsafe();
+
+ // Performs processing for a single stage, updating the backlog accordingly.
+ void UpdateUnsafe(Stage *stage);
+
+ // Pushes the stage to the supply backlog if it isn't already there.
+ void PushToSupplyBacklogUnsafe(Stage* stage);
+
+ // Pushes the stage to the demand backlog if it isn't already there.
+ void PushToDemandBacklogUnsafe(Stage* stage);
+
+ // Pops a stage from the supply backlog and returns it or returns nullptr if
+ // the supply backlog is empty.
+ Stage* PopFromSupplyBacklogUnsafe();
+
+ // Pops a stage from the demand backlog and returns it or returns nullptr if
+ // the demand backlog is empty.
+ Stage* PopFromDemandBacklogUnsafe();
+
+ mutable base::Lock lock_;
+ std::list<Stage*> stages_;
+ std::list<Stage*> sources_;
+ std::list<Stage*> sinks_;
+ // supply_backlog_ contains pointers to all the stages that have been supplied
+ // (packets or frames) but have not been updated since. demand_backlog_ does
+ // the same for demand. The use of queue vs stack here is a guess as to what
+ // will yield the best results. It's possible that only a single backlog is
+ // required.
+ // TODO(dalesat): Determine the best ordering and implement it.
+ std::queue<Stage*> supply_backlog_;
+ std::stack<Stage*> demand_backlog_;
+ Stage::UpdateCallback update_function_;
+ bool packets_produced_;
+
+ friend class StageInput;
+ friend class StageOutput;
+ friend class LpcmStageInput;
+ friend class LpcmStageOutput;
+};
+
+} // namespace media
+} // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_

Powered by Google App Engine
This is Rietveld 408576698