Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: services/media/framework/engine.cc

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Changed lpcm input/output to use packets for supplying frames. Some name changes. Synced to master. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/media/framework/engine.h"
6
7 namespace mojo {
8 namespace media {
9
10 uint32_t Engine::Part::input_count() {
11 DCHECK(stage_ != nullptr);
12 return stage_->input_count();
13 }
14
15 Engine::Input Engine::Part::input(uint32_t index) {
16 DCHECK(stage_ != nullptr && index < stage_->input_count());
17 return Input(stage_, index);
18 }
19
20 Engine::Input Engine::Part::input() {
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
22 return Input(stage_, 0);
23 }
24
25 uint32_t Engine::Part::output_count() {
26 DCHECK(stage_ != nullptr);
27 return stage_->output_count();
28 }
29
30 Engine::Output Engine::Part::output(uint32_t index) {
31 DCHECK(stage_ != nullptr && index < stage_->output_count());
32 return Output(stage_, index);
33 }
34
35 Engine::Output Engine::Part::output() {
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
37 return Output(stage_, 0);
38 }
39
40 Engine::Part Engine::Part::upstream_part(uint32_t index) {
41 DCHECK(stage_ != nullptr && index < stage_->input_count());
42 return Part(stage_->input(index).upstream_stage());
43 }
44
45 Engine::Part Engine::Part::upstream_part() {
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
47 return Part(stage_->input(0).upstream_stage());
48 }
49
50 Engine::Part Engine::Part::downstream_part(uint32_t index) {
51 DCHECK(stage_ != nullptr && index < stage_->output_count());
52 return Part(stage_->output(index).downstream_stage());
53 }
54
55 Engine::Part Engine::Part::downstream_part() {
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
57 return Part(stage_->output(0).downstream_stage());
58 }
59
60 Engine::Engine() {
61 update_function_ = [this](Stage* stage) {
62 DCHECK(stage);
63 base::AutoLock lock(lock_);
64 UpdateUnsafe(stage);
65 UpdateUnsafe();
66 };
67 }
68
69 Engine::~Engine() {
70 base::AutoLock lock(lock_);
71 }
72
73 void Engine::Remove(Part part) {
74 DCHECK(part);
75 base::AutoLock lock(lock_);
76 RemoveUnsafe(part.stage_);
77 }
78
79 Engine::Part Engine::Connect(
80 Output output,
81 Input input) {
82 DCHECK(output);
83 DCHECK(input);
84
85 if (output.connected()) {
86 Disconnect(output);
87 }
88 if (input.connected()) {
89 Disconnect(input);
90 }
91
92 output.stage_output().connect(
93 input.stage_,
94 input.index_);
95
96 input.stage_input().connect(
97 output.stage_,
98 output.index_);
99
100 return input.part();
101 }
102
103 Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) {
104 DCHECK(upstream_part);
105 DCHECK(downstream_part);
106 Connect(upstream_part.output(), downstream_part.input());
107 return downstream_part;
108 }
109
110 Engine::Part Engine::Connect(
111 Output output,
112 Part downstream_part) {
113 DCHECK(output);
114 DCHECK(downstream_part);
115 Connect(output, downstream_part.input());
116 return downstream_part;
117 }
118
119 Engine::Part Engine::Connect(Part upstream_part, Input input) {
120 DCHECK(upstream_part);
121 DCHECK(input);
122 Connect(upstream_part.output(), input);
123 return input.part();
124 }
125
126 void Engine::Disconnect(Output output) {
127 DCHECK(output);
128
129 base::AutoLock lock(lock_);
130 DisconnectOutputUnsafe(output.stage_, output.index_);
131 }
132
133 void Engine::Disconnect(Input input) {
134 DCHECK(input);
135
136 base::AutoLock lock(lock_);
137 DisconnectInputUnsafe(input.stage_, input.index_);
138 }
139
140 void Engine::RemoveAll(Part part) {
141 DCHECK(part);
142
143 base::AutoLock lock(lock_);
144
145 std::deque<Part> to_remove { part };
146
147 while (!to_remove.empty()) {
148 Part part = to_remove.front();
149 to_remove.pop_front();
150
151 for (uint32_t i = 0; i < part.input_count(); ++i) {
152 to_remove.push_back(part.upstream_part(i));
153 }
154
155 for (uint32_t i = 0; i < part.output_count(); ++i) {
156 to_remove.push_back(part.downstream_part(i));
157 }
158
159 RemoveUnsafe(part.stage_);
160 }
161 }
162
163 void Engine::RemoveAll(Output output) {
164 DCHECK(output);
165
166 if (!output.connected()) {
167 return;
168 }
169
170 Part downstream_part = output.downstream_part();
171 Disconnect(output);
172 RemoveAll(downstream_part);
173 }
174
175 void Engine::RemoveAll(Input input) {
176 DCHECK(input);
177
178 if (!input.connected()) {
179 return;
180 }
181
182 Part upstream_part = input.upstream_part();
183 Disconnect(input);
184 RemoveAll(upstream_part);
185 }
186
187 void Engine::Prepare() {
188 base::AutoLock lock(lock_);
189 for (Stage* sink : sinks_) {
190 sink->Prepare(update_function_);
191 sink->prepared_ = true;
192 uint32_t input_count = sink->input_count();
193 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
194 PrepareUnsafe(sink->input(input_index).upstream_stage());
195 }
196 }
197 }
198
199 void Engine::Prepare(Part part) {
200 DCHECK(part);
201 base::AutoLock lock(lock_);
202 PrepareUnsafe(part.stage_);
203 }
204
205 void Engine::PrimeSinks() {
206 lock_.Acquire();
207 std::list<Stage*> sinks(sinks_);
208 lock_.Release();
209
210 for (Stage* sink : sinks) {
211 sink->Prime();
212 }
213 }
214
215 void Engine::reset() {
216 base::AutoLock lock(lock_);
217 supply_backlog_.clear();
218 demand_backlog_.clear();
219 sources_.clear();
220 sinks_.clear();
221 stages_.clear();
222 }
223
224 void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
225 lock_.AssertAcquired();
226
227 DCHECK(stage);
228 packets_produced_ = true;
229 if (!stage->in_supply_backlog_) {
230 // FIFO
231 supply_backlog_.push_back(stage);
232 stage->in_supply_backlog_ = true;
233 }
234 }
235
236 void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
237 lock_.AssertAcquired();
238
239 DCHECK(stage);
240 if (!stage->in_demand_backlog_) {
241 // LIFO
242 demand_backlog_.push_front(stage);
243 stage->in_demand_backlog_ = true;
244 }
245 }
246
247 Engine::Part Engine::Add(Stage* stage) {
248 base::AutoLock lock(lock_);
249
250 stages_.push_back(std::unique_ptr<Stage>(stage));
251 if (stage->input_count() == 0) {
252 sources_.push_back(stage);
253 }
254 if (stage->output_count() == 0) {
255 sinks_.push_back(stage);
256 }
257 return Part(stage);
258 }
259
260 void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
261 DCHECK(stage);
262 DCHECK(index < stage->output_count());
263
264 lock_.AssertAcquired();
265
266 StageOutput& stage_output = stage->output(index);
267
268 if (stage_output.downstream_stage() == nullptr) {
269 return;
270 }
271
272 stage_output.mate().connect(nullptr, 0);
273 stage_output.connect(nullptr, 0);
274 }
275
276 void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
277 DCHECK(stage);
278 DCHECK(index < stage->input_count());
279
280 lock_.AssertAcquired();
281
282 StageInput& stage_input = stage->input(index);
283
284 if (stage_input.upstream_stage() == nullptr) {
285 return;
286 }
287
288 stage_input.mate().connect(nullptr, 0);
289 stage_input.connect(nullptr, 0);
290 }
291
292 void Engine::RemoveUnsafe(Stage* stage) {
293 DCHECK(stage);
294
295 lock_.AssertAcquired();
296
297 uint32_t input_count = stage->input_count();
298 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
299 if (stage->input(input_index).connected()) {
300 DisconnectInputUnsafe(stage, input_index);
301 }
302 }
303
304 uint32_t output_count = stage->output_count();
305 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
306 if (stage->output(output_index).connected()) {
307 DisconnectOutputUnsafe(stage, output_index);
308 }
309 }
310
311 sources_.remove(stage);
312 sinks_.remove(stage);
313
314 std::unique_ptr<Stage> to_remove(stage); // Release this duplicate unique_ptr!
315 stages_.remove(to_remove);
316 to_remove.release(); // OK, it's released!
317 }
318
319 // static
320 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
321 return new DistributorStage(source);
322 }
323
324 // static
325 Stage* Engine::CreateStage(PacketTransformPtr transform) {
326 return new PacketTransformStage(transform);
327 }
328
329 // static
330 Stage* Engine::CreateStage(ActiveSourcePtr source) {
331 return new ActiveSourceStage(source);
332 }
333
334 // static
335 Stage* Engine::CreateStage(ActiveSinkPtr sink) {
336 return new ActiveSinkStage(sink);
337 }
338
339 // static
340 Stage* Engine::CreateStage(LpcmTransformPtr transform) {
341 return new LpcmTransformStage(transform);
342 }
343
344 void Engine::PrepareUnsafe(Stage* stage) {
345 lock_.AssertAcquired();
346
347 if (stage == nullptr || stage->prepared_) {
348 return;
349 }
350
351 stage->Prepare(update_function_);
352 stage->prepared_ = true;
353
354 uint32_t input_count = stage->input_count();
355 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
356 PrepareUnsafe(stage->input(input_index).upstream_stage());
357 }
358 }
359
360 void Engine::UpdateUnsafe() {
361 lock_.AssertAcquired();
362
363 while (true) {
364 Stage* stage = PopFromSupplyBacklogUnsafe();
365 if (stage != nullptr) {
366 UpdateUnsafe(stage);
367 continue;
368 }
369
370 stage = PopFromDemandBacklogUnsafe();
371 if (stage != nullptr) {
372 UpdateUnsafe(stage);
373 continue;
374 }
375
376 break;
377 }
378 }
379
380 void Engine::UpdateUnsafe(Stage *stage) {
381 lock_.AssertAcquired();
382
383 DCHECK(stage);
384
385 packets_produced_ = false;
386
387 stage->Update(*this);
388
389 // If the stage produced packets, it may need to reevaluate demand later.
390 if (packets_produced_) {
391 PushToDemandBacklogUnsafe(stage);
392 }
393 }
394
395 Stage* Engine::PopFromSupplyBacklogUnsafe() {
396 lock_.AssertAcquired();
397
398 if (supply_backlog_.empty()) {
399 return nullptr;
400 }
401
402 Stage* stage = supply_backlog_.front();
403 supply_backlog_.pop_front();
404 DCHECK(stage->in_supply_backlog_);
405 stage->in_supply_backlog_ = false;
406 return stage;
407 }
408
409 Stage* Engine::PopFromDemandBacklogUnsafe() {
410 lock_.AssertAcquired();
411
412 if (demand_backlog_.empty()) {
413 return nullptr;
414 }
415
416 Stage* stage = demand_backlog_.front();
417 demand_backlog_.pop_front();
418 DCHECK(stage->in_demand_backlog_);
419 stage->in_demand_backlog_ = false;
420 return stage;
421 }
422
423 } // namespace media
424 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698