Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: services/media/framework/engine.h

Issue 1678433002: Motown: Remove LPCM optimizations, fix prepare, add flush, add ActiveMultistreamSink model/stage (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_ 5 #ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
6 #define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_ 6 #define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
7 7
8 #include <list> 8 #include <list>
9 #include <queue> 9 #include <queue>
10 #include <stack> 10 #include <stack>
11 #include <unordered_map>
11 12
12 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
13 #include "services/media/framework/stages/active_sink_stage.h" 14 #include "services/media/framework/refs.h"
14 #include "services/media/framework/stages/active_source_stage.h"
15 #include "services/media/framework/stages/distributor_stage.h"
16 #include "services/media/framework/stages/lpcm_transform_stage.h"
17 #include "services/media/framework/stages/packet_transform_stage.h"
18 #include "services/media/framework/stages/stage.h" 15 #include "services/media/framework/stages/stage.h"
19 16
20 namespace mojo { 17 namespace mojo {
21 namespace media { 18 namespace media {
22 19
23 // 20 //
24 // USAGE
25 //
26 // TODO(dalesat): Consider adding a suffix to Engine::Part/Input/Output to
27 // indicate that they're references.
28 // TODO(dalesat): Consider folding PrimeSinks into Prepare.
29 //
30 // Engine is a container for sources, sinks and transforms ('parts') connected
31 // in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
32 // references to parts and their inputs and outputs. Engine provides a variety
33 // of methods for adding and removing parts and for connecting inputs and
34 // outputs to form a graph.
35 //
36 // In addition to containing parts and representing their interconnection,
37 // Engine manages the coordinated operation of its constituent parts and
38 // transports media from part to part. The Prepare method prepares the graph
39 // for operation, and the PrimeSinks method tells the sinks in the graph to
40 // prime themselves. Any additional actions required to make the graph operate
41 // (such as manipulating a rate control interface) is out of scope.
42 //
43 // Parts added to the engine are referenced using shared pointers. The engine
44 // holds pointers to the parts it contains, and the application, in many cases,
45 // also holds pointers to the parts so it can call methods that are outside the
46 // engine's scope. When a part is added the Engine returns an Engine::Part
47 // object, which can be used to reference the part when the graph is modified.
48 // Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
49 // objects) and outputs (as Engine::Output objects).
50 //
51 // Some support is provided for modifying graphs that are operating. This
52 // capability isn't fully developed at the moment. Prepare(Part) is an example
53 // of a method provided for this purpose.
54 //
55 // Parts come in various flavors, defined by 'model' abstract classes. The
56 // current list of supported models is:
57 //
58 // ActiveSink - a sink that consumes packets asynchronously
59 // ActiveSource - a source that produces packets asynchronously
60 // LpcmMixer - a transform that mixes LPCM frames from multiple
61 // inputs and produces a single stream of LPCM frames
62 // via one output
63 // LpcmSource - a source that produces LPCM frames synchronously
64 // LpcmTransform - a synchronous transform with one LPCM input and
65 // one LPCM output
66 // MultiStreamPacketSource - a source that produces multiple streams of packets
67 // synchronously
68 // PacketTransform - a synchronous transform that consumes and produces
69 // packets via one input and one output
70 //
71 // Other models will be defined in the future as needed.
72 //
73
74 //
75 // DESIGN 21 // DESIGN
76 // 22 //
77 // The Engine is implemented as a system of cooperating objects. Of those 23 // Engine uses a 'work list' algorithm to operate the graph. The
78 // objects, only the engine itself is of relevance to code that uses Engine and
79 // to part implementations. The other objects are:
80 //
81 // Stage
82 // A stage hosts a single part. There are many subclasses of Stage, one for
83 // each supported part model. The stage's job is to implement the contract
84 // represented by the model so the parts that conform to the model can
85 // participate in the operation of the engine. Stages are uniform with respect
86 // to how they interact with engine. Engine::Part references a stage.
87 //
88 // StageInput
89 // A stage possesses zero or more StageInput instances. StageInput objects
90 // implement the supply of media into the stage and demand for media signalled
91 // upstream. StageInputs receive media from StageOutputs in the form of packets
92 // (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
93 // with LpcmStageInputs in a way that provides optimizations relavant to LPCM
94 // audio media. Engine::Input references a StageInput.
95 //
96 // StageOutput
97 // A stage possesses zero or more StageOutput instances. StageOutput objects
98 // implement the supply of media output of the stage to a downstream input and
99 // demand for media signalled from that input. LpcmStageOutput implements
100 // optimized LPCM flow. Engine::Output references a StageOutput.
101 //
102 // Engine uses a 'work list' algorithm to operate the contained graph. The
103 // engine has a backlog of stages that need to be updated. To advance the 24 // engine has a backlog of stages that need to be updated. To advance the
104 // operation of the graph, the engine removes a stage from the backlog and calls 25 // operation of the graph, the engine removes a stage from the backlog and calls
105 // the stage's Update method. The Stage::Update may cause stages to be added 26 // the stage's Update method. The Stage::Update may cause stages to be added
106 // synchronously to the the backlog. This procedure continues until the backlog 27 // synchronously to the the backlog. This procedure continues until the backlog
107 // is empty. 28 // is empty.
108 // 29 //
109 // Stage::Update is the stage's opportunity to react to the supply of new media 30 // Stage::Update is the stage's opportunity to react to the supply of new media
110 // via its inputs and the signalling of new demand via its outputs. During 31 // via its inputs and the signalling of new demand via its outputs. During
111 // Update, the stage does whatever work it can with the current supply and 32 // Update, the stage does whatever work it can with the current supply and
112 // demand, possibly supplying media downstream through its outputs and/or 33 // demand, possibly supplying media downstream through its outputs and/or
(...skipping 23 matching lines...) Expand all
136 // call, because the lock is taken for the duration of Update, and the 57 // call, because the lock is taken for the duration of Update, and the
137 // callback will take the lock. Update callbacks may occur during Engine:: 58 // callback will take the lock. Update callbacks may occur during Engine::
138 // PrimeSinks, and they generally will. 59 // PrimeSinks, and they generally will.
139 // 2) A stage cannot update supply/demand on its inputs/outputs except during 60 // 2) A stage cannot update supply/demand on its inputs/outputs except during
140 // Update. When an external event occurs, the stage and/or its hosted part 61 // Update. When an external event occurs, the stage and/or its hosted part
141 // should update its internal state as required and invoke the callback. 62 // should update its internal state as required and invoke the callback.
142 // During the subsequent Update, the stage and/or part can then update 63 // During the subsequent Update, the stage and/or part can then update
143 // supply and/or demand. 64 // supply and/or demand.
144 // 3) Threads used to call update callbacks must be suitable for operating the 65 // 3) Threads used to call update callbacks must be suitable for operating the
145 // engine. There is currently no affordance for processing other tasks on 66 // engine. There is currently no affordance for processing other tasks on
146 // a thread while the callback is running. A callback may run for a long 67 // the thread while the callback is running. A callback may run for a long
147 // time, depending on how much work needs to be done. 68 // time, depending on how much work needs to be done.
148 // 4) Parts cannot rely on being called back on the same thread on which they 69 // 4) Parts cannot rely on being called back on the same thread on which they
149 // invoke update callbacks. This may require additional synchronization and 70 // invoke update callbacks. This may require additional synchronization and
150 // thread transitions inside the part. 71 // thread transitions inside the part.
151 // 5) If a part takes a lock of its own during Update, it should not also hold 72 // 5) If a part takes a lock of its own during Update, it should not also hold
152 // that lock when calling the update callback. Doing so will result in 73 // that lock when calling the update callback. Doing so will result in
153 // deadlock. 74 // deadlock.
154 // 75 //
155 // NOTE: Allocators, not otherwise discussed here, are required to be thread- 76 // NOTE: Allocators, not otherwise discussed here, are required to be thread-
156 // safe so that packets may be cleaned up on any thread. 77 // safe so that packets may be cleaned up on any thread.
157 // 78 //
158 // In the future, the threading model will be enhanced. Intended features 79 // In the future, the threading model will be enhanced. Intended features
159 // include: 80 // include:
160 // 1) Support for multiple threads. 81 // 1) Support for multiple threads.
161 // 2) Marshalling update callbacks to a different thread. 82 // 2) Marshalling update callbacks to a different thread.
162 // 83 //
163 84
164 // Host for a source, sink or transform. 85 // Manages operation of a Graph.
165 class Engine { 86 class Engine {
166 public: 87 public:
167 class Input;
168 class Output;
169
170 // Opaque Stage pointer used for graph building.
171 class Part {
172 public:
173 Part() : stage_(nullptr) {}
174
175 uint32_t input_count();
176 Input input(uint32_t index);
177 Input input();
178 uint32_t output_count();
179 Output output(uint32_t index);
180 Output output();
181 Part upstream_part(uint32_t index);
182 Part upstream_part();
183 Part downstream_part(uint32_t index);
184 Part downstream_part();
185
186 private:
187 explicit Part(Stage* stage) : stage_(stage) {}
188
189 explicit operator bool() const { return stage_ != nullptr; }
190
191 Stage* stage_;
192
193 friend Engine;
194 friend Input;
195 friend Output;
196 };
197
198 // Opaque StageInput pointer used for graph building.
199 class Input {
200 public:
201 Input() : stage_(nullptr), index_(0) {}
202
203 explicit operator bool() const { return stage_ != nullptr; }
204
205 Part part() { return Part(stage_); }
206
207 bool connected() {
208 DCHECK(stage_);
209 return stage_input().upstream_stage() != nullptr;
210 }
211
212 Part upstream_part() {
213 DCHECK(connected());
214 return Part(stage_input().upstream_stage());
215 }
216
217 private:
218 Input(Stage* stage, uint32_t index) :
219 stage_(stage), index_(index) {
220 DCHECK(stage_);
221 DCHECK(index_ < stage_->input_count());
222 }
223
224 StageInput& stage_input() {
225 DCHECK(stage_);
226 return stage_->input(index_);
227 }
228
229 Stage* stage_;
230 uint32_t index_;
231
232 friend Engine;
233 friend Part;
234 friend Output;
235 };
236
237 // Opaque StageOutput pointer used for graph building.
238 class Output {
239 public:
240 Output() : stage_(nullptr), index_(0) {}
241
242 explicit operator bool() const { return stage_ != nullptr; }
243
244 Part part() { return Part(stage_); }
245
246 bool connected() {
247 DCHECK(stage_);
248 return stage_output().downstream_stage() != nullptr;
249 }
250
251 Part downstream_part() {
252 DCHECK(connected());
253 return Part(stage_output().downstream_stage());
254 }
255
256 private:
257 Output(Stage* stage, uint32_t index) :
258 stage_(stage), index_(index) {
259 DCHECK(stage_);
260 DCHECK(index_ < stage_->output_count());
261 }
262
263 StageOutput& stage_output() {
264 DCHECK(stage_);
265 return stage_->output(index_);
266 }
267
268 Stage* stage_;
269 uint32_t index_;
270
271 friend Engine;
272 friend Part;
273 friend Input;
274 };
275
276 Engine(); 88 Engine();
277 89
278 ~Engine(); 90 ~Engine();
279 91
280 // Adds a part to the engine. 92 // Prepares the input and the subgraph upstream of it.
281 template<typename T, typename TBase> 93 void PrepareInput(const InputRef& input_ref);
282 Part Add(SharedPtr<T, TBase> t) {
283 DCHECK(t);
284 return Add(CreateStage(std::shared_ptr<TBase>(t)));
285 }
286 94
287 // Removes a part from the engine after disconnecting it from other parts. 95 // Unprepares the input and the subgraph upstream of it.
288 void RemovePart(Part part); 96 void UnprepareInput(const InputRef& input_ref);
289 97
290 // Connects an output connector to an input connector. Returns the dowstream 98 // Flushes the output and the subgraph downstream of it.
291 // part. 99 void FlushOutput(const OutputRef& output_ref);
292 Part Connect(Output output, Input input);
293 100
294 // Connects a part with exactly one output to a part with exactly one input. 101 // Queues the stage for update and winds down the backlog.
295 // Returns the downstream part. 102 void RequestUpdate(Stage* stage);
296 Part ConnectParts(Part upstream_part, Part downstream_part);
297 103
298 // Connects an output connector to a part that has exactly one input. Returns 104 // Pushes the stage to the supply backlog if it isn't already there.
299 // the downstream part. 105 void PushToSupplyBacklog(Stage* stage);
300 Part ConnectOutputToPart(Output output, Part downstream_part);
301 106
302 // Connects a part with exactly one output to an input connector. Returns the 107 // Pushes the stage to the demand backlog if it isn't already there.
303 // downstream part. 108 void PushToDemandBacklog(Stage* stage);
304 Part ConnectPartToInput(Part upstream_part, Input input);
305
306 // Disconnects an output connector and the input connector to which it's
307 // connected.
308 void DisconnectOutput(Output output);
309
310 // Disconnects an input connector and the output connector to which it's
311 // connected.
312 void DisconnectInput(Input input);
313
314 // Disconnects and removes part and everything connected to it.
315 void RemovePartsConnectedToPart(Part part);
316
317 // Disconnects and removes everything connected to output.
318 void RemovePartsConnectedToOutput(Output output);
319
320 // Disconnects and removes everything connected to input.
321 void RemovePartsConnectedToInput(Input input);
322
323 // Adds all the parts in t (which must all have one input and one output) and
324 // connects them in sequence to the output connector. Returns the output
325 // connector of the last part or the output parameter if it is empty.
326 template<typename T>
327 Output AddAndConnectAll(
328 Output output,
329 const T& t) {
330 for (auto& element : t) {
331 Part part = Add(CreateStage(element));
332 Connect(output, part.input());
333 output = part.output();
334 }
335 return output;
336 }
337
338 // Prepares the engine.
339 void Prepare();
340
341 // Prepares the part and everything upstream of it. This method is used to
342 // prepare subgraphs added when the rest of the graph is already prepared.
343 void Prepare(Part part);
344
345 // Primes all the sinks in the graph.
346 void PrimeSinks();
347
348 // Removes all parts from the engine.
349 void Reset();
350 109
351 private: 110 private:
352 // Adds a stage to the engine. 111 using UpstreamVisitor = std::function<void(
353 Part Add(Stage* stage); 112 const InputRef& input,
113 const OutputRef& output,
114 const Stage::UpstreamCallback& callback)>;
115 using DownstreamVisitor = std::function<void(
116 const OutputRef& output,
117 const InputRef& input,
118 const Stage::DownstreamCallback& callback)>;
354 119
355 // Disconnects an output. 120 void VisitUpstream(
356 void DisconnectOutputUnsafe(Stage* stage, uint32_t index); 121 const InputRef& input,
122 const UpstreamVisitor& vistor);
357 123
358 // Disconnects an input. 124 void VisitDownstream(
359 void DisconnectInputUnsafe(Stage* stage, uint32_t index); 125 const OutputRef& output,
360 126 const DownstreamVisitor& vistor);
361 // Removes a stage.
362 void RemoveUnsafe(Stage* stage);
363
364 // Creates a stage from a source, sink or transform. A specialization of this
365 // template is defined for each type of source, sink or transform that can be
366 // added to the engine.
367 template<typename T>
368 static Stage* CreateStage(std::shared_ptr<T> t);
369
370 // CreateStage template specialization for MultiStreamPacketSource.
371 static Stage* CreateStage(MultiStreamPacketSourcePtr source);
372
373 // CreateStage template specialization for PacketTransform.
374 static Stage* CreateStage(PacketTransformPtr transform);
375
376 // CreateStage template specialization for ActiveSource.
377 static Stage* CreateStage(ActiveSourcePtr source);
378
379 // CreateStage template specialization for ActiveSink.
380 static Stage* CreateStage(ActiveSinkPtr sink);
381
382 // CreateStage template specialization for LpcmTransform.
383 static Stage* CreateStage(LpcmTransformPtr transform);
384
385 // Prepares a stage if all its downstream stages are prepared.
386 void MaybePrepareUnsafe(Stage* stage);
387 127
388 // Processes the entire backlog. 128 // Processes the entire backlog.
389 void UpdateUnsafe(); 129 void Update();
390 130
391 // Performs processing for a single stage, updating the backlog accordingly. 131 // Performs processing for a single stage, updating the backlog accordingly.
392 void UpdateUnsafe(Stage *stage); 132 void Update(Stage *stage);
393
394 // Pushes the stage to the supply backlog if it isn't already there.
395 void PushToSupplyBacklogUnsafe(Stage* stage);
396
397 // Pushes the stage to the demand backlog if it isn't already there.
398 void PushToDemandBacklogUnsafe(Stage* stage);
399 133
400 // Pops a stage from the supply backlog and returns it or returns nullptr if 134 // Pops a stage from the supply backlog and returns it or returns nullptr if
401 // the supply backlog is empty. 135 // the supply backlog is empty.
402 Stage* PopFromSupplyBacklogUnsafe(); 136 Stage* PopFromSupplyBacklog();
403 137
404 // Pops a stage from the demand backlog and returns it or returns nullptr if 138 // Pops a stage from the demand backlog and returns it or returns nullptr if
405 // the demand backlog is empty. 139 // the demand backlog is empty.
406 Stage* PopFromDemandBacklogUnsafe(); 140 Stage* PopFromDemandBacklog();
407 141
408 mutable base::Lock lock_; 142 mutable base::Lock lock_;
409 std::list<Stage*> stages_;
410 std::list<Stage*> sources_;
411 std::list<Stage*> sinks_;
412 // supply_backlog_ contains pointers to all the stages that have been supplied 143 // supply_backlog_ contains pointers to all the stages that have been supplied
413 // (packets or frames) but have not been updated since. demand_backlog_ does 144 // (packets or frames) but have not been updated since. demand_backlog_ does
414 // the same for demand. The use of queue vs stack here is a guess as to what 145 // the same for demand. The use of queue vs stack here is a guess as to what
415 // will yield the best results. It's possible that only a single backlog is 146 // will yield the best results. It's possible that only a single backlog is
416 // required. 147 // required.
417 // TODO(dalesat): Determine the best ordering and implement it. 148 // TODO(dalesat): Determine the best ordering and implement it.
418 std::queue<Stage*> supply_backlog_; 149 std::queue<Stage*> supply_backlog_;
419 std::stack<Stage*> demand_backlog_; 150 std::stack<Stage*> demand_backlog_;
420 Stage::UpdateCallback update_function_;
421 bool packets_produced_; 151 bool packets_produced_;
422
423 friend class StageInput;
424 friend class StageOutput;
425 friend class LpcmStageInput;
426 friend class LpcmStageOutput;
427 }; 152 };
428 153
429 } // namespace media 154 } // namespace media
430 } // namespace mojo 155 } // namespace mojo
431 156
432 #endif // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_ 157 #endif // SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
OLDNEW
« no previous file with comments | « services/media/framework/conversion_pipeline_builder.cc ('k') | services/media/framework/engine.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698