Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1489)

Side by Side Diff: services/media/framework/engine.cc

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Fixes based on feedback. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/media/framework/engine.h"
6
7 namespace mojo {
8 namespace media {
9
10 uint32_t Engine::Part::input_count() {
11 DCHECK(stage_ != nullptr);
12 return stage_->input_count();
13 }
14
15 Engine::Input Engine::Part::input(uint32_t index) {
16 DCHECK(stage_ != nullptr && index < stage_->input_count());
17 return Input(stage_, index);
18 }
19
20 Engine::Input Engine::Part::input() {
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
22 return Input(stage_, 0);
23 }
24
25 uint32_t Engine::Part::output_count() {
26 DCHECK(stage_ != nullptr);
27 return stage_->output_count();
28 }
29
30 Engine::Output Engine::Part::output(uint32_t index) {
31 DCHECK(stage_ != nullptr && index < stage_->output_count());
32 return Output(stage_, index);
33 }
34
35 Engine::Output Engine::Part::output() {
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
37 return Output(stage_, 0);
38 }
39
40 Engine::Part Engine::Part::upstream_part(uint32_t index) {
41 DCHECK(stage_ != nullptr && index < stage_->input_count());
42 return Part(stage_->input(index).upstream_stage());
43 }
44
45 Engine::Part Engine::Part::upstream_part() {
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
47 return Part(stage_->input(0).upstream_stage());
48 }
49
50 Engine::Part Engine::Part::downstream_part(uint32_t index) {
51 DCHECK(stage_ != nullptr && index < stage_->output_count());
52 return Part(stage_->output(index).downstream_stage());
53 }
54
55 Engine::Part Engine::Part::downstream_part() {
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
57 return Part(stage_->output(0).downstream_stage());
58 }
59
60 Engine::Engine() {
61 update_function_ = [this](Stage* stage) {
62 DCHECK(stage);
63 base::AutoLock lock(lock_);
64 UpdateUnsafe(stage);
65 UpdateUnsafe();
66 };
67 }
68
69 Engine::~Engine() {
70 Reset();
71 }
72
73 void Engine::Remove(Part part) {
74 DCHECK(part);
75 base::AutoLock lock(lock_);
76 RemoveUnsafe(part.stage_);
77 }
78
79 Engine::Part Engine::Connect(Output output, Input input) {
80 DCHECK(output);
81 DCHECK(input);
82
83 base::AutoLock lock(lock_);
84
85 if (output.connected()) {
86 DisconnectOutputUnsafe(output.stage_, output.index_);
87 }
88 if (input.connected()) {
89 DisconnectInputUnsafe(input.stage_, input.index_);
90 }
91
92 output.stage_output().connect(input.stage_, input.index_);
93 input.stage_input().connect(output.stage_, output.index_);
94
95 return input.part();
96 }
97
98 Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) {
99 DCHECK(upstream_part);
100 DCHECK(downstream_part);
101 Connect(upstream_part.output(), downstream_part.input());
102 return downstream_part;
103 }
104
105 Engine::Part Engine::Connect(
106 Output output,
107 Part downstream_part) {
108 DCHECK(output);
109 DCHECK(downstream_part);
110 Connect(output, downstream_part.input());
111 return downstream_part;
112 }
113
114 Engine::Part Engine::Connect(Part upstream_part, Input input) {
115 DCHECK(upstream_part);
116 DCHECK(input);
117 Connect(upstream_part.output(), input);
118 return input.part();
119 }
120
121 void Engine::Disconnect(Output output) {
122 DCHECK(output);
123
124 base::AutoLock lock(lock_);
125 DisconnectOutputUnsafe(output.stage_, output.index_);
126 }
127
128 void Engine::Disconnect(Input input) {
129 DCHECK(input);
130
131 base::AutoLock lock(lock_);
132 DisconnectInputUnsafe(input.stage_, input.index_);
133 }
134
135 void Engine::RemoveAll(Part part) {
136 DCHECK(part);
137
138 base::AutoLock lock(lock_);
139
140 std::deque<Part> to_remove { part };
141
142 while (!to_remove.empty()) {
143 Part part = to_remove.front();
144 to_remove.pop_front();
145
146 for (uint32_t i = 0; i < part.input_count(); ++i) {
147 to_remove.push_back(part.upstream_part(i));
148 }
149
150 for (uint32_t i = 0; i < part.output_count(); ++i) {
151 to_remove.push_back(part.downstream_part(i));
152 }
153
154 RemoveUnsafe(part.stage_);
155 }
156 }
157
158 void Engine::RemoveAll(Output output) {
159 DCHECK(output);
160
161 if (!output.connected()) {
162 return;
163 }
164
165 Part downstream_part = output.downstream_part();
166 Disconnect(output);
167 RemoveAll(downstream_part);
168 }
169
170 void Engine::RemoveAll(Input input) {
171 DCHECK(input);
172
173 if (!input.connected()) {
174 return;
175 }
176
177 Part upstream_part = input.upstream_part();
178 Disconnect(input);
179 RemoveAll(upstream_part);
180 }
181
182 void Engine::Prepare() {
183 base::AutoLock lock(lock_);
184 for (Stage* sink : sinks_) {
185 sink->Prepare(update_function_);
186 sink->prepared_ = true;
187 uint32_t input_count = sink->input_count();
188 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
189 PrepareUnsafe(sink->input(input_index).upstream_stage());
190 }
191 }
192 }
193
194 void Engine::Prepare(Part part) {
195 DCHECK(part);
196 base::AutoLock lock(lock_);
197 PrepareUnsafe(part.stage_);
198 }
199
200 void Engine::PrimeSinks() {
201 lock_.Acquire();
202 std::list<Stage*> sinks(sinks_);
203 lock_.Release();
204
205 for (Stage* sink : sinks) {
206 sink->Prime();
207 }
208 }
209
210 void Engine::Reset() {
211 base::AutoLock lock(lock_);
212 while (!supply_backlog_.empty()) {
213 supply_backlog_.pop();
214 }
215 while (!demand_backlog_.empty()) {
216 demand_backlog_.pop();
217 }
218 sources_.clear();
219 sinks_.clear();
220 while (!stages_.empty()) {
221 Stage* stage = stages_.front();
222 stages_.pop_front();
223 delete stage;
224 }
225 }
226
227 void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
228 lock_.AssertAcquired();
229
230 DCHECK(stage);
231 packets_produced_ = true;
232 if (!stage->in_supply_backlog_) {
233 supply_backlog_.push(stage);
234 stage->in_supply_backlog_ = true;
235 }
236 }
237
238 void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
239 lock_.AssertAcquired();
240
241 DCHECK(stage);
242 if (!stage->in_demand_backlog_) {
243 demand_backlog_.push(stage);
244 stage->in_demand_backlog_ = true;
245 }
246 }
247
248 Engine::Part Engine::Add(Stage* stage) {
249 base::AutoLock lock(lock_);
250
251 stages_.push_back(stage);
252 if (stage->input_count() == 0) {
253 sources_.push_back(stage);
254 }
255 if (stage->output_count() == 0) {
256 sinks_.push_back(stage);
257 }
258 return Part(stage);
259 }
260
261 void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
262 DCHECK(stage);
263 DCHECK(index < stage->output_count());
264
265 lock_.AssertAcquired();
266
267 StageOutput& stage_output = stage->output(index);
268
269 if (stage_output.downstream_stage() == nullptr) {
270 return;
271 }
272
273 stage_output.mate().disconnect();
274 stage_output.disconnect();
275 }
276
277 void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
278 DCHECK(stage);
279 DCHECK(index < stage->input_count());
280
281 lock_.AssertAcquired();
282
283 StageInput& stage_input = stage->input(index);
284
285 if (stage_input.upstream_stage() == nullptr) {
286 return;
287 }
288
289 stage_input.mate().disconnect();
290 stage_input.disconnect();
291 }
292
293 void Engine::RemoveUnsafe(Stage* stage) {
294 DCHECK(stage);
295
296 lock_.AssertAcquired();
297
298 uint32_t input_count = stage->input_count();
299 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
300 if (stage->input(input_index).connected()) {
301 DisconnectInputUnsafe(stage, input_index);
302 }
303 }
304
305 uint32_t output_count = stage->output_count();
306 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
307 if (stage->output(output_index).connected()) {
308 DisconnectOutputUnsafe(stage, output_index);
309 }
310 }
311
312 sources_.remove(stage);
313 sinks_.remove(stage);
314 stages_.remove(stage);
315 delete stage;
316 }
317
318 // static
319 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
320 return new DistributorStage(source);
321 }
322
323 // static
324 Stage* Engine::CreateStage(PacketTransformPtr transform) {
325 return new PacketTransformStage(transform);
326 }
327
328 // static
329 Stage* Engine::CreateStage(ActiveSourcePtr source) {
330 return new ActiveSourceStage(source);
331 }
332
333 // static
334 Stage* Engine::CreateStage(ActiveSinkPtr sink) {
335 return new ActiveSinkStage(sink);
336 }
337
338 // static
339 Stage* Engine::CreateStage(LpcmTransformPtr transform) {
340 return new LpcmTransformStage(transform);
341 }
342
343 void Engine::PrepareUnsafe(Stage* stage) {
344 lock_.AssertAcquired();
345
346 if (stage == nullptr || stage->prepared_) {
347 return;
348 }
349
350 stage->Prepare(update_function_);
351 stage->prepared_ = true;
352
353 uint32_t input_count = stage->input_count();
354 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
355 PrepareUnsafe(stage->input(input_index).upstream_stage());
356 }
357 }
358
359 void Engine::UpdateUnsafe() {
360 lock_.AssertAcquired();
361
362 while (true) {
363 Stage* stage = PopFromSupplyBacklogUnsafe();
364 if (stage != nullptr) {
365 UpdateUnsafe(stage);
366 continue;
367 }
368
369 stage = PopFromDemandBacklogUnsafe();
370 if (stage != nullptr) {
371 UpdateUnsafe(stage);
372 continue;
373 }
374
375 break;
376 }
377 }
378
379 void Engine::UpdateUnsafe(Stage *stage) {
380 lock_.AssertAcquired();
381
382 DCHECK(stage);
383
384 packets_produced_ = false;
385
386 stage->Update(this);
387
388 // If the stage produced packets, it may need to reevaluate demand later.
389 if (packets_produced_) {
390 PushToDemandBacklogUnsafe(stage);
391 }
392 }
393
394 Stage* Engine::PopFromSupplyBacklogUnsafe() {
395 lock_.AssertAcquired();
396
397 if (supply_backlog_.empty()) {
398 return nullptr;
399 }
400
401 Stage* stage = supply_backlog_.front();
402 supply_backlog_.pop();
403 DCHECK(stage->in_supply_backlog_);
404 stage->in_supply_backlog_ = false;
405 return stage;
406 }
407
408 Stage* Engine::PopFromDemandBacklogUnsafe() {
409 lock_.AssertAcquired();
410
411 if (demand_backlog_.empty()) {
412 return nullptr;
413 }
414
415 Stage* stage = demand_backlog_.top();
416 demand_backlog_.pop();
417 DCHECK(stage->in_demand_backlog_);
418 stage->in_demand_backlog_ = false;
419 return stage;
420 }
421
422 } // namespace media
423 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698