OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.graph.phase; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import '../asset/asset_id.dart'; | |
10 import '../asset/asset_node.dart'; | |
11 import '../asset/asset_node_set.dart'; | |
12 import '../errors.dart'; | |
13 import '../log.dart'; | |
14 import '../transformer/aggregate_transformer.dart'; | |
15 import '../transformer/transformer.dart'; | |
16 import '../transformer/transformer_group.dart'; | |
17 import '../utils.dart'; | |
18 import '../utils/multiset.dart'; | |
19 import 'asset_cascade.dart'; | |
20 import 'group_runner.dart'; | |
21 import 'node_status.dart'; | |
22 import 'node_streams.dart'; | |
23 import 'phase_forwarder.dart'; | |
24 import 'phase_output.dart'; | |
25 import 'transformer_classifier.dart'; | |
26 | |
27 /// One phase in the ordered series of transformations in an [AssetCascade]. | |
28 /// | |
29 /// Each phase can access outputs from previous phases and can in turn pass | |
30 /// outputs to later phases. Phases are processed strictly serially. All | |
31 /// transforms in a phase will be complete before moving on to the next phase. | |
32 /// Within a single phase, all transforms will be run in parallel. | |
33 /// | |
34 /// Building can be interrupted between phases. For example, a source is added | |
35 /// which starts the background process. Sometime during, say, phase 2 (which | |
36 /// is running asynchronously) that source is modified. When the process queue | |
37 /// goes to advance to phase 3, it will see that modification and start the | |
38 /// waterfall from the beginning again. | |
39 class Phase { | |
40 /// The cascade that owns this phase. | |
41 final AssetCascade cascade; | |
42 | |
43 /// A string describing the location of [this] in the transformer graph. | |
44 final String _location; | |
45 | |
46 /// The index of [this] in its parent cascade or group. | |
47 final int _index; | |
48 | |
49 /// The groups for this phase. | |
50 final _groups = new Map<TransformerGroup, GroupRunner>(); | |
51 | |
52 /// The inputs for this phase. | |
53 /// | |
54 /// For the first phase, these will be the source assets. For all other | |
55 /// phases, they will be the outputs from the previous phase. | |
56 final _inputs = new AssetNodeSet(); | |
57 | |
58 /// The transformer classifiers for this phase. | |
59 /// | |
60 /// The keys can be either [Transformer]s or [AggregateTransformer]s. | |
61 final _classifiers = new Map<dynamic, TransformerClassifier>(); | |
62 | |
63 /// The forwarders for this phase. | |
64 final _forwarders = new Map<AssetId, PhaseForwarder>(); | |
65 | |
66 /// The outputs for this phase. | |
67 final _outputs = new Map<AssetId, PhaseOutput>(); | |
68 | |
69 /// The set of all [AssetNode.origin] properties of the input assets for this | |
70 /// phase. | |
71 /// | |
72 /// This is used to determine which assets have been passed unmodified through | |
73 /// [_classifiers] or [_groups]. It's possible that a given asset was consumed | |
74 /// by a group and not an individual transformer, and so shouldn't be | |
75 /// forwarded through the phase as a whole. | |
76 /// | |
77 /// In order to detect whether an output has been forwarded through a group or | |
78 /// a classifier, we must be able to distinguish it from other outputs with | |
79 /// the same id. To do so, we check if its origin is in [_inputOrigins]. If | |
80 /// so, it's been forwarded unmodified. | |
81 final _inputOrigins = new Multiset<AssetNode>(); | |
82 | |
83 /// The streams exposed by this phase. | |
84 final _streams = new NodeStreams(); | |
85 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; | |
86 Stream<AssetNode> get onAsset => _streams.onAsset; | |
87 Stream<LogEntry> get onLog => _streams.onLog; | |
88 | |
89 /// How far along [this] is in processing its assets. | |
90 NodeStatus get status { | |
91 // Before any transformers are added, the phase should be dirty if and only | |
92 // if any input is dirty. | |
93 if (_classifiers.isEmpty && _groups.isEmpty && previous == null) { | |
94 return _inputs.any((input) => input.state.isDirty) ? | |
95 NodeStatus.RUNNING : NodeStatus.IDLE; | |
96 } | |
97 | |
98 var classifierStatus = NodeStatus.dirtiest( | |
99 _classifiers.values.map((classifier) => classifier.status)); | |
100 var groupStatus = NodeStatus.dirtiest( | |
101 _groups.values.map((group) => group.status)); | |
102 return (previous == null ? NodeStatus.IDLE : previous.status) | |
103 .dirtier(classifierStatus) | |
104 .dirtier(groupStatus); | |
105 } | |
106 | |
107 /// The previous phase in the cascade, or null if this is the first phase. | |
108 final Phase previous; | |
109 | |
110 /// The subscription to [previous]'s [onStatusChange] stream. | |
111 StreamSubscription _previousStatusSubscription; | |
112 | |
113 /// The subscription to [previous]'s [onAsset] stream. | |
114 StreamSubscription<AssetNode> _previousOnAssetSubscription; | |
115 | |
116 final _inputSubscriptions = new Set<StreamSubscription>(); | |
117 | |
118 /// A map of asset ids to completers for [getInput] requests. | |
119 /// | |
120 /// If an asset node is requested before it's available, we put a completer in | |
121 /// this map to wait for the asset to be generated. If it's not generated, the | |
122 /// completer should complete to `null`. | |
123 final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>(); | |
124 | |
125 /// Returns all currently-available output assets for this phase. | |
126 Set<AssetNode> get availableOutputs { | |
127 return _outputs.values | |
128 .map((output) => output.output) | |
129 .where((node) => node.state.isAvailable) | |
130 .toSet(); | |
131 } | |
132 | |
133 // TODO(nweiz): Rather than passing the cascade and the phase everywhere, | |
134 // create an interface that just exposes [getInput]. Emit errors via | |
135 // [AssetNode]s. | |
136 Phase(AssetCascade cascade, String location) | |
137 : this._(cascade, location, 0); | |
138 | |
139 Phase._(this.cascade, this._location, this._index, [this.previous]) { | |
140 if (previous != null) { | |
141 _previousOnAssetSubscription = previous.onAsset.listen(addInput); | |
142 _previousStatusSubscription = previous.onStatusChange | |
143 .listen((_) => _streams.changeStatus(status)); | |
144 } | |
145 | |
146 onStatusChange.listen((status) { | |
147 if (status == NodeStatus.RUNNING) return; | |
148 | |
149 // All the previous phases have finished declaring or producing their | |
150 // outputs. If anyone's still waiting for outputs, cut off the wait; we | |
151 // won't be generating them, at least until a source asset changes. | |
152 for (var completer in _pendingOutputRequests.values) { | |
153 completer.complete(null); | |
154 } | |
155 _pendingOutputRequests.clear(); | |
156 }); | |
157 } | |
158 | |
159 /// Adds a new asset as an input for this phase. | |
160 /// | |
161 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase | |
162 /// will automatically begin determining which transforms can consume it as a | |
163 /// primary input. The transforms themselves won't be applied until [process] | |
164 /// is called, however. | |
165 /// | |
166 /// This should only be used for brand-new assets or assets that have been | |
167 /// removed and re-created. The phase will automatically handle updated assets | |
168 /// using the [AssetNode.onStateChange] stream. | |
169 void addInput(AssetNode node) { | |
170 // Each group is one channel along which an asset may be forwarded, as is | |
171 // each transformer. | |
172 var forwarder = new PhaseForwarder( | |
173 node, _classifiers.length, _groups.length); | |
174 _forwarders[node.id] = forwarder; | |
175 forwarder.onAsset.listen(_handleOutputWithoutForwarder); | |
176 if (forwarder.output != null) { | |
177 _handleOutputWithoutForwarder(forwarder.output); | |
178 } | |
179 | |
180 _inputOrigins.add(node.origin); | |
181 _inputs.add(node); | |
182 _inputSubscriptions.add(node.onStateChange.listen((state) { | |
183 if (state.isRemoved) { | |
184 _inputOrigins.remove(node.origin); | |
185 _forwarders.remove(node.id).remove(); | |
186 } | |
187 _streams.changeStatus(status); | |
188 })); | |
189 | |
190 for (var classifier in _classifiers.values) { | |
191 classifier.addInput(node); | |
192 } | |
193 } | |
194 | |
195 // TODO(nweiz): If the output is available when this is called, it's | |
196 // theoretically possible for it to become unavailable between the call and | |
197 // the return. If it does so, it won't trigger the rebuilding process. To | |
198 // avoid this, we should have this and the methods it calls take explicit | |
199 // callbacks, as in [AssetNode.whenAvailable]. | |
200 /// Gets the asset node for an output [id]. | |
201 /// | |
202 /// If [id] is for a generated or transformed asset, this will wait until it | |
203 /// has been created and return it. This means that the returned asset will | |
204 /// always be [AssetState.AVAILABLE]. | |
205 /// | |
206 /// If the output cannot be found, returns null. | |
207 Future<AssetNode> getOutput(AssetId id) { | |
208 return syncFuture(() { | |
209 if (id.package != cascade.package) return cascade.graph.getAssetNode(id); | |
210 if (_outputs.containsKey(id)) { | |
211 var output = _outputs[id].output; | |
212 // If the requested output is available, we can just return it. | |
213 if (output.state.isAvailable) return output; | |
214 | |
215 // If the requested output exists but isn't yet available, wait to see | |
216 // if it becomes available. If it's removed before becoming available, | |
217 // try again, since it could be generated again. | |
218 output.force(); | |
219 return output.whenAvailable((_) { | |
220 return output; | |
221 }).catchError((error) { | |
222 if (error is! AssetNotFoundException) throw error; | |
223 return getOutput(id); | |
224 }); | |
225 } | |
226 | |
227 // If this phase and the previous phases are fully declared or done, the | |
228 // requested output won't be generated and we can safely return null. | |
229 if (status != NodeStatus.RUNNING) return null; | |
230 | |
231 // Otherwise, store a completer for the asset node. If it's generated in | |
232 // the future, we'll complete this completer. | |
233 var completer = _pendingOutputRequests.putIfAbsent(id, | |
234 () => new Completer.sync()); | |
235 return completer.future; | |
236 }); | |
237 } | |
238 | |
239 /// Set this phase's transformers to [transformers]. | |
240 void updateTransformers(Iterable transformers) { | |
241 var newTransformers = transformers | |
242 .where((op) => op is Transformer || op is AggregateTransformer) | |
243 .toSet(); | |
244 var oldTransformers = _classifiers.keys.toSet(); | |
245 for (var removed in oldTransformers.difference(newTransformers)) { | |
246 _classifiers.remove(removed).remove(); | |
247 } | |
248 | |
249 for (var transformer in newTransformers.difference(oldTransformers)) { | |
250 var classifier = new TransformerClassifier( | |
251 this, transformer, "$_location.$_index"); | |
252 _classifiers[transformer] = classifier; | |
253 classifier.onAsset.listen(_handleOutput); | |
254 _streams.onLogPool.add(classifier.onLog); | |
255 classifier.onStatusChange.listen((_) => _streams.changeStatus(status)); | |
256 for (var input in _inputs) { | |
257 classifier.addInput(input); | |
258 } | |
259 } | |
260 | |
261 var newGroups = transformers.where((op) => op is TransformerGroup) | |
262 .toSet(); | |
263 var oldGroups = _groups.keys.toSet(); | |
264 for (var removed in oldGroups.difference(newGroups)) { | |
265 _groups.remove(removed).remove(); | |
266 } | |
267 | |
268 for (var added in newGroups.difference(oldGroups)) { | |
269 var runner = new GroupRunner(previous, added, "$_location.$_index"); | |
270 _groups[added] = runner; | |
271 runner.onAsset.listen(_handleOutput); | |
272 _streams.onLogPool.add(runner.onLog); | |
273 runner.onStatusChange.listen((_) => _streams.changeStatus(status)); | |
274 } | |
275 | |
276 for (var forwarder in _forwarders.values) { | |
277 forwarder.updateTransformers(_classifiers.length, _groups.length); | |
278 } | |
279 | |
280 _streams.changeStatus(status); | |
281 } | |
282 | |
283 /// Force all [LazyTransformer]s' transforms in this phase to begin producing | |
284 /// concrete assets. | |
285 void forceAllTransforms() { | |
286 for (var classifier in _classifiers.values) { | |
287 classifier.forceAllTransforms(); | |
288 } | |
289 | |
290 for (var group in _groups.values) { | |
291 group.forceAllTransforms(); | |
292 } | |
293 } | |
294 | |
295 /// Add a new phase after this one. | |
296 /// | |
297 /// The new phase will have a location annotation describing its place in the | |
298 /// package graph. By default, this annotation will describe it as being | |
299 /// directly after [this]. If [location] is passed, though, it's described as | |
300 /// being the first phase in that location. | |
301 Phase addPhase([String location]) { | |
302 var index = 0; | |
303 if (location == null) { | |
304 location = _location; | |
305 index = _index + 1; | |
306 } | |
307 | |
308 var next = new Phase._(cascade, location, index, this); | |
309 for (var output in _outputs.values.toList()) { | |
310 // Remove [output]'s listeners because now they should get the asset from | |
311 // [next], rather than this phase. Any transforms consuming [output] will | |
312 // be re-run and will consume the output from the new final phase. | |
313 output.removeListeners(); | |
314 } | |
315 return next; | |
316 } | |
317 | |
318 /// Mark this phase as removed. | |
319 /// | |
320 /// This will remove all the phase's outputs. | |
321 void remove() { | |
322 for (var classifier in _classifiers.values.toList()) { | |
323 classifier.remove(); | |
324 } | |
325 for (var group in _groups.values) { | |
326 group.remove(); | |
327 } | |
328 _streams.close(); | |
329 for (var subscription in _inputSubscriptions) { | |
330 subscription.cancel(); | |
331 } | |
332 if (_previousStatusSubscription != null) { | |
333 _previousStatusSubscription.cancel(); | |
334 } | |
335 if (_previousOnAssetSubscription != null) { | |
336 _previousOnAssetSubscription.cancel(); | |
337 } | |
338 } | |
339 | |
340 /// Add [asset] as an output of this phase. | |
341 void _handleOutput(AssetNode asset) { | |
342 if (_inputOrigins.contains(asset.origin)) { | |
343 _forwarders[asset.id].addIntermediateAsset(asset); | |
344 } else { | |
345 _handleOutputWithoutForwarder(asset); | |
346 } | |
347 } | |
348 | |
349 /// Add [asset] as an output of this phase without checking if it's a | |
350 /// forwarded asset. | |
351 void _handleOutputWithoutForwarder(AssetNode asset) { | |
352 if (_outputs.containsKey(asset.id)) { | |
353 _outputs[asset.id].add(asset); | |
354 } else { | |
355 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index"); | |
356 _outputs[asset.id].onAsset.listen(_emit, | |
357 onDone: () => _outputs.remove(asset.id)); | |
358 _emit(_outputs[asset.id].output); | |
359 } | |
360 | |
361 var exception = _outputs[asset.id].collisionException; | |
362 if (exception != null) cascade.reportError(exception); | |
363 } | |
364 | |
365 /// Emit [asset] as an output of this phase. | |
366 /// | |
367 /// This should be called after [_handleOutput], so that collisions are | |
368 /// resolved. | |
369 void _emit(AssetNode asset) { | |
370 _streams.onAssetController.add(asset); | |
371 _providePendingAsset(asset); | |
372 } | |
373 | |
374 /// Provide an asset to a pending [getOutput] call. | |
375 void _providePendingAsset(AssetNode asset) { | |
376 // If anyone's waiting for this asset, provide it to them. | |
377 var request = _pendingOutputRequests.remove(asset.id); | |
378 if (request == null) return; | |
379 | |
380 if (asset.state.isAvailable) { | |
381 request.complete(asset); | |
382 return; | |
383 } | |
384 | |
385 // A lazy asset may be emitted while still dirty. If so, we wait until it's | |
386 // either available or removed before trying again to access it. | |
387 assert(asset.state.isDirty); | |
388 asset.force(); | |
389 asset.whenStateChanges().then((state) { | |
390 if (state.isRemoved) return getOutput(asset.id); | |
391 return asset; | |
392 }).then(request.complete).catchError(request.completeError); | |
393 } | |
394 | |
395 String toString() => "phase $_location.$_index"; | |
396 } | |
OLD | NEW |