Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(439)

Side by Side Diff: services/media/framework/engine.cc

Issue 1678433002: Motown: Remove LPCM optimizations, fix prepare, add flush, add ActiveMultistreamSink model/stage (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « services/media/framework/engine.h ('k') | services/media/framework/formatting.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "services/media/framework/engine.h" 5 #include "services/media/framework/engine.h"
6 6
7 namespace mojo { 7 namespace mojo {
8 namespace media { 8 namespace media {
9 9
10 uint32_t Engine::Part::input_count() { 10 Engine::Engine() {}
11 DCHECK(stage_ != nullptr); 11
12 return stage_->input_count(); 12 Engine::~Engine() {
13 base::AutoLock lock(lock_);
13 } 14 }
14 15
15 Engine::Input Engine::Part::input(uint32_t index) { 16 void Engine::PrepareInput(const InputRef& input) {
16 DCHECK(stage_ != nullptr && index < stage_->input_count()); 17 VisitUpstream(
17 return Input(stage_, index); 18 input,
19 [] (const InputRef& input,
20 const OutputRef& output,
21 const Stage::UpstreamCallback& callback) {
22 DCHECK(!input.actual().prepared());
23 PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_);
24 input.actual().set_prepared(true);
25 output.stage_->PrepareOutput(output.index_, allocator, callback);
26 });
18 } 27 }
19 28
20 Engine::Input Engine::Part::input() { 29 void Engine::UnprepareInput(const InputRef& input) {
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1); 30 VisitUpstream(
22 return Input(stage_, 0); 31 input,
32 [] (const InputRef& input,
33 const OutputRef& output,
34 const Stage::UpstreamCallback& callback) {
35 DCHECK(input.actual().prepared());
36 input.stage_->UnprepareInput(input.index_);
37 output.stage_->UnprepareOutput(output.index_, callback);
38 });
23 } 39 }
24 40
25 uint32_t Engine::Part::output_count() { 41 void Engine::FlushOutput(const OutputRef& output) {
26 DCHECK(stage_ != nullptr); 42 VisitDownstream(
27 return stage_->output_count(); 43 output,
44 [] (const OutputRef& output,
45 const InputRef& input,
46 const Stage::DownstreamCallback& callback) {
47 DCHECK(input.actual().prepared());
48 output.stage_->FlushOutput(output.index_);
49 input.stage_->FlushInput(input.index_, callback);
50 });
28 } 51 }
29 52
30 Engine::Output Engine::Part::output(uint32_t index) { 53 void Engine::RequestUpdate(Stage* stage) {
31 DCHECK(stage_ != nullptr && index < stage_->output_count()); 54 DCHECK(stage);
32 return Output(stage_, index); 55 base::AutoLock lock(lock_);
56 Update(stage);
57 Update();
33 } 58 }
34 59
35 Engine::Output Engine::Part::output() { 60 void Engine::PushToSupplyBacklog(Stage* stage) {
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1); 61 lock_.AssertAcquired();
37 return Output(stage_, 0); 62 DCHECK(stage);
38 }
39 63
40 Engine::Part Engine::Part::upstream_part(uint32_t index) {
41 DCHECK(stage_ != nullptr && index < stage_->input_count());
42 return Part(stage_->input(index).upstream_stage());
43 }
44
45 Engine::Part Engine::Part::upstream_part() {
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
47 return Part(stage_->input(0).upstream_stage());
48 }
49
50 Engine::Part Engine::Part::downstream_part(uint32_t index) {
51 DCHECK(stage_ != nullptr && index < stage_->output_count());
52 return Part(stage_->output(index).downstream_stage());
53 }
54
55 Engine::Part Engine::Part::downstream_part() {
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
57 return Part(stage_->output(0).downstream_stage());
58 }
59
60 Engine::Engine() {
61 update_function_ = [this](Stage* stage) {
62 DCHECK(stage);
63 base::AutoLock lock(lock_);
64 UpdateUnsafe(stage);
65 UpdateUnsafe();
66 };
67 }
68
69 Engine::~Engine() {
70 Reset();
71 }
72
73 void Engine::RemovePart(Part part) {
74 DCHECK(part);
75 base::AutoLock lock(lock_);
76 RemoveUnsafe(part.stage_);
77 }
78
79 Engine::Part Engine::Connect(Output output, Input input) {
80 DCHECK(output);
81 DCHECK(input);
82
83 base::AutoLock lock(lock_);
84
85 if (output.connected()) {
86 DisconnectOutputUnsafe(output.stage_, output.index_);
87 }
88 if (input.connected()) {
89 DisconnectInputUnsafe(input.stage_, input.index_);
90 }
91
92 output.stage_output().connect(input.stage_, input.index_);
93 input.stage_input().connect(output.stage_, output.index_);
94
95 return input.part();
96 }
97
98 Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
99 DCHECK(upstream_part);
100 DCHECK(downstream_part);
101 Connect(upstream_part.output(), downstream_part.input());
102 return downstream_part;
103 }
104
105 Engine::Part Engine::ConnectOutputToPart(
106 Output output,
107 Part downstream_part) {
108 DCHECK(output);
109 DCHECK(downstream_part);
110 Connect(output, downstream_part.input());
111 return downstream_part;
112 }
113
114 Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
115 DCHECK(upstream_part);
116 DCHECK(input);
117 Connect(upstream_part.output(), input);
118 return input.part();
119 }
120
121 void Engine::DisconnectOutput(Output output) {
122 DCHECK(output);
123
124 base::AutoLock lock(lock_);
125 DisconnectOutputUnsafe(output.stage_, output.index_);
126 }
127
128 void Engine::DisconnectInput(Input input) {
129 DCHECK(input);
130
131 base::AutoLock lock(lock_);
132 DisconnectInputUnsafe(input.stage_, input.index_);
133 }
134
135 void Engine::RemovePartsConnectedToPart(Part part) {
136 DCHECK(part);
137
138 base::AutoLock lock(lock_);
139
140 std::deque<Part> to_remove { part };
141
142 while (!to_remove.empty()) {
143 Part part = to_remove.front();
144 to_remove.pop_front();
145
146 for (uint32_t i = 0; i < part.input_count(); ++i) {
147 to_remove.push_back(part.upstream_part(i));
148 }
149
150 for (uint32_t i = 0; i < part.output_count(); ++i) {
151 to_remove.push_back(part.downstream_part(i));
152 }
153
154 RemoveUnsafe(part.stage_);
155 }
156 }
157
158 void Engine::RemovePartsConnectedToOutput(Output output) {
159 DCHECK(output);
160
161 if (!output.connected()) {
162 return;
163 }
164
165 Part downstream_part = output.downstream_part();
166 DisconnectOutput(output);
167 RemovePartsConnectedToPart(downstream_part);
168 }
169
170 void Engine::RemovePartsConnectedToInput(Input input) {
171 DCHECK(input);
172
173 if (!input.connected()) {
174 return;
175 }
176
177 Part upstream_part = input.upstream_part();
178 DisconnectInput(input);
179 RemovePartsConnectedToPart(upstream_part);
180 }
181
182 void Engine::Prepare() {
183 base::AutoLock lock(lock_);
184 for (Stage* sink : sinks_) {
185 sink->Prepare(update_function_);
186 sink->prepared_ = true;
187 uint32_t input_count = sink->input_count();
188 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
189 MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
190 }
191 }
192 }
193
194 void Engine::Prepare(Part part) {
195 DCHECK(part);
196 base::AutoLock lock(lock_);
197 MaybePrepareUnsafe(part.stage_);
198 }
199
200 void Engine::PrimeSinks() {
201 lock_.Acquire();
202 std::list<Stage*> sinks(sinks_);
203 lock_.Release();
204
205 // TODO(dalesat): Threading issue: these sinks may go away during priming.
206 for (Stage* sink : sinks) {
207 sink->Prime();
208 }
209 }
210
211 void Engine::Reset() {
212 base::AutoLock lock(lock_);
213 while (!supply_backlog_.empty()) {
214 supply_backlog_.pop();
215 }
216 while (!demand_backlog_.empty()) {
217 demand_backlog_.pop();
218 }
219 sources_.clear();
220 sinks_.clear();
221 while (!stages_.empty()) {
222 Stage* stage = stages_.front();
223 stages_.pop_front();
224 delete stage;
225 }
226 }
227
228 void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
229 lock_.AssertAcquired();
230
231 DCHECK(stage);
232 packets_produced_ = true; 64 packets_produced_ = true;
233 if (!stage->in_supply_backlog_) { 65 if (!stage->in_supply_backlog_) {
234 supply_backlog_.push(stage); 66 supply_backlog_.push(stage);
235 stage->in_supply_backlog_ = true; 67 stage->in_supply_backlog_ = true;
236 } 68 }
237 } 69 }
238 70
239 void Engine::PushToDemandBacklogUnsafe(Stage* stage) { 71 void Engine::PushToDemandBacklog(Stage* stage) {
240 lock_.AssertAcquired(); 72 lock_.AssertAcquired();
73 DCHECK(stage);
241 74
242 DCHECK(stage);
243 if (!stage->in_demand_backlog_) { 75 if (!stage->in_demand_backlog_) {
244 demand_backlog_.push(stage); 76 demand_backlog_.push(stage);
245 stage->in_demand_backlog_ = true; 77 stage->in_demand_backlog_ = true;
246 } 78 }
247 } 79 }
248 80
249 Engine::Part Engine::Add(Stage* stage) { 81 void Engine::VisitUpstream(
82 const InputRef& input,
83 const UpstreamVisitor& vistor) {
250 base::AutoLock lock(lock_); 84 base::AutoLock lock(lock_);
251 85
252 stages_.push_back(stage); 86 std::queue<InputRef> backlog;
253 if (stage->input_count() == 0) { 87 backlog.push(input);
254 sources_.push_back(stage);
255 }
256 if (stage->output_count() == 0) {
257 sinks_.push_back(stage);
258 }
259 return Part(stage);
260 }
261 88
262 void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) { 89 while (!backlog.empty()) {
263 DCHECK(stage); 90 InputRef input = backlog.front();
264 DCHECK(index < stage->output_count()); 91 backlog.pop();
92 DCHECK(input.valid());
93 DCHECK(input.connected());
265 94
266 lock_.AssertAcquired(); 95 const OutputRef& output = input.mate();
96 Stage* output_stage = output.stage_;
267 97
268 StageOutput& stage_output = stage->output(index); 98 vistor(
269 99 input,
270 if (stage_output.downstream_stage() == nullptr) { 100 output,
271 return; 101 [output_stage, &backlog](size_t input_index) {
272 } 102 backlog.push(InputRef(output_stage, input_index));
273 103 });
274 stage_output.mate().disconnect();
275 stage_output.disconnect();
276 }
277
278 void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
279 DCHECK(stage);
280 DCHECK(index < stage->input_count());
281
282 lock_.AssertAcquired();
283
284 StageInput& stage_input = stage->input(index);
285
286 if (stage_input.upstream_stage() == nullptr) {
287 return;
288 }
289
290 stage_input.mate().disconnect();
291 stage_input.disconnect();
292 }
293
294 void Engine::RemoveUnsafe(Stage* stage) {
295 DCHECK(stage);
296
297 lock_.AssertAcquired();
298
299 uint32_t input_count = stage->input_count();
300 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
301 if (stage->input(input_index).connected()) {
302 DisconnectInputUnsafe(stage, input_index);
303 }
304 }
305
306 uint32_t output_count = stage->output_count();
307 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
308 if (stage->output(output_index).connected()) {
309 DisconnectOutputUnsafe(stage, output_index);
310 }
311 }
312
313 sources_.remove(stage);
314 sinks_.remove(stage);
315 stages_.remove(stage);
316 delete stage;
317 }
318
319 // static
320 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
321 return new DistributorStage(source);
322 }
323
324 // static
325 Stage* Engine::CreateStage(PacketTransformPtr transform) {
326 return new PacketTransformStage(transform);
327 }
328
329 // static
330 Stage* Engine::CreateStage(ActiveSourcePtr source) {
331 return new ActiveSourceStage(source);
332 }
333
334 // static
335 Stage* Engine::CreateStage(ActiveSinkPtr sink) {
336 return new ActiveSinkStage(sink);
337 }
338
339 // static
340 Stage* Engine::CreateStage(LpcmTransformPtr transform) {
341 return new LpcmTransformStage(transform);
342 }
343
344 void Engine::MaybePrepareUnsafe(Stage* stage) {
345 lock_.AssertAcquired();
346
347 if (stage == nullptr || stage->prepared_) {
348 return;
349 }
350
351 // Make sure all downstream stages have been prepared.
352 uint32_t output_count = stage->output_count();
353 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
354 StageOutput& output = stage->output(output_index);
355 if (output.connected() && !output.downstream_stage()->prepared()) {
356 return;
357 }
358 }
359
360 stage->Prepare(update_function_);
361 stage->prepared_ = true;
362
363 // Prepare all upstream stages.
364 uint32_t input_count = stage->input_count();
365 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
366 MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
367 } 104 }
368 } 105 }
369 106
370 void Engine::UpdateUnsafe() { 107 void Engine::VisitDownstream(
108 const OutputRef& output,
109 const DownstreamVisitor& vistor) {
110 base::AutoLock lock(lock_);
111
112 std::queue<OutputRef> backlog;
113 backlog.push(output);
114
115 while (!backlog.empty()) {
116 OutputRef output = backlog.front();
117 backlog.pop();
118 DCHECK(output.valid());
119 DCHECK(output.connected());
120
121 const InputRef& input = output.mate();
122 Stage* input_stage = input.stage_;
123
124 vistor(
125 output,
126 input,
127 [input_stage, &backlog](size_t output_index) {
128 backlog.push(OutputRef(input_stage, output_index));
129 });
130 }
131 }
132
133 void Engine::Update() {
371 lock_.AssertAcquired(); 134 lock_.AssertAcquired();
372 135
373 while (true) { 136 while (true) {
374 Stage* stage = PopFromSupplyBacklogUnsafe(); 137 Stage* stage = PopFromSupplyBacklog();
375 if (stage != nullptr) { 138 if (stage != nullptr) {
376 UpdateUnsafe(stage); 139 Update(stage);
377 continue; 140 continue;
378 } 141 }
379 142
380 stage = PopFromDemandBacklogUnsafe(); 143 stage = PopFromDemandBacklog();
381 if (stage != nullptr) { 144 if (stage != nullptr) {
382 UpdateUnsafe(stage); 145 Update(stage);
383 continue; 146 continue;
384 } 147 }
385 148
386 break; 149 break;
387 } 150 }
388 } 151 }
389 152
390 void Engine::UpdateUnsafe(Stage *stage) { 153 void Engine::Update(Stage *stage) {
391 lock_.AssertAcquired(); 154 lock_.AssertAcquired();
392 155
393 DCHECK(stage); 156 DCHECK(stage);
394 157
395 packets_produced_ = false; 158 packets_produced_ = false;
396 159
397 stage->Update(this); 160 stage->Update(this);
398 161
399 // If the stage produced packets, it may need to reevaluate demand later. 162 // If the stage produced packets, it may need to reevaluate demand later.
400 if (packets_produced_) { 163 if (packets_produced_) {
401 PushToDemandBacklogUnsafe(stage); 164 PushToDemandBacklog(stage);
402 } 165 }
403 } 166 }
404 167
405 Stage* Engine::PopFromSupplyBacklogUnsafe() { 168 Stage* Engine::PopFromSupplyBacklog() {
406 lock_.AssertAcquired(); 169 lock_.AssertAcquired();
407 170
408 if (supply_backlog_.empty()) { 171 if (supply_backlog_.empty()) {
409 return nullptr; 172 return nullptr;
410 } 173 }
411 174
412 Stage* stage = supply_backlog_.front(); 175 Stage* stage = supply_backlog_.front();
413 supply_backlog_.pop(); 176 supply_backlog_.pop();
414 DCHECK(stage->in_supply_backlog_); 177 DCHECK(stage->in_supply_backlog_);
415 stage->in_supply_backlog_ = false; 178 stage->in_supply_backlog_ = false;
416 return stage; 179 return stage;
417 } 180 }
418 181
419 Stage* Engine::PopFromDemandBacklogUnsafe() { 182 Stage* Engine::PopFromDemandBacklog() {
420 lock_.AssertAcquired(); 183 lock_.AssertAcquired();
421 184
422 if (demand_backlog_.empty()) { 185 if (demand_backlog_.empty()) {
423 return nullptr; 186 return nullptr;
424 } 187 }
425 188
426 Stage* stage = demand_backlog_.top(); 189 Stage* stage = demand_backlog_.top();
427 demand_backlog_.pop(); 190 demand_backlog_.pop();
428 DCHECK(stage->in_demand_backlog_); 191 DCHECK(stage->in_demand_backlog_);
429 stage->in_demand_backlog_ = false; 192 stage->in_demand_backlog_ = false;
430 return stage; 193 return stage;
431 } 194 }
432 195
433 } // namespace media 196 } // namespace media
434 } // namespace mojo 197 } // namespace mojo
OLDNEW
« no previous file with comments | « services/media/framework/engine.h ('k') | services/media/framework/formatting.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698