Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: services/media/framework/engine.h

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: sync Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
6 #define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
7
8 #include <list>
9 #include <queue>
10 #include <stack>
11
12 #include "base/synchronization/lock.h"
13 #include "services/media/framework/stages/active_sink_stage.h"
14 #include "services/media/framework/stages/active_source_stage.h"
15 #include "services/media/framework/stages/distributor_stage.h"
16 #include "services/media/framework/stages/lpcm_transform_stage.h"
17 #include "services/media/framework/stages/packet_transform_stage.h"
18 #include "services/media/framework/stages/stage.h"
19
20 namespace mojo {
21 namespace media {
22
23 //
24 // USAGE
25 //
26 // TODO(dalesat): Consider adding a suffix to Engine::Part/Input/Output to
27 // indicate that they're references.
28 // TODO(dalesat): Consider folding PrimeSinks into Prepare.
29 //
30 // Engine is a container for sources, sinks and transforms ('parts') connected
31 // in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
32 // references to parts and their inputs and outputs. Engine provides a variety
33 // of methods for adding and removing parts and for connecting inputs and
34 // outputs to form a graph.
35 //
36 // In addition to containing parts and representing their interconnection,
37 // Engine manages the coordinated operation of its constituent parts and
38 // transports media from part to part. The Prepare method prepares the graph
39 // for operation, and the PrimeSinks method tells the sinks in the graph to
40 // prime themselves. Any additional actions required to make the graph operate
41 // (such as manipulating a rate control interface) is out of scope.
42 //
43 // Parts added to the engine are referenced using shared pointers. The engine
44 // holds pointers to the parts it contains, and the application, in many cases,
45 // also holds pointers to the parts so it can call methods that are outside the
46 // engine's scope. When a part is added the Engine returns an Engine::Part
47 // object, which can be used to reference the part when the graph is modified.
48 // Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
49 // objects) and outputs (as Engine::Output objects).
50 //
51 // Some support is provided for modifying graphs that are operating. This
52 // capability isn't fully developed at the moment. Prepare(Part) is an example
53 // of a method provided for this purpose.
54 //
55 // Parts come in various flavors, defined by 'model' abstract classes. The
56 // current list of supported models is:
57 //
58 // ActiveSink - a sink that consumes packets asynchronously
59 // ActiveSource - a source that produces packets asynchronously
60 // LpcmMixer - a transform that mixes LPCM frames from multiple
61 // inputs and produces a single stream of LPCM frames
62 // via one output
63 // LpcmSource - a source that produces LPCM frames synchronously
64 // LpcmTransform - a synchronous transform with one LPCM input and
65 // one LPCM output
66 // MultiStreamPacketSource - a source that produces multiple streams of packets
67 // synchronously
68 // PacketTransform - a synchronous transform that consumes and produces
69 // packets via one input and one output
70 //
71 // Other models will be defined in the future as needed.
72 //
73
74 //
75 // DESIGN
76 //
77 // The Engine is implemented as a system of cooperating objects. Of those
78 // objects, only the engine itself is of relevance to code that uses Engine and
79 // to part implementations. The other objects are:
80 //
81 // Stage
82 // A stage hosts a single part. There are many subclasses of Stage, one for
83 // each supported part model. The stage's job is to implement the contract
84 // represented by the model so the parts that conform to the model can
85 // participate in the operation of the engine. Stages are uniform with respect
86 // to how they interact with engine. Engine::Part references a stage.
87 //
88 // StageInput
89 // A stage possesses zero or more StageInput instances. StageInput objects
90 // implement the supply of media into the stage and demand for media signalled
91 // upstream. StageInputs receive media from StageOutputs in the form of packets
92 // (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
93 // with LpcmStageInputs in a way that provides optimizations relavant to LPCM
94 // audio media. Engine::Input references a StageInput.
95 //
96 // StageOutput
97 // A stage possesses zero or more StageOutput instances. StageOutput objects
98 // implement the supply of media output of the stage to a downstream input and
99 // demand for media signalled from that input. LpcmStageOutput implements
100 // optimized LPCM flow. Engine::Output references a StageOutput.
101 //
102 // Engine uses a 'work list' algorithm to operate the contained graph. The
103 // engine has a backlog of stages that need to be updated. To advance the
104 // operation of the graph, the engine removes a stage from the backlog and calls
105 // the stage's Update method. The Stage::Update may cause stages to be added
106 // synchronously to the the backlog. This procedure continues until the backlog
107 // is empty.
108 //
109 // Stage::Update is the stage's opportunity to react to the supply of new media
110 // via its inputs and the signalling of new demand via its outputs. During
111 // Update, the stage does whatever work it can with the current supply and
112 // demand, possibly supplying media downstream through its outputs and/or
113 // signalling new demand via its inputs. When a stage supplies media through
114 // an output, the downstream stage is added to the backlog. When a stage updates
115 // its demand through an input, the upstream stage is added to the backlog.
116 //
117 // The process starts when a stage invokes an update callback supplied by the
118 // engine. Stages that implement synchronous models never do this. Other stages
119 // do this as directed by the parts they host in accordance with their
120 // respective models. When a stage is ready to supply media or update demand
121 // due to external events, it calls the update callback. The engine responds by
122 // adding the stage to the backlog and then burning down the backlog. The stage
123 // that called back is updated first, and then all the work that can be done
124 // synchronously as a result of the external event is completed. In this way,
125 // the operation of the graph is driven by external events signalled through
126 // update callbacks.
127 //
128 // Currently, Engine uses an opportunistic threading model that only allows
129 // one thread to drive the backlog processing at any given time. The engine
130 // runs the processing on whatever thread enters it via an update callback.
131 // An engine employs a single lock that protects manipulation of the graph and
132 // processing of the backlog. Stage update methods are invoked with that lock
133 // taken. This arrangement implies the following constraints:
134 //
135 // 1) An update callback cannot be called synchronously with a Stage::Update
136 // call, because the lock is taken for the duration of Update, and the
137 // callback will take the lock. Update callbacks may occur during Engine::
138 // PrimeSinks, and they generally will.
139 // 2) A stage cannot update supply/demand on its inputs/outputs except during
140 // Update. When an external event occurs, the stage and/or its hosted part
141 // should update its internal state as required and invoke the callback.
142 // During the subsequent Update, the stage and/or part can then update
143 // supply and/or demand.
144 // 3) Threads used to call update callbacks must be suitable for operating the
145 // engine. There is currently no affordance for processing other tasks on
146 // a thread while the callback is running. A callback may run for a long
147 // time, depending on how much work needs to be done.
148 // 4) Parts cannot rely on being called back on the same thread on which they
149 // invoke update callbacks. This may require additional synchronization and
150 // thread transitions inside the part.
151 // 5) If a part takes a lock of its own during Update, it should not also hold
152 // that lock when calling the update callback. Doing so will result in
153 // deadlock.
154 //
155 // NOTE: Allocators, not otherwise discussed here, are required to be thread-
156 // safe so that packets may be cleaned up on any thread.
157 //
158 // In the future, the threading model will be enhanced. Intended features
159 // include:
160 // 1) Support for multiple threads.
161 // 2) Marshalling update callbacks to a different thread.
162 //
163
164 // Host for a source, sink or transform.
165 class Engine {
166 public:
167 class Input;
168 class Output;
169
170 // Opaque Stage pointer used for graph building.
171 class Part {
172 public:
173 Part() : stage_(nullptr) {}
174
175 uint32_t input_count();
176 Input input(uint32_t index);
177 Input input();
178 uint32_t output_count();
179 Output output(uint32_t index);
180 Output output();
181 Part upstream_part(uint32_t index);
182 Part upstream_part();
183 Part downstream_part(uint32_t index);
184 Part downstream_part();
185
186 private:
187 explicit Part(Stage* stage) : stage_(stage) {}
188
189 explicit operator bool() const { return stage_ != nullptr; }
190
191 Stage* stage_;
192
193 friend Engine;
194 friend Input;
195 friend Output;
196 };
197
198 // Opaque StageInput pointer used for graph building.
199 class Input {
200 public:
201 Input() : stage_(nullptr), index_(0) {}
202
203 explicit operator bool() const { return stage_ != nullptr; }
204
205 Part part() { return Part(stage_); }
206
207 bool connected() {
208 DCHECK(stage_);
209 return stage_input().upstream_stage() != nullptr;
210 }
211
212 Part upstream_part() {
213 DCHECK(connected());
214 return Part(stage_input().upstream_stage());
215 }
216
217 private:
218 Input(Stage* stage, uint32_t index) :
219 stage_(stage), index_(index) {
220 DCHECK(stage_);
221 DCHECK(index_ < stage_->input_count());
222 }
223
224 StageInput& stage_input() {
225 DCHECK(stage_);
226 return stage_->input(index_);
227 }
228
229 Stage* stage_;
230 uint32_t index_;
231
232 friend Engine;
233 friend Part;
234 friend Output;
235 };
236
237 // Opaque StageOutput pointer used for graph building.
238 class Output {
239 public:
240 Output() : stage_(nullptr), index_(0) {}
241
242 explicit operator bool() const { return stage_ != nullptr; }
243
244 Part part() { return Part(stage_); }
245
246 bool connected() {
247 DCHECK(stage_);
248 return stage_output().downstream_stage() != nullptr;
249 }
250
251 Part downstream_part() {
252 DCHECK(connected());
253 return Part(stage_output().downstream_stage());
254 }
255
256 private:
257 Output(Stage* stage, uint32_t index) :
258 stage_(stage), index_(index) {
259 DCHECK(stage_);
260 DCHECK(index_ < stage_->output_count());
261 }
262
263 StageOutput& stage_output() {
264 DCHECK(stage_);
265 return stage_->output(index_);
266 }
267
268 Stage* stage_;
269 uint32_t index_;
270
271 friend Engine;
272 friend Part;
273 friend Input;
274 };
275
276 Engine();
277
278 ~Engine();
279
280 // Adds a part to the engine.
281 template<typename T, typename TBase>
282 Part Add(SharedPtr<T, TBase> t) {
283 DCHECK(t);
284 return Add(CreateStage(std::shared_ptr<TBase>(t)));
285 }
286
287 // Removes a part from the engine after disconnecting it from other parts.
288 void RemovePart(Part part);
289
290 // Connects an output connector to an input connector. Returns the dowstream
291 // part.
292 Part Connect(Output output, Input input);
293
294 // Connects a part with exactly one output to a part with exactly one input.
295 // Returns the downstream part.
296 Part ConnectParts(Part upstream_part, Part downstream_part);
297
298 // Connects an output connector to a part that has exactly one input. Returns
299 // the downstream part.
300 Part ConnectOutputToPart(Output output, Part downstream_part);
301
302 // Connects a part with exactly one output to an input connector. Returns the
303 // downstream part.
304 Part ConnectPartToInput(Part upstream_part, Input input);
305
306 // Disconnects an output connector and the input connector to which it's
307 // connected.
308 void DisconnectOutput(Output output);
309
310 // Disconnects an input connector and the output connector to which it's
311 // connected.
312 void DisconnectInput(Input input);
313
314 // Disconnects and removes part and everything connected to it.
315 void RemovePartsConnectedToPart(Part part);
316
317 // Disconnects and removes everything connected to output.
318 void RemovePartsConnectedToOutput(Output output);
319
320 // Disconnects and removes everything connected to input.
321 void RemovePartsConnectedToInput(Input input);
322
323 // Adds all the parts in t (which must all have one input and one output) and
324 // connects them in sequence to the output connector. Returns the output
325 // connector of the last part or the output parameter if it is empty.
326 template<typename T>
327 Output AddAndConnectAll(
328 Output output,
329 const T& t) {
330 for (auto& element : t) {
331 Part part = Add(CreateStage(element));
332 Connect(output, part.input());
333 output = part.output();
334 }
335 return output;
336 }
337
338 // Prepares the engine.
339 void Prepare();
340
341 // Prepares the part and everything upstream of it. This method is used to
342 // prepare subgraphs added when the rest of the graph is already prepared.
343 void Prepare(Part part);
344
345 // Primes all the sinks in the graph.
346 void PrimeSinks();
347
348 // Removes all parts from the engine.
349 void Reset();
350
351 private:
352 // Adds a stage to the engine.
353 Part Add(Stage* stage);
354
355 // Disconnects an output.
356 void DisconnectOutputUnsafe(Stage* stage, uint32_t index);
357
358 // Disconnects an input.
359 void DisconnectInputUnsafe(Stage* stage, uint32_t index);
360
361 // Removes a stage.
362 void RemoveUnsafe(Stage* stage);
363
364 // Creates a stage from a source, sink or transform. A specialization of this
365 // template is defined for each type of source, sink or transform that can be
366 // added to the engine.
367 template<typename T>
368 static Stage* CreateStage(std::shared_ptr<T> t);
369
370 // CreateStage template specialization for MultiStreamPacketSource.
371 static Stage* CreateStage(MultiStreamPacketSourcePtr source);
372
373 // CreateStage template specialization for PacketTransform.
374 static Stage* CreateStage(PacketTransformPtr transform);
375
376 // CreateStage template specialization for ActiveSource.
377 static Stage* CreateStage(ActiveSourcePtr source);
378
379 // CreateStage template specialization for ActiveSink.
380 static Stage* CreateStage(ActiveSinkPtr sink);
381
382 // CreateStage template specialization for LpcmTransform.
383 static Stage* CreateStage(LpcmTransformPtr transform);
384
385 // Prepares a stage if all its downstream stages are prepared.
386 void MaybePrepareUnsafe(Stage* stage);
387
388 // Processes the entire backlog.
389 void UpdateUnsafe();
390
391 // Performs processing for a single stage, updating the backlog accordingly.
392 void UpdateUnsafe(Stage *stage);
393
394 // Pushes the stage to the supply backlog if it isn't already there.
395 void PushToSupplyBacklogUnsafe(Stage* stage);
396
397 // Pushes the stage to the demand backlog if it isn't already there.
398 void PushToDemandBacklogUnsafe(Stage* stage);
399
400 // Pops a stage from the supply backlog and returns it or returns nullptr if
401 // the supply backlog is empty.
402 Stage* PopFromSupplyBacklogUnsafe();
403
404 // Pops a stage from the demand backlog and returns it or returns nullptr if
405 // the demand backlog is empty.
406 Stage* PopFromDemandBacklogUnsafe();
407
408 mutable base::Lock lock_;
409 std::list<Stage*> stages_;
410 std::list<Stage*> sources_;
411 std::list<Stage*> sinks_;
412 // supply_backlog_ contains pointers to all the stages that have been supplied
413 // (packets or frames) but have not been updated since. demand_backlog_ does
414 // the same for demand. The use of queue vs stack here is a guess as to what
415 // will yield the best results. It's possible that only a single backlog is
416 // required.
417 // TODO(dalesat): Determine the best ordering and implement it.
418 std::queue<Stage*> supply_backlog_;
419 std::stack<Stage*> demand_backlog_;
420 Stage::UpdateCallback update_function_;
421 bool packets_produced_;
422
423 friend class StageInput;
424 friend class StageOutput;
425 friend class LpcmStageInput;
426 friend class LpcmStageOutput;
427 };
428
429 } // namespace media
430 } // namespace mojo
431
432 #endif // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_
OLDNEW
« no previous file with comments | « services/media/framework/conversion_pipeline_builder.cc ('k') | services/media/framework/engine.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698