Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(307)

Side by Side Diff: services/media/framework/engine.h

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Fixes based on feedback. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
6 #define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
7
8 #include <list>
9 #include <queue>
10 #include <stack>
11
12 #include "base/synchronization/lock.h"
13 #include "services/media/framework/stages/active_sink_stage.h"
14 #include "services/media/framework/stages/active_source_stage.h"
15 #include "services/media/framework/stages/distributor_stage.h"
16 #include "services/media/framework/stages/lpcm_transform_stage.h"
17 #include "services/media/framework/stages/packet_transform_stage.h"
18 #include "services/media/framework/stages/stage.h"
19
20 namespace mojo {
21 namespace media {
22
23 //
24 // USAGE
25 //
26 // Engine is a container for sources, sinks and transforms ('parts') connected
27 // in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
28 // references to parts and their inputs and outputs. Engine provides a variety
jeffbrown 2016/02/02 05:35:46 Consider adding a suffix to the Part, Input, and O
dalesat 2016/02/02 21:46:38 Added a TODO.
29 // of methods for adding and removing parts and for connecting inputs and
30 // outputs to form a graph.
31 //
32 // In addition to containing parts and representing their interconnection,
33 // Engine manages the coordinated operation of its constituent parts and
34 // transports media from part to part. The Prepare method prepares the graph
35 // for operation, and the PrimeSinks method tells the sinks in the graph to
jeffbrown 2016/02/02 05:35:46 Should PrimeSinks be folded into Prepare?
dalesat 2016/02/02 21:46:38 That might be the right thing to do. Prepare trave
36 // prime themselves. Any additional actions required to make the graph operate
37 // (such as manipulating a rate control interface) is out of scope.
38 //
39 // Parts added to the engine are referenced using shared pointers. The engine
40 // holds pointers to the parts it contains, and the application, in many cases,
41 // also holds pointers to the parts so it can call methods that are outside the
42 // engine's scope. When a part is added the Engine returns an Engine::Part
43 // object, which can be used to reference the part when the graph is modified.
44 // Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
45 // objects) and outputs (as Engine::Output objects).
46 //
47 // Some support is provided for modifying graphs that are operating. This
48 // capability isn't fully developed at the moment. Prepare(Part) is an example
49 // of a method provided for this purpose.
50 //
51 // Parts come in various flavors, defined by 'model' abstract classes. The
52 // current list of supported models is:
53 //
54 // ActiveSink - a sink that consumes packets asynchronously
55 // ActiveSource - a source that produces packets asynchronously
56 // LpcmMixer - a transform that mixes LPCM frames from multiple
57 // inputs and produces a single stream of LPCM frames
58 // via one output
59 // LpcmSource - a source that produces LPCM frames synchronously
60 // LpcmTransform - a synchronous transform with one LPCM input and
61 // one LPCM output
62 // MultiStreamPacketSource - a source that produces multiple streams of packets
63 // synchronously
64 // PacketTransform - a synchronous transform that consumes and produces
65 // packets via one input and one output
66 //
67 // Other models will be defined in the future as needed.
68 //
69
70 //
71 // DESIGN
72 //
73 // The Engine is implemented as a system of cooperating objects. Of those
74 // objects, only the engine itself is of relevance to code that uses Engine and
75 // to part implementations. The other objects are:
76 //
77 // Stage
78 // A stage hosts a single part. There are many subclasses of Stage, one for
79 // each supported part model. The stage's job is to implement the contract
80 // represented by the model so the parts that conform to the model can
81 // participate in the operation of the engine. Stages are uniform with respect
82 // to how they interact with engine. Engine::Part references a stage.
83 //
84 // StageInput
85 // A stage possesses zero or more StageInput instances. StageInput objects
86 // implement the supply of media into the stage and demand for media signalled
87 // upstream. StageInputs recieve media from StageOutputs in the form of packets
jeffbrown 2016/02/02 05:35:46 "receive"
dalesat 2016/02/02 21:46:38 Done.
88 // (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
89 // with LpcmStageInputs in a way that provides optimizations relavant to LPCM
90 // audio media. Engine::Input references a StageInput.
91 //
92 // StageOutput
93 // A stage possesses zero or more StageOutput instances. StageOutput objects
94 // implement the supply of media output of the stage to a downstream input and
95 // demand for media signalled from that input. LpcmStageOutput implements
96 // optimized LPCM flow. Engine::Output references a StageOutput.
97 //
98 // Engine uses a 'work list' algorithm to operate the contained graph. The
99 // engine has a backlog of stages that need to be updated. To advance the
100 // operation of the graph, the engine removes a stage from the backlog and calls
101 // the stage's Update method. The Stage::Update may cause stages to be added
102 // synchronously to the the backlog. This procedure continues until the backlog
103 // is empty.
104 //
105 // Stage::Update is the stage's opportunity to react to the supply of new media
106 // via its inputs and the signalling of new demand via its outputs. During
107 // Update, the stage does whatever work it can with the current supply and
108 // demand, possibly supplying media downstream through its outputs and/or
109 // signalling new demand via its inputs. When a stage supplies media through
110 // an output, the downstream stage is added to the backlog. When a stage updates
111 // its dwmand through an input, the upstream stage is added to the backlog.
112 //
113 // The process starts when a stage invokes an update callback supplied by the
114 // engine. Stages that implement synchronous models never do this. Other stages
115 // do this as directed by the parts they host in accordance with their
116 // respective models. When a stage is ready to supply media or update demand
117 // due to external events, it calls the update callback. The engine responds by
118 // adding the stage to the backlog and then burning down the backlog. The stage
119 // that called back is updated first, and then all the work that can be done
120 // synchronously as a result of the external event is completed. In this way,
121 // the operation of the graph is driven by external events signalled through
122 // update callbacks.
123 //
124 // Currently, Engine uses an opportunistic threading model that only allows
125 // one thread to drive the backlog processing at any given time. The engine
126 // runs the processing on whatever thread enters it via an update callback.
127 // An engine employs a single lock that protects manipulation of the graph and
128 // processing of the backlog. Stage update methods are invoked with that lock
129 // taken. This arrangement implies the following constraints:
130 //
131 // 1) An update callback cannot be called synchronously with a State::Update
132 // call, because the lock is taken for the duration of Update, and the
133 // callback will take the lock. Update callbacks may occur during Engine::
134 // PrimeSinks, and they generally will.
135 // 2) A stage cannot update supply/demand on its inputs/outputs except during
136 // Update. When an external event occurs, the stage and/or its hosted part
137 // should update its internal state as required and invoke the callback.
138 // During the subsequent Update, the stage and/or part can then update
139 // supply and/or demand.
140 // 3) Threads used to call update callbacks must be suitable for operating the
141 // engine. There is currently no affordance for processing other tasks on
142 // a thread while the callback is running. A callback may run for a long
143 // time, depending on how much work needs to be done.
144 // 4) Parts cannot rely on being called back on the same thread on which they
jeffbrown 2016/02/02 05:35:46 It might make sense to reorganize this around a wo
dalesat 2016/02/02 21:46:38 Yes, as discussed below, the plan is to update the
145 // invoke update callbacks. This may require additional synchronization and
146 // thread transitions inside the part.
147 // 5) If a part takes a lock of its own during Update, it should not also hold
148 // that lock when calling the update callback. Doing so will result in
149 // deadlock.
150 //
151 // NOTE: Allocators, not otherwise discussed here, are required to be thread-
152 // safe so that packets may be cleaned up on any thread.
153 //
154 // In the future, the threading model will be enhanced. Intended features
155 // include:
156 // 1) Support for multiple threads.
157 // 2) Marshalling update callbacks to a different thread.
158 //
159
160 // Host for a source, sink or transform.
161 class Engine {
162 public:
163 class Input;
164 class Output;
165
166 // Opaque Stage pointer used for graph building.
167 class Part {
168 public:
169 Part() : stage_(nullptr) {}
170
171 uint32_t input_count();
172 Input input(uint32_t index);
173 Input input();
174 uint32_t output_count();
175 Output output(uint32_t index);
176 Output output();
177 Part upstream_part(uint32_t index);
178 Part upstream_part();
179 Part downstream_part(uint32_t index);
180 Part downstream_part();
181
182 private:
183 explicit Part(Stage* stage) : stage_(stage) {}
184
185 explicit operator bool() const { return stage_ != nullptr; }
186
187 Stage* stage_;
188
189 friend Engine;
190 friend Input;
191 friend Output;
192 };
193
194 // Opaque StageInput pointer used for graph building.
195 class Input {
196 public:
197 Input() : stage_(nullptr), index_(0) {}
198
199 explicit operator bool() const { return stage_ != nullptr; }
200
201 Part part() { return Part(stage_); }
202
203 bool connected() {
204 DCHECK(stage_);
205 return stage_input().upstream_stage() != nullptr;
206 }
207
208 Part upstream_part() {
209 DCHECK(connected());
210 return Part(stage_input().upstream_stage());
211 }
212
213 private:
214 Input(Stage* stage, uint32_t index) :
215 stage_(stage), index_(index) {
216 DCHECK(stage_);
217 DCHECK(index_ < stage_->input_count());
218 }
219
220 StageInput& stage_input() {
221 DCHECK(stage_);
222 return stage_->input(index_);
223 }
224
225 Stage* stage_;
226 uint32_t index_;
227
228 friend Engine;
229 friend Part;
230 friend Output;
231 };
232
233 // Opaque StageOutput pointer used for graph building.
234 class Output {
235 public:
236 Output() : stage_(nullptr), index_(0) {}
237
238 explicit operator bool() const { return stage_ != nullptr; }
239
240 Part part() { return Part(stage_); }
241
242 bool connected() {
243 DCHECK(stage_);
244 return stage_output().downstream_stage() != nullptr;
245 }
246
247 Part downstream_part() {
248 DCHECK(connected());
249 return Part(stage_output().downstream_stage());
250 }
251
252 private:
253 Output(Stage* stage, uint32_t index) :
254 stage_(stage), index_(index) {
255 DCHECK(stage_);
256 DCHECK(index_ < stage_->output_count());
257 }
258
259 StageOutput& stage_output() {
260 DCHECK(stage_);
261 return stage_->output(index_);
262 }
263
264 Stage* stage_;
265 uint32_t index_;
266
267 friend Engine;
268 friend Part;
269 friend Input;
270 };
271
272 Engine();
273
274 ~Engine();
275
276 // Adds a part to the engine.
277 template<typename T, typename TBase>
278 Part Add(SharedPtr<T, TBase> t) {
279 DCHECK(t);
280 return Add(CreateStage(std::shared_ptr<TBase>(t)));
281 }
282
283 // Removes a part from the engine after disconnecting it from other parts.
284 void Remove(Part part);
285
286 // Connects an output connector to an input connector. Returns the dowstream
287 // part.
288 Part Connect(Output output, Input input);
289
290 // Connects a part with exactly one output to a part with exactly one input.
291 // Returns the downstream part.
292 Part Connect(Part upstream_part, Part downstream_part);
jeffbrown 2016/02/02 05:35:46 Is this just a convenience method?
dalesat 2016/02/02 21:46:38 Yes, as are quite a few of these.
293
294 // Connects an output connector to a part that has exactly one input. Returns
295 // the downstream part.
296 Part Connect(Output output, Part downstream_part);
297
298 // Connects a part with exactly one output to an input connector. Returns the
299 // downstream part.
300 Part Connect(Part upstream_part, Input input);
301
302 // Disconnects an output connector and the input connector to which it's
303 // connected.
304 void Disconnect(Output output);
305
306 // Disconnects an input connector and the output connector to which it's
307 // connected.
308 void Disconnect(Input input);
309
310 // Disconnects and removes part and everything connected to it.
311 void RemoveAll(Part part);
jeffbrown 2016/02/02 05:35:46 I'd recommend encoding some type information into
dalesat 2016/02/02 21:46:38 Done.
312
313 // Disconnects and removes everything connected to output.
314 void RemoveAll(Output output);
315
316 // Disconnects and removes everything connected to input.
317 void RemoveAll(Input input);
318
319 // Adds all the parts in t (which must all have one input and one output) and
320 // connects them in sequence to the output connector. Returns the output
321 // connector of the last part or the output parameter if it is empty.
322 template<typename T>
323 Output AddAndConnectAll(
324 Output output,
325 const T& t) {
326 for (auto& element : t) {
327 Part part = Add(CreateStage(element));
328 Connect(output, part.input());
329 output = part.output();
330 }
331 return output;
332 }
333
334 // Prepares the engine.
335 void Prepare();
336
337 // Prepares the part and everything upstream of it. This method is used to
338 // prepare subgraphs added when the rest of the graph is already prepared.
339 void Prepare(Part part);
340
341 // Primes all the sinks in the graph.
342 void PrimeSinks();
343
344 // Removes all parts from the engine.
345 void Reset();
346
347 private:
348 // Adds a stage to the engine.
349 Part Add(Stage* stage);
350
351 // Disconnects an output.
352 void DisconnectOutputUnsafe(Stage* stage, uint32_t index);
353
354 // Disconnects an input.
355 void DisconnectInputUnsafe(Stage* stage, uint32_t index);
356
357 // Removes a stage.
358 void RemoveUnsafe(Stage* stage);
359
360 // Creates a stage from a source, sink or transform. A specialization of this
361 // template is defined for each type of source, sink or transform that can be
362 // added to the engine.
363 template<typename T>
364 static Stage* CreateStage(std::shared_ptr<T> t);
jeffbrown 2016/02/02 05:35:46 It's not clear to me why these need to be defined
dalesat 2016/02/02 21:46:38 Not sure what you mean by "each type of source nee
365
366 // CreateStage template specialization for MultiStreamPacketSource.
367 static Stage* CreateStage(MultiStreamPacketSourcePtr source);
368
369 // CreateStage template specialization for PacketTransform.
370 static Stage* CreateStage(PacketTransformPtr transform);
371
372 // CreateStage template specialization for ActiveSource.
373 static Stage* CreateStage(ActiveSourcePtr source);
374
375 // CreateStage template specialization for ActiveSink.
376 static Stage* CreateStage(ActiveSinkPtr sink);
377
378 // CreateStage template specialization for LpcmTransform.
379 static Stage* CreateStage(LpcmTransformPtr transform);
380
381 // Prepares a stage.
382 void PrepareUnsafe(Stage* stage);
383
384 // Processes the entire backlog.
385 void UpdateUnsafe();
386
387 // Performs processing for a single stage, updating the backlog accordingly.
388 void UpdateUnsafe(Stage *stage);
389
390 // Pushes the stage to the supply backlog if it isn't already there.
391 void PushToSupplyBacklogUnsafe(Stage* stage);
392
393 // Pushes the stage to the demand backlog if it isn't already there.
394 void PushToDemandBacklogUnsafe(Stage* stage);
395
396 // Pops a stage from the supply backlog and returns it or returns nullptr if
397 // the supply backlog is empty.
398 Stage* PopFromSupplyBacklogUnsafe();
399
400 // Pops a stage from the demand backlog and returns it or returns nullptr if
401 // the demand backlog is empty.
402 Stage* PopFromDemandBacklogUnsafe();
403
404 mutable base::Lock lock_;
405 std::list<Stage*> stages_;
406 std::list<Stage*> sources_;
407 std::list<Stage*> sinks_;
408 // supply_backlog_ contains pointers to all the stages that have been supplied
409 // (packets or frames) but have not been updated since. demand_backlog_ does
410 // the same for demand. The use of queue vs stack here is a guess as to what
411 // will yield the best results. It's possible that only a single backlog is
412 // required.
413 // TODO(dalesat): Determine the best dataflow ordering and implement it.
414 std::queue<Stage*> supply_backlog_;
415 std::stack<Stage*> demand_backlog_;
416 Stage::UpdateCallback update_function_;
417 bool packets_produced_;
418
419 friend class StageInput;
420 friend class StageOutput;
421 friend class LpcmStageInput;
422 friend class LpcmStageOutput;
423 };
424
425 } // namespace media
426 } // namespace mojo
427
428 #endif // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698