Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: services/media/framework/engine.cc

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Various fixes based on feedback. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/media/framework/engine.h"
6
7 namespace mojo {
8 namespace media {
9
10 uint32_t Engine::Part::input_count() {
11 DCHECK(stage_ != nullptr);
12 return stage_->input_count();
13 }
14
15 Engine::Input Engine::Part::input(uint32_t index) {
16 DCHECK(stage_ != nullptr && index < stage_->input_count());
17 return Input(stage_, index);
18 }
19
20 Engine::Input Engine::Part::input() {
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
22 return Input(stage_, 0);
23 }
24
25 uint32_t Engine::Part::output_count() {
26 DCHECK(stage_ != nullptr);
27 return stage_->output_count();
28 }
29
30 Engine::Output Engine::Part::output(uint32_t index) {
31 DCHECK(stage_ != nullptr && index < stage_->output_count());
32 return Output(stage_, index);
33 }
34
35 Engine::Output Engine::Part::output() {
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
37 return Output(stage_, 0);
38 }
39
40 Engine::Part Engine::Part::upstream_part(uint32_t index) {
41 DCHECK(stage_ != nullptr && index < stage_->input_count());
42 return Part(stage_->input(index).upstream_stage());
43 }
44
45 Engine::Part Engine::Part::upstream_part() {
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
47 return Part(stage_->input(0).upstream_stage());
48 }
49
50 Engine::Part Engine::Part::downstream_part(uint32_t index) {
51 DCHECK(stage_ != nullptr && index < stage_->output_count());
52 return Part(stage_->output(index).downstream_stage());
53 }
54
55 Engine::Part Engine::Part::downstream_part() {
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
57 return Part(stage_->output(0).downstream_stage());
58 }
59
60 Engine::Engine() {
61 update_function_ = [this](Stage* stage) {
62 DCHECK(stage);
63 base::AutoLock lock(lock_);
64 UpdateUnsafe(stage);
65 UpdateUnsafe();
66 };
67 }
68
69 Engine::~Engine() {
70 Reset();
71 }
72
73 void Engine::Remove(Part part) {
74 DCHECK(part);
75 base::AutoLock lock(lock_);
76 RemoveUnsafe(part.stage_);
77 }
78
79 Engine::Part Engine::Connect(Output output, Input input) {
80 DCHECK(output);
81 DCHECK(input);
82
83 base::AutoLock lock(lock_);
84
85 if (output.connected()) {
86 DisconnectOutputUnsafe(output.stage_, output.index_);
87 }
88 if (input.connected()) {
89 DisconnectInputUnsafe(input.stage_, input.index_);
90 }
91
92 output.stage_output().connect(
93 input.stage_,
94 input.index_);
95
96 input.stage_input().connect(
97 output.stage_,
98 output.index_);
99
100 return input.part();
101 }
102
103 Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) {
104 DCHECK(upstream_part);
105 DCHECK(downstream_part);
106 Connect(upstream_part.output(), downstream_part.input());
107 return downstream_part;
108 }
109
110 Engine::Part Engine::Connect(
111 Output output,
112 Part downstream_part) {
113 DCHECK(output);
114 DCHECK(downstream_part);
115 Connect(output, downstream_part.input());
116 return downstream_part;
117 }
118
119 Engine::Part Engine::Connect(Part upstream_part, Input input) {
120 DCHECK(upstream_part);
121 DCHECK(input);
122 Connect(upstream_part.output(), input);
123 return input.part();
124 }
125
126 void Engine::Disconnect(Output output) {
127 DCHECK(output);
128
129 base::AutoLock lock(lock_);
130 DisconnectOutputUnsafe(output.stage_, output.index_);
131 }
132
133 void Engine::Disconnect(Input input) {
134 DCHECK(input);
135
136 base::AutoLock lock(lock_);
137 DisconnectInputUnsafe(input.stage_, input.index_);
138 }
139
140 void Engine::RemoveAll(Part part) {
141 DCHECK(part);
142
143 base::AutoLock lock(lock_);
144
145 std::deque<Part> to_remove { part };
146
147 while (!to_remove.empty()) {
148 Part part = to_remove.front();
149 to_remove.pop_front();
150
151 for (uint32_t i = 0; i < part.input_count(); ++i) {
152 to_remove.push_back(part.upstream_part(i));
153 }
154
155 for (uint32_t i = 0; i < part.output_count(); ++i) {
156 to_remove.push_back(part.downstream_part(i));
157 }
158
159 RemoveUnsafe(part.stage_);
160 }
161 }
162
163 void Engine::RemoveAll(Output output) {
164 DCHECK(output);
165
166 if (!output.connected()) {
167 return;
168 }
169
170 Part downstream_part = output.downstream_part();
171 Disconnect(output);
172 RemoveAll(downstream_part);
173 }
174
175 void Engine::RemoveAll(Input input) {
176 DCHECK(input);
177
178 if (!input.connected()) {
179 return;
180 }
181
182 Part upstream_part = input.upstream_part();
183 Disconnect(input);
184 RemoveAll(upstream_part);
185 }
186
187 void Engine::Prepare() {
188 base::AutoLock lock(lock_);
189 for (Stage* sink : sinks_) {
190 sink->Prepare(update_function_);
191 sink->prepared_ = true;
192 uint32_t input_count = sink->input_count();
193 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
194 PrepareUnsafe(sink->input(input_index).upstream_stage());
195 }
196 }
197 }
198
199 void Engine::Prepare(Part part) {
200 DCHECK(part);
201 base::AutoLock lock(lock_);
202 PrepareUnsafe(part.stage_);
203 }
204
205 void Engine::PrimeSinks() {
206 lock_.Acquire();
207 std::list<Stage*> sinks(sinks_);
208 lock_.Release();
johngro 2016/01/28 19:14:55 so, you made a copy of the sinks_ list which is ho
dalesat 2016/01/29 01:08:29 At this point, we are depending on the kindness of
johngro 2016/02/01 22:38:17 Acknowledged.
209
210 for (Stage* sink : sinks) {
211 sink->Prime();
212 }
213 }
214
215 void Engine::Reset() {
216 base::AutoLock lock(lock_);
217 supply_backlog_.clear();
218 demand_backlog_.clear();
219 sources_.clear();
220 sinks_.clear();
221 while (!stages_.empty()) {
222 Stage* stage = stages_.front();
223 stages_.pop_front();
224 delete stage;
225 }
226 }
227
228 void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
229 lock_.AssertAcquired();
230
231 DCHECK(stage);
232 packets_produced_ = true;
233 if (!stage->in_supply_backlog_) {
234 // LIFO
johngro 2016/01/27 22:35:22 These comments still seem backwards. Both Pop imp
dalesat 2016/01/28 18:49:17 Thanks for the tip on queue/stack. They don't have
johngro 2016/02/01 22:38:17 Acknowledged. Re: ordering, this basically boils
dalesat 2016/02/01 23:01:28 Nice trick, thanks!
235 supply_backlog_.push_back(stage);
236 stage->in_supply_backlog_ = true;
237 }
238 }
239
240 void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
241 lock_.AssertAcquired();
242
243 DCHECK(stage);
244 if (!stage->in_demand_backlog_) {
245 // FIFO
246 demand_backlog_.push_front(stage);
247 stage->in_demand_backlog_ = true;
248 }
249 }
250
251 Engine::Part Engine::Add(Stage* stage) {
252 base::AutoLock lock(lock_);
253
254 stages_.push_back(stage);
255 if (stage->input_count() == 0) {
256 sources_.push_back(stage);
257 }
258 if (stage->output_count() == 0) {
259 sinks_.push_back(stage);
260 }
261 return Part(stage);
262 }
263
264 void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
265 DCHECK(stage);
266 DCHECK(index < stage->output_count());
267
268 lock_.AssertAcquired();
269
270 StageOutput& stage_output = stage->output(index);
271
272 if (stage_output.downstream_stage() == nullptr) {
273 return;
274 }
275
276 stage_output.mate().connect(nullptr, 0);
277 stage_output.connect(nullptr, 0);
278 }
279
280 void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
281 DCHECK(stage);
282 DCHECK(index < stage->input_count());
283
284 lock_.AssertAcquired();
285
286 StageInput& stage_input = stage->input(index);
287
288 if (stage_input.upstream_stage() == nullptr) {
289 return;
290 }
291
292 stage_input.mate().connect(nullptr, 0);
293 stage_input.connect(nullptr, 0);
294 }
295
296 void Engine::RemoveUnsafe(Stage* stage) {
297 DCHECK(stage);
298
299 lock_.AssertAcquired();
300
301 uint32_t input_count = stage->input_count();
302 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
303 if (stage->input(input_index).connected()) {
304 DisconnectInputUnsafe(stage, input_index);
305 }
306 }
307
308 uint32_t output_count = stage->output_count();
309 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
310 if (stage->output(output_index).connected()) {
311 DisconnectOutputUnsafe(stage, output_index);
312 }
313 }
314
315 sources_.remove(stage);
316 sinks_.remove(stage);
317 stages_.remove(stage);
318 delete stage;
319 }
320
321 // static
322 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
323 return new DistributorStage(source);
324 }
325
326 // static
327 Stage* Engine::CreateStage(PacketTransformPtr transform) {
328 return new PacketTransformStage(transform);
329 }
330
331 // static
332 Stage* Engine::CreateStage(ActiveSourcePtr source) {
333 return new ActiveSourceStage(source);
334 }
335
336 // static
337 Stage* Engine::CreateStage(ActiveSinkPtr sink) {
338 return new ActiveSinkStage(sink);
339 }
340
341 // static
342 Stage* Engine::CreateStage(LpcmTransformPtr transform) {
343 return new LpcmTransformStage(transform);
344 }
345
346 void Engine::PrepareUnsafe(Stage* stage) {
347 lock_.AssertAcquired();
348
349 if (stage == nullptr || stage->prepared_) {
350 return;
351 }
352
353 stage->Prepare(update_function_);
johngro 2016/01/28 19:14:55 Following comment applies to UpdateUnsafe as well
dalesat 2016/01/29 01:08:30 The threading model will change dramatically as ti
johngro 2016/02/01 22:38:17 Acknowledged.
354 stage->prepared_ = true;
355
356 uint32_t input_count = stage->input_count();
357 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
358 PrepareUnsafe(stage->input(input_index).upstream_stage());
359 }
360 }
361
362 void Engine::UpdateUnsafe() {
363 lock_.AssertAcquired();
364
365 while (true) {
366 Stage* stage = PopFromSupplyBacklogUnsafe();
367 if (stage != nullptr) {
368 UpdateUnsafe(stage);
369 continue;
370 }
371
372 stage = PopFromDemandBacklogUnsafe();
373 if (stage != nullptr) {
374 UpdateUnsafe(stage);
375 continue;
376 }
377
378 break;
379 }
380 }
381
382 void Engine::UpdateUnsafe(Stage *stage) {
383 lock_.AssertAcquired();
384
385 DCHECK(stage);
386
387 packets_produced_ = false;
388
389 stage->Update(this);
390
391 // If the stage produced packets, it may need to reevaluate demand later.
392 if (packets_produced_) {
393 PushToDemandBacklogUnsafe(stage);
394 }
395 }
396
397 Stage* Engine::PopFromSupplyBacklogUnsafe() {
398 lock_.AssertAcquired();
399
400 if (supply_backlog_.empty()) {
401 return nullptr;
402 }
403
404 Stage* stage = supply_backlog_.front();
405 supply_backlog_.pop_front();
406 DCHECK(stage->in_supply_backlog_);
407 stage->in_supply_backlog_ = false;
408 return stage;
409 }
410
411 Stage* Engine::PopFromDemandBacklogUnsafe() {
412 lock_.AssertAcquired();
413
414 if (demand_backlog_.empty()) {
415 return nullptr;
416 }
417
418 Stage* stage = demand_backlog_.front();
419 demand_backlog_.pop_front();
420 DCHECK(stage->in_demand_backlog_);
421 stage->in_demand_backlog_ = false;
422 return stage;
423 }
424
425 } // namespace media
426 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698