Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: pkg/barback/lib/src/phase.dart

Issue 261823008: Reorganize barback's source files. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: re-add barback/lib/src/internal_asset.dart Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/barback/lib/src/package_provider.dart ('k') | pkg/barback/lib/src/phase_forwarder.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library barback.phase;
6
7 import 'dart:async';
8
9 import 'asset_cascade.dart';
10 import 'asset_id.dart';
11 import 'asset_node.dart';
12 import 'asset_node_set.dart';
13 import 'errors.dart';
14 import 'group_runner.dart';
15 import 'log.dart';
16 import 'multiset.dart';
17 import 'node_status.dart';
18 import 'node_streams.dart';
19 import 'phase_forwarder.dart';
20 import 'phase_output.dart';
21 import 'transformer.dart';
22 import 'transformer_classifier.dart';
23 import 'transformer_group.dart';
24 import 'utils.dart';
25
26 /// One phase in the ordered series of transformations in an [AssetCascade].
27 ///
28 /// Each phase can access outputs from previous phases and can in turn pass
29 /// outputs to later phases. Phases are processed strictly serially. All
30 /// transforms in a phase will be complete before moving on to the next phase.
31 /// Within a single phase, all transforms will be run in parallel.
32 ///
33 /// Building can be interrupted between phases. For example, a source is added
34 /// which starts the background process. Sometime during, say, phase 2 (which
35 /// is running asynchronously) that source is modified. When the process queue
36 /// goes to advance to phase 3, it will see that modification and start the
37 /// waterfall from the beginning again.
38 class Phase {
39 /// The cascade that owns this phase.
40 final AssetCascade cascade;
41
42 /// A string describing the location of [this] in the transformer graph.
43 final String _location;
44
45 /// The index of [this] in its parent cascade or group.
46 final int _index;
47
48 /// The groups for this phase.
49 final _groups = new Map<TransformerGroup, GroupRunner>();
50
51 /// The inputs for this phase.
52 ///
53 /// For the first phase, these will be the source assets. For all other
54 /// phases, they will be the outputs from the previous phase.
55 final _inputs = new AssetNodeSet();
56
57 /// The transformer classifiers for this phase.
58 final _classifiers = new Map<Transformer, TransformerClassifier>();
59
60 /// The forwarders for this phase.
61 final _forwarders = new Map<AssetId, PhaseForwarder>();
62
63 /// The outputs for this phase.
64 final _outputs = new Map<AssetId, PhaseOutput>();
65
66 /// The set of all [AssetNode.origin] properties of the input assets for this
67 /// phase.
68 ///
69 /// This is used to determine which assets have been passed unmodified through
70 /// [_classifiers] or [_groups]. It's possible that a given asset was consumed
71 /// by a group and not an individual transformer, and so shouldn't be
72 /// forwarded through the phase as a whole.
73 ///
74 /// In order to detect whether an output has been forwarded through a group or
75 /// a classifier, we must be able to distinguish it from other outputs with
76 /// the same id. To do so, we check if its origin is in [_inputOrigins]. If
77 /// so, it's been forwarded unmodified.
78 final _inputOrigins = new Multiset<AssetNode>();
79
80 /// The streams exposed by this phase.
81 final _streams = new NodeStreams();
82 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
83 Stream<AssetNode> get onAsset => _streams.onAsset;
84 Stream<LogEntry> get onLog => _streams.onLog;
85
86 /// How far along [this] is in processing its assets.
87 NodeStatus get status {
88 // Before any transformers are added, the phase should be dirty if and only
89 // if any input is dirty.
90 if (_classifiers.isEmpty && _groups.isEmpty) {
91 return _inputs.any((input) => input.state.isDirty) ?
92 NodeStatus.RUNNING : NodeStatus.IDLE;
93 }
94
95 var classifierStatus = NodeStatus.dirtiest(
96 _classifiers.values.map((classifier) => classifier.status));
97 var groupStatus = NodeStatus.dirtiest(
98 _groups.values.map((group) => group.status));
99 return (previous == null ? NodeStatus.IDLE : previous.status)
100 .dirtier(classifierStatus)
101 .dirtier(groupStatus);
102 }
103
104 /// The previous phase in the cascade, or null if this is the first phase.
105 final Phase previous;
106
107 /// The subscription to [previous]'s [onStatusChange] stream.
108 StreamSubscription _previousStatusSubscription;
109
110 /// The subscription to [previous]'s [onAsset] stream.
111 StreamSubscription<AssetNode> _previousOnAssetSubscription;
112
113 /// A map of asset ids to completers for [getInput] requests.
114 ///
115 /// If an asset node is requested before it's available, we put a completer in
116 /// this map to wait for the asset to be generated. If it's not generated, the
117 /// completer should complete to `null`.
118 final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>();
119
120 /// Returns all currently-available output assets for this phase.
121 Set<AssetNode> get availableOutputs {
122 return _outputs.values
123 .map((output) => output.output)
124 .where((node) => node.state.isAvailable)
125 .toSet();
126 }
127
128 // TODO(nweiz): Rather than passing the cascade and the phase everywhere,
129 // create an interface that just exposes [getInput]. Emit errors via
130 // [AssetNode]s.
131 Phase(AssetCascade cascade, String location)
132 : this._(cascade, location, 0);
133
134 Phase._(this.cascade, this._location, this._index, [this.previous]) {
135 if (previous != null) {
136 _previousOnAssetSubscription = previous.onAsset.listen(addInput);
137 _previousStatusSubscription = previous.onStatusChange
138 .listen((_) => _streams.changeStatus(status));
139 }
140
141 onStatusChange.listen((status) {
142 if (status == NodeStatus.RUNNING) return;
143
144 // All the previous phases have finished declaring or producing their
145 // outputs. If anyone's still waiting for outputs, cut off the wait; we
146 // won't be generating them, at least until a source asset changes.
147 for (var completer in _pendingOutputRequests.values) {
148 completer.complete(null);
149 }
150 _pendingOutputRequests.clear();
151 });
152 }
153
154 /// Adds a new asset as an input for this phase.
155 ///
156 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
157 /// will automatically begin determining which transforms can consume it as a
158 /// primary input. The transforms themselves won't be applied until [process]
159 /// is called, however.
160 ///
161 /// This should only be used for brand-new assets or assets that have been
162 /// removed and re-created. The phase will automatically handle updated assets
163 /// using the [AssetNode.onStateChange] stream.
164 void addInput(AssetNode node) {
165 // Each group is one channel along which an asset may be forwarded, as is
166 // each transformer.
167 var forwarder = new PhaseForwarder(
168 node, _classifiers.length, _groups.length);
169 _forwarders[node.id] = forwarder;
170 forwarder.onAsset.listen(_handleOutputWithoutForwarder);
171 if (forwarder.output != null) {
172 _handleOutputWithoutForwarder(forwarder.output);
173 }
174
175 _inputOrigins.add(node.origin);
176 _inputs.add(node);
177 node.onStateChange.listen((state) {
178 if (state.isRemoved) {
179 _inputOrigins.remove(node.origin);
180 _forwarders.remove(node.id).remove();
181 }
182 _streams.changeStatus(status);
183 });
184
185 for (var classifier in _classifiers.values) {
186 classifier.addInput(node);
187 }
188 for (var group in _groups.values) {
189 group.addInput(node);
190 }
191 }
192
193 // TODO(nweiz): If the output is available when this is called, it's
194 // theoretically possible for it to become unavailable between the call and
195 // the return. If it does so, it won't trigger the rebuilding process. To
196 // avoid this, we should have this and the methods it calls take explicit
197 // callbacks, as in [AssetNode.whenAvailable].
198 /// Gets the asset node for an output [id].
199 ///
200 /// If [id] is for a generated or transformed asset, this will wait until it
201 /// has been created and return it. This means that the returned asset will
202 /// always be [AssetState.AVAILABLE].
203 ///
204 /// If the output cannot be found, returns null.
205 Future<AssetNode> getOutput(AssetId id) {
206 return syncFuture(() {
207 if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
208 if (_outputs.containsKey(id)) {
209 var output = _outputs[id].output;
210 // If the requested output is available, we can just return it.
211 if (output.state.isAvailable) return output;
212
213 // If the requested output exists but isn't yet available, wait to see
214 // if it becomes available. If it's removed before becoming available,
215 // try again, since it could be generated again.
216 output.force();
217 return output.whenAvailable((_) {
218 return output;
219 }).catchError((error) {
220 if (error is! AssetNotFoundException) throw error;
221 return getOutput(id);
222 });
223 }
224
225 // If this phase and the previous phases are fully declared or done, the
226 // requested output won't be generated and we can safely return null.
227 if (status != NodeStatus.RUNNING) return null;
228
229 // Otherwise, store a completer for the asset node. If it's generated in
230 // the future, we'll complete this completer.
231 var completer = _pendingOutputRequests.putIfAbsent(id,
232 () => new Completer.sync());
233 return completer.future;
234 });
235 }
236
237 /// Set this phase's transformers to [transformers].
238 void updateTransformers(Iterable transformers) {
239 var newTransformers = transformers.where((op) => op is Transformer)
240 .toSet();
241 var oldTransformers = _classifiers.keys.toSet();
242 for (var removed in oldTransformers.difference(newTransformers)) {
243 _classifiers.remove(removed).remove();
244 }
245
246 for (var transformer in newTransformers.difference(oldTransformers)) {
247 var classifier = new TransformerClassifier(
248 this, transformer, "$_location.$_index");
249 _classifiers[transformer] = classifier;
250 classifier.onAsset.listen(_handleOutput);
251 _streams.onLogPool.add(classifier.onLog);
252 classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
253 for (var input in _inputs) {
254 classifier.addInput(input);
255 }
256 }
257
258 var newGroups = transformers.where((op) => op is TransformerGroup)
259 .toSet();
260 var oldGroups = _groups.keys.toSet();
261 for (var removed in oldGroups.difference(newGroups)) {
262 _groups.remove(removed).remove();
263 }
264
265 for (var added in newGroups.difference(oldGroups)) {
266 var runner = new GroupRunner(cascade, added, "$_location.$_index");
267 _groups[added] = runner;
268 runner.onAsset.listen(_handleOutput);
269 _streams.onLogPool.add(runner.onLog);
270 runner.onStatusChange.listen((_) => _streams.changeStatus(status));
271 for (var input in _inputs) {
272 runner.addInput(input);
273 }
274 }
275
276 for (var forwarder in _forwarders.values) {
277 forwarder.updateTransformers(_classifiers.length, _groups.length);
278 }
279
280 _streams.changeStatus(status);
281 }
282
283 /// Force all [LazyTransformer]s' transforms in this phase to begin producing
284 /// concrete assets.
285 void forceAllTransforms() {
286 for (var classifier in _classifiers.values) {
287 classifier.forceAllTransforms();
288 }
289
290 for (var group in _groups.values) {
291 group.forceAllTransforms();
292 }
293 }
294
295 /// Add a new phase after this one.
296 ///
297 /// This may only be called on a phase with no phase following it.
298 Phase addPhase() {
299 var next = new Phase._(cascade, _location, _index + 1, this);
300 for (var output in _outputs.values.toList()) {
301 // Remove [output]'s listeners because now they should get the asset from
302 // [next], rather than this phase. Any transforms consuming [output] will
303 // be re-run and will consume the output from the new final phase.
304 output.removeListeners();
305 }
306 return next;
307 }
308
309 /// Mark this phase as removed.
310 ///
311 /// This will remove all the phase's outputs.
312 void remove() {
313 for (var classifier in _classifiers.values.toList()) {
314 classifier.remove();
315 }
316 for (var group in _groups.values) {
317 group.remove();
318 }
319 _streams.close();
320 if (_previousStatusSubscription != null) {
321 _previousStatusSubscription.cancel();
322 }
323 if (_previousOnAssetSubscription != null) {
324 _previousOnAssetSubscription.cancel();
325 }
326 }
327
328 /// Add [asset] as an output of this phase.
329 void _handleOutput(AssetNode asset) {
330 if (_inputOrigins.contains(asset.origin)) {
331 _forwarders[asset.id].addIntermediateAsset(asset);
332 } else {
333 _handleOutputWithoutForwarder(asset);
334 }
335 }
336
337 /// Add [asset] as an output of this phase without checking if it's a
338 /// forwarded asset.
339 void _handleOutputWithoutForwarder(AssetNode asset) {
340 if (_outputs.containsKey(asset.id)) {
341 _outputs[asset.id].add(asset);
342 } else {
343 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
344 _outputs[asset.id].onAsset.listen(_emit,
345 onDone: () => _outputs.remove(asset.id));
346 _emit(_outputs[asset.id].output);
347 }
348
349 var exception = _outputs[asset.id].collisionException;
350 if (exception != null) cascade.reportError(exception);
351 }
352
353 /// Emit [asset] as an output of this phase.
354 ///
355 /// This should be called after [_handleOutput], so that collisions are
356 /// resolved.
357 void _emit(AssetNode asset) {
358 _streams.onAssetController.add(asset);
359 _providePendingAsset(asset);
360 }
361
362 /// Provide an asset to a pending [getOutput] call.
363 void _providePendingAsset(AssetNode asset) {
364 // If anyone's waiting for this asset, provide it to them.
365 var request = _pendingOutputRequests.remove(asset.id);
366 if (request == null) return;
367
368 if (asset.state.isAvailable) {
369 request.complete(asset);
370 return;
371 }
372
373 // A lazy asset may be emitted while still dirty. If so, we wait until it's
374 // either available or removed before trying again to access it.
375 assert(asset.state.isDirty);
376 asset.force();
377 asset.whenStateChanges().then((state) {
378 if (state.isRemoved) return getOutput(asset.id);
379 return asset;
380 }).then(request.complete).catchError(request.completeError);
381 }
382
383 String toString() => "phase $_location.$_index";
384 }
OLDNEW
« no previous file with comments | « pkg/barback/lib/src/package_provider.dart ('k') | pkg/barback/lib/src/phase_forwarder.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698