Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(268)

Side by Side Diff: services/media/framework/engine.cc

Issue 1577953002: Motown in-proc streaming framework used to implement media services. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: removed build/util/LASTCHANGE Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/media/framework/engine.h"
6
7 namespace mojo {
8 namespace media {
9
10 uint32_t Engine::Part::input_count() {
johngro 2016/01/20 00:25:32 Would it be a good idea to inline most of these ac
dalesat 2016/01/25 23:29:37 These are only used for graph building, so they ar
johngro 2016/01/27 22:35:22 Acknowledged.
11 DCHECK(stage_ != nullptr);
12 return stage_->input_count();
13 }
14
15 Engine::Input Engine::Part::input(uint32_t index) {
16 DCHECK(stage_ != nullptr && index < stage_->input_count());
johngro 2016/01/20 00:25:32 FWIW, the 2 parameter form of the Input constructo
dalesat 2016/01/28 18:49:15 Acknowledged.
17 return Input(stage_, index);
18 }
19
20 Engine::Input Engine::Part::input() {
21 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
22 return Input(stage_, 0);
23 }
24
25 uint32_t Engine::Part::output_count() {
26 DCHECK(stage_ != nullptr);
27 return stage_->output_count();
28 }
29
30 Engine::Output Engine::Part::output(uint32_t index) {
31 DCHECK(stage_ != nullptr && index < stage_->output_count());
32 return Output(stage_, index);
33 }
34
35 Engine::Output Engine::Part::output() {
36 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
37 return Output(stage_, 0);
38 }
39
40 Engine::Part Engine::Part::upstream_part(uint32_t index) {
41 DCHECK(stage_ != nullptr && index < stage_->input_count());
42 return Part(stage_->input(index).upstream_stage());
43 }
44
45 Engine::Part Engine::Part::upstream_part() {
46 DCHECK(stage_ != nullptr && stage_->input_count() == 1);
47 return Part(stage_->input(0).upstream_stage());
48 }
49
50 Engine::Part Engine::Part::downstream_part(uint32_t index) {
51 DCHECK(stage_ != nullptr && index < stage_->output_count());
52 return Part(stage_->output(index).downstream_stage());
53 }
54
55 Engine::Part Engine::Part::downstream_part() {
56 DCHECK(stage_ != nullptr && stage_->output_count() == 1);
57 return Part(stage_->output(0).downstream_stage());
58 }
59
60 Engine::Engine() {
61 update_function_ = [this](Stage* stage) {
62 // TODO(dalesat): Correct thread and synchronization.
63 DCHECK(stage);
64 Update(stage);
65 Update();
66 };
67 }
68
69 Engine::~Engine() {}
70
71 void Engine::Remove(Part part) {
72 DCHECK(part);
73
74 Stage* stage = part.stage_;
75
76 uint32_t input_count = stage->input_count();
77 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
78 if (stage->input(input_index).connected()) {
79 Disconnect(Input(stage, input_index));
80 }
81 }
82
83 uint32_t output_count = stage->output_count();
84 for (uint32_t output_index = 0; output_index < output_count; output_index++) {
85 if (stage->output(output_index).connected()) {
86 Disconnect(Output(stage, output_index));
87 }
88 }
89
90 sources_.remove(stage);
91 sinks_.remove(stage);
92
93 std::unique_ptr<Stage> to_remove(stage); // Release this duplicate unique_ptr!
johngro 2016/01/20 00:25:32 something feel wrong about this... if stages_ is
dalesat 2016/01/25 23:29:37 A good analogy is that engine is a container like
johngro 2016/01/27 22:35:22 Acknowledged.
94 stages_.remove(to_remove);
95 to_remove.release(); // OK, it's released!
96 }
97
98 Engine::Part Engine::Connect(
99 Output output,
100 Input input) {
101 DCHECK(output);
102 DCHECK(input);
103
104 if (output.connected()) {
105 Disconnect(output);
106 }
107 if (input.connected()) {
108 Disconnect(input);
109 }
110
111 output.stage_output().connect(
112 input.stage_,
113 input.index_);
114
115 input.stage_input().connect(
116 output.stage_,
117 output.index_);
118
119 return input.part();
120 }
121
122 Engine::Part Engine::Connect(Part upstream_part, Part downstream_part) {
123 DCHECK(upstream_part);
124 DCHECK(downstream_part);
125 Connect(upstream_part.output(), downstream_part.input());
126 return downstream_part;
127 }
128
129 Engine::Part Engine::Connect(
130 Output output,
131 Part downstream_part) {
132 DCHECK(output);
133 DCHECK(downstream_part);
134 Connect(output, downstream_part.input());
135 return downstream_part;
136 }
137
138 Engine::Part Engine::Connect(
139 Part upstream_part,
140 Input input) {
141 DCHECK(upstream_part);
142 DCHECK(input);
143 Connect(upstream_part.output(), input);
144 return input.part();
145 }
146
147 void Engine::Disconnect(Output output) {
148 DCHECK(output);
149
150 if (!output.connected()) {
151 return;
152 }
153
154 StageOutput& stage_output = output.stage_output();
155 stage_output.mate().connect(nullptr, 0);
156 stage_output.connect(nullptr, 0);
157 }
158
159 void Engine::Disconnect(Input input) {
160 DCHECK(input);
161
162 if (!input.connected()) {
163 return;
164 }
165
166 StageInput& stage_input = input.stage_input();
167 stage_input.mate().connect(nullptr, 0);
168 stage_input.connect(nullptr, 0);
169 }
170
171 void Engine::RemoveAll(Part part) {
172 std::deque<Part> to_remove { part };
173
174 while (!to_remove.empty()) {
175 Part part = to_remove.front();
176 to_remove.pop_front();
177
178 for (uint32_t i = 0; i < part.input_count(); ++i) {
179 to_remove.push_back(part.upstream_part(i));
180 }
181
182 for (uint32_t i = 0; i < part.output_count(); ++i) {
183 to_remove.push_back(part.downstream_part(i));
184 }
185
186 Remove(part);
187 }
188 }
189
190 void Engine::RemoveAll(Output output) {
191 DCHECK(output);
192
193 if (!output.connected()) {
194 return;
195 }
196
197 Part downstream_part = output.downstream_part();
198 Disconnect(output);
199 RemoveAll(downstream_part);
200 }
201
202 void Engine::RemoveAll(Input input) {
203 DCHECK(input);
204
205 if (!input.connected()) {
206 return;
207 }
208
209 Part upstream_part = input.upstream_part();
210 Disconnect(input);
211 RemoveAll(upstream_part);
212 }
213
214 void Engine::Prepare() {
215 for (Stage* sink : sinks_) {
216 sink->Prepare(update_function_);
217 sink->engine_private().prepared_ = true;
218 uint32_t input_count = sink->input_count();
219 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
220 Prepare(sink->input(input_index).upstream_stage());
221 }
222 }
223 }
224
225 void Engine::reset() {
226 supply_backlog_.clear();
227 demand_backlog_.clear();
228 sources_.clear();
229 sinks_.clear();
230 stages_.clear();
231 }
232
233 Engine::Part Engine::Add(Stage* stage) {
234 stages_.push_back(std::unique_ptr<Stage>(stage));
johngro 2016/01/20 00:25:32 related to the question about unique pointers earl
dalesat 2016/01/28 18:49:15 Using raw pointers here now.
235 if (stage->input_count() == 0) {
236 sources_.push_back(stage);
237 }
238 if (stage->output_count() == 0) {
239 sinks_.push_back(stage);
240 }
241 return Part(stage);
242 }
243
244 // static
245 Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
246 return new DistributorStage(source);
247 }
248
249 // static
250 Stage* Engine::CreateStage(PacketTransformPtr transform) {
251 return new PacketTransformStage(transform);
252 }
253
254 // static
255 Stage* Engine::CreateStage(ActiveSourcePtr source) {
256 return new ActiveSourceStage(source);
257 }
258
259 // static
260 Stage* Engine::CreateStage(ActiveSinkPtr sink) {
261 return new ActiveSinkStage(sink);
262 }
263
264 // static
265 Stage* Engine::CreateStage(LpcmTransformPtr transform) {
266 return new LpcmTransformStage(transform);
267 }
268
269 void Engine::Prepare(Stage* stage) {
270 if (stage == nullptr || stage->engine_private().prepared_) {
271 return;
272 }
273
274 stage->Prepare(update_function_);
275 stage->engine_private().prepared_ = true;
276
277 uint32_t input_count = stage->input_count();
278 for (uint32_t input_index = 0; input_index < input_count; input_index++) {
279 Prepare(stage->input(input_index).upstream_stage());
280 }
281 }
282
283 void Engine::Update() {
284 while (true) {
johngro 2016/01/20 00:25:32 something to consider moving forward; If this is
dalesat 2016/01/25 17:47:29 Agreed. The threading model isn't complete.
johngro 2016/01/27 22:35:22 Acknowledged.
285 Stage* stage = PopFromSupplyBacklog();
286 if (stage != nullptr) {
287 Update(stage);
288 continue;
289 }
290
291 stage = PopFromDemandBacklog();
292 if (stage != nullptr) {
293 Update(stage);
294 continue;
295 }
296
297 break;
298 }
299 }
300
301 void Engine::Update(Stage *stage) {
302 DCHECK(stage);
303
304 packets_produced_ = false;
305
306 stage->Update(*this);
307
308 // If the stage produced packets, it may need to reevaluate demand later.
309 if (packets_produced_) {
310 PushToDemandBacklog(stage);
311 }
312 }
313
314 void Engine::PushToSupplyBacklog(Stage* stage) {
315 DCHECK(stage);
316 packets_produced_ = true;
317 if (!stage->engine_private().in_supply_backlog_) {
318 // LIFO
319 supply_backlog_.push_back(stage);
320 stage->engine_private().in_supply_backlog_ = true;
321 }
322 }
323
324 void Engine::PushToDemandBacklog(Stage* stage) {
325 DCHECK(stage);
326 if (!stage->engine_private().in_demand_backlog_) {
327 // FIFO
328 demand_backlog_.push_front(stage);
329 stage->engine_private().in_demand_backlog_ = true;
330 }
331 }
332
333 Stage* Engine::PopFromSupplyBacklog() {
334 if (supply_backlog_.empty()) {
335 return nullptr;
336 }
337
338 Stage* stage = supply_backlog_.front();
johngro 2016/01/20 00:25:32 the comment above indicates that this is supposed
johngro 2016/01/27 22:35:22 This still seems backwards, see updated comment in
dalesat 2016/01/28 18:49:15 Done.
339 supply_backlog_.pop_front();
340 DCHECK(stage->engine_private().in_supply_backlog_);
341 stage->engine_private().in_supply_backlog_ = false;
342 return stage;
343 }
344
345 Stage* Engine::PopFromDemandBacklog() {
346 if (demand_backlog_.empty()) {
347 return nullptr;
348 }
349
350 Stage* stage = demand_backlog_.front();
351 demand_backlog_.pop_front();
352 DCHECK(stage->engine_private().in_demand_backlog_);
353 stage->engine_private().in_demand_backlog_ = false;
354 return stage;
355 }
356
357 } // namespace media
358 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698