OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "services/media/framework/engine.h" | |
6 | |
7 namespace mojo { | |
8 namespace media { | |
9 | |
10 uint32_t Engine::Part::input_count() { | |
11 DCHECK(stage_ != nullptr); | |
12 return stage_->input_count(); | |
13 } | |
14 | |
15 Engine::Input Engine::Part::input(uint32_t index) { | |
16 DCHECK(stage_ != nullptr && index < stage_->input_count()); | |
17 return Input(stage_, index); | |
18 } | |
19 | |
20 Engine::Input Engine::Part::input() { | |
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1); | |
22 return Input(stage_, 0); | |
23 } | |
24 | |
25 uint32_t Engine::Part::output_count() { | |
26 DCHECK(stage_ != nullptr); | |
27 return stage_->output_count(); | |
28 } | |
29 | |
30 Engine::Output Engine::Part::output(uint32_t index) { | |
31 DCHECK(stage_ != nullptr && index < stage_->output_count()); | |
32 return Output(stage_, index); | |
33 } | |
34 | |
35 Engine::Output Engine::Part::output() { | |
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1); | |
37 return Output(stage_, 0); | |
38 } | |
39 | |
40 Engine::Part Engine::Part::upstream_part(uint32_t index) { | |
41 DCHECK(stage_ != nullptr && index < stage_->input_count()); | |
42 return Part(stage_->input(index).upstream_stage()); | |
43 } | |
44 | |
45 Engine::Part Engine::Part::upstream_part() { | |
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1); | |
47 return Part(stage_->input(0).upstream_stage()); | |
48 } | |
49 | |
50 Engine::Part Engine::Part::downstream_part(uint32_t index) { | |
51 DCHECK(stage_ != nullptr && index < stage_->output_count()); | |
52 return Part(stage_->output(index).downstream_stage()); | |
53 } | |
54 | |
55 Engine::Part Engine::Part::downstream_part() { | |
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1); | |
57 return Part(stage_->output(0).downstream_stage()); | |
58 } | |
59 | |
60 Engine::Engine() { | |
61 update_function_ = [this](Stage* stage) { | |
62 DCHECK(stage); | |
63 base::AutoLock lock(lock_); | |
64 UpdateUnsafe(stage); | |
65 UpdateUnsafe(); | |
66 }; | |
67 } | |
68 | |
69 Engine::~Engine() { | |
70 Reset(); | |
71 } | |
72 | |
73 void Engine::Remove(Part part) { | |
74 DCHECK(part); | |
75 base::AutoLock lock(lock_); | |
76 RemoveUnsafe(part.stage_); | |
77 } | |
78 | |
79 Engine::Part Engine::Connect(Output output, Input input) { | |
80 DCHECK(output); | |
81 DCHECK(input); | |
82 | |
83 base::AutoLock lock(lock_); | |
84 | |
85 if (output.connected()) { | |
86 DisconnectOutputUnsafe(output.stage_, output.index_); | |
87 } | |
88 if (input.connected()) { | |
89 DisconnectInputUnsafe(input.stage_, input.index_); | |
90 } | |
91 | |
92 output.stage_output().connect( | |
93 input.stage_, | |
94 input.index_); | |
95 | |
96 input.stage_input().connect( | |
97 output.stage_, | |
98 output.index_); | |
99 | |
100 return input.part(); | |
101 } | |
102 | |
103 Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) { | |
104 DCHECK(upstream_part); | |
105 DCHECK(downstream_part); | |
106 Connect(upstream_part.output(), downstream_part.input()); | |
107 return downstream_part; | |
108 } | |
109 | |
110 Engine::Part Engine::Connect( | |
111 Output output, | |
112 Part downstream_part) { | |
113 DCHECK(output); | |
114 DCHECK(downstream_part); | |
115 Connect(output, downstream_part.input()); | |
116 return downstream_part; | |
117 } | |
118 | |
119 Engine::Part Engine::Connect(Part upstream_part, Input input) { | |
120 DCHECK(upstream_part); | |
121 DCHECK(input); | |
122 Connect(upstream_part.output(), input); | |
123 return input.part(); | |
124 } | |
125 | |
126 void Engine::Disconnect(Output output) { | |
127 DCHECK(output); | |
128 | |
129 base::AutoLock lock(lock_); | |
130 DisconnectOutputUnsafe(output.stage_, output.index_); | |
131 } | |
132 | |
133 void Engine::Disconnect(Input input) { | |
134 DCHECK(input); | |
135 | |
136 base::AutoLock lock(lock_); | |
137 DisconnectInputUnsafe(input.stage_, input.index_); | |
138 } | |
139 | |
140 void Engine::RemoveAll(Part part) { | |
141 DCHECK(part); | |
142 | |
143 base::AutoLock lock(lock_); | |
144 | |
145 std::deque<Part> to_remove { part }; | |
146 | |
147 while (!to_remove.empty()) { | |
148 Part part = to_remove.front(); | |
149 to_remove.pop_front(); | |
150 | |
151 for (uint32_t i = 0; i < part.input_count(); ++i) { | |
152 to_remove.push_back(part.upstream_part(i)); | |
153 } | |
154 | |
155 for (uint32_t i = 0; i < part.output_count(); ++i) { | |
156 to_remove.push_back(part.downstream_part(i)); | |
157 } | |
158 | |
159 RemoveUnsafe(part.stage_); | |
160 } | |
161 } | |
162 | |
163 void Engine::RemoveAll(Output output) { | |
164 DCHECK(output); | |
165 | |
166 if (!output.connected()) { | |
167 return; | |
168 } | |
169 | |
170 Part downstream_part = output.downstream_part(); | |
171 Disconnect(output); | |
172 RemoveAll(downstream_part); | |
173 } | |
174 | |
175 void Engine::RemoveAll(Input input) { | |
176 DCHECK(input); | |
177 | |
178 if (!input.connected()) { | |
179 return; | |
180 } | |
181 | |
182 Part upstream_part = input.upstream_part(); | |
183 Disconnect(input); | |
184 RemoveAll(upstream_part); | |
185 } | |
186 | |
187 void Engine::Prepare() { | |
188 base::AutoLock lock(lock_); | |
189 for (Stage* sink : sinks_) { | |
190 sink->Prepare(update_function_); | |
191 sink->prepared_ = true; | |
192 uint32_t input_count = sink->input_count(); | |
193 for (uint32_t input_index = 0; input_index < input_count; input_index++) { | |
194 PrepareUnsafe(sink->input(input_index).upstream_stage()); | |
195 } | |
196 } | |
197 } | |
198 | |
199 void Engine::Prepare(Part part) { | |
200 DCHECK(part); | |
201 base::AutoLock lock(lock_); | |
202 PrepareUnsafe(part.stage_); | |
203 } | |
204 | |
205 void Engine::PrimeSinks() { | |
206 lock_.Acquire(); | |
207 std::list<Stage*> sinks(sinks_); | |
208 lock_.Release(); | |
johngro
2016/01/28 19:14:55
so, you made a copy of the sinks_ list which is ho
dalesat
2016/01/29 01:08:29
At this point, we are depending on the kindness of
johngro
2016/02/01 22:38:17
Acknowledged.
| |
209 | |
210 for (Stage* sink : sinks) { | |
211 sink->Prime(); | |
212 } | |
213 } | |
214 | |
215 void Engine::Reset() { | |
216 base::AutoLock lock(lock_); | |
217 supply_backlog_.clear(); | |
218 demand_backlog_.clear(); | |
219 sources_.clear(); | |
220 sinks_.clear(); | |
221 while (!stages_.empty()) { | |
222 Stage* stage = stages_.front(); | |
223 stages_.pop_front(); | |
224 delete stage; | |
225 } | |
226 } | |
227 | |
228 void Engine::PushToSupplyBacklogUnsafe(Stage* stage) { | |
229 lock_.AssertAcquired(); | |
230 | |
231 DCHECK(stage); | |
232 packets_produced_ = true; | |
233 if (!stage->in_supply_backlog_) { | |
234 // LIFO | |
johngro
2016/01/27 22:35:22
These comments still seem backwards. Both Pop imp
dalesat
2016/01/28 18:49:17
Thanks for the tip on queue/stack. They don't have
johngro
2016/02/01 22:38:17
Acknowledged.
Re: ordering, this basically boils
dalesat
2016/02/01 23:01:28
Nice trick, thanks!
| |
235 supply_backlog_.push_back(stage); | |
236 stage->in_supply_backlog_ = true; | |
237 } | |
238 } | |
239 | |
240 void Engine::PushToDemandBacklogUnsafe(Stage* stage) { | |
241 lock_.AssertAcquired(); | |
242 | |
243 DCHECK(stage); | |
244 if (!stage->in_demand_backlog_) { | |
245 // FIFO | |
246 demand_backlog_.push_front(stage); | |
247 stage->in_demand_backlog_ = true; | |
248 } | |
249 } | |
250 | |
251 Engine::Part Engine::Add(Stage* stage) { | |
252 base::AutoLock lock(lock_); | |
253 | |
254 stages_.push_back(stage); | |
255 if (stage->input_count() == 0) { | |
256 sources_.push_back(stage); | |
257 } | |
258 if (stage->output_count() == 0) { | |
259 sinks_.push_back(stage); | |
260 } | |
261 return Part(stage); | |
262 } | |
263 | |
264 void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) { | |
265 DCHECK(stage); | |
266 DCHECK(index < stage->output_count()); | |
267 | |
268 lock_.AssertAcquired(); | |
269 | |
270 StageOutput& stage_output = stage->output(index); | |
271 | |
272 if (stage_output.downstream_stage() == nullptr) { | |
273 return; | |
274 } | |
275 | |
276 stage_output.mate().connect(nullptr, 0); | |
277 stage_output.connect(nullptr, 0); | |
278 } | |
279 | |
280 void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) { | |
281 DCHECK(stage); | |
282 DCHECK(index < stage->input_count()); | |
283 | |
284 lock_.AssertAcquired(); | |
285 | |
286 StageInput& stage_input = stage->input(index); | |
287 | |
288 if (stage_input.upstream_stage() == nullptr) { | |
289 return; | |
290 } | |
291 | |
292 stage_input.mate().connect(nullptr, 0); | |
293 stage_input.connect(nullptr, 0); | |
294 } | |
295 | |
296 void Engine::RemoveUnsafe(Stage* stage) { | |
297 DCHECK(stage); | |
298 | |
299 lock_.AssertAcquired(); | |
300 | |
301 uint32_t input_count = stage->input_count(); | |
302 for (uint32_t input_index = 0; input_index < input_count; input_index++) { | |
303 if (stage->input(input_index).connected()) { | |
304 DisconnectInputUnsafe(stage, input_index); | |
305 } | |
306 } | |
307 | |
308 uint32_t output_count = stage->output_count(); | |
309 for (uint32_t output_index = 0; output_index < output_count; output_index++) { | |
310 if (stage->output(output_index).connected()) { | |
311 DisconnectOutputUnsafe(stage, output_index); | |
312 } | |
313 } | |
314 | |
315 sources_.remove(stage); | |
316 sinks_.remove(stage); | |
317 stages_.remove(stage); | |
318 delete stage; | |
319 } | |
320 | |
321 // static | |
322 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) { | |
323 return new DistributorStage(source); | |
324 } | |
325 | |
326 // static | |
327 Stage* Engine::CreateStage(PacketTransformPtr transform) { | |
328 return new PacketTransformStage(transform); | |
329 } | |
330 | |
331 // static | |
332 Stage* Engine::CreateStage(ActiveSourcePtr source) { | |
333 return new ActiveSourceStage(source); | |
334 } | |
335 | |
336 // static | |
337 Stage* Engine::CreateStage(ActiveSinkPtr sink) { | |
338 return new ActiveSinkStage(sink); | |
339 } | |
340 | |
341 // static | |
342 Stage* Engine::CreateStage(LpcmTransformPtr transform) { | |
343 return new LpcmTransformStage(transform); | |
344 } | |
345 | |
346 void Engine::PrepareUnsafe(Stage* stage) { | |
347 lock_.AssertAcquired(); | |
348 | |
349 if (stage == nullptr || stage->prepared_) { | |
350 return; | |
351 } | |
352 | |
353 stage->Prepare(update_function_); | |
johngro
2016/01/28 19:14:55
Following comment applies to UpdateUnsafe as well
dalesat
2016/01/29 01:08:30
The threading model will change dramatically as ti
johngro
2016/02/01 22:38:17
Acknowledged.
| |
354 stage->prepared_ = true; | |
355 | |
356 uint32_t input_count = stage->input_count(); | |
357 for (uint32_t input_index = 0; input_index < input_count; input_index++) { | |
358 PrepareUnsafe(stage->input(input_index).upstream_stage()); | |
359 } | |
360 } | |
361 | |
362 void Engine::UpdateUnsafe() { | |
363 lock_.AssertAcquired(); | |
364 | |
365 while (true) { | |
366 Stage* stage = PopFromSupplyBacklogUnsafe(); | |
367 if (stage != nullptr) { | |
368 UpdateUnsafe(stage); | |
369 continue; | |
370 } | |
371 | |
372 stage = PopFromDemandBacklogUnsafe(); | |
373 if (stage != nullptr) { | |
374 UpdateUnsafe(stage); | |
375 continue; | |
376 } | |
377 | |
378 break; | |
379 } | |
380 } | |
381 | |
382 void Engine::UpdateUnsafe(Stage *stage) { | |
383 lock_.AssertAcquired(); | |
384 | |
385 DCHECK(stage); | |
386 | |
387 packets_produced_ = false; | |
388 | |
389 stage->Update(this); | |
390 | |
391 // If the stage produced packets, it may need to reevaluate demand later. | |
392 if (packets_produced_) { | |
393 PushToDemandBacklogUnsafe(stage); | |
394 } | |
395 } | |
396 | |
397 Stage* Engine::PopFromSupplyBacklogUnsafe() { | |
398 lock_.AssertAcquired(); | |
399 | |
400 if (supply_backlog_.empty()) { | |
401 return nullptr; | |
402 } | |
403 | |
404 Stage* stage = supply_backlog_.front(); | |
405 supply_backlog_.pop_front(); | |
406 DCHECK(stage->in_supply_backlog_); | |
407 stage->in_supply_backlog_ = false; | |
408 return stage; | |
409 } | |
410 | |
411 Stage* Engine::PopFromDemandBacklogUnsafe() { | |
412 lock_.AssertAcquired(); | |
413 | |
414 if (demand_backlog_.empty()) { | |
415 return nullptr; | |
416 } | |
417 | |
418 Stage* stage = demand_backlog_.front(); | |
419 demand_backlog_.pop_front(); | |
420 DCHECK(stage->in_demand_backlog_); | |
421 stage->in_demand_backlog_ = false; | |
422 return stage; | |
423 } | |
424 | |
425 } // namespace media | |
426 } // namespace mojo | |
OLD | NEW |