OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.graph.transform_node; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import '../asset/asset.dart'; | |
10 import '../asset/asset_id.dart'; | |
11 import '../asset/asset_node.dart'; | |
12 import '../asset/asset_node_set.dart'; | |
13 import '../errors.dart'; | |
14 import '../log.dart'; | |
15 import '../transformer/aggregate_transform.dart'; | |
16 import '../transformer/aggregate_transformer.dart'; | |
17 import '../transformer/declaring_aggregate_transform.dart'; | |
18 import '../transformer/declaring_aggregate_transformer.dart'; | |
19 import '../transformer/lazy_aggregate_transformer.dart'; | |
20 import '../utils.dart'; | |
21 import 'node_status.dart'; | |
22 import 'node_streams.dart'; | |
23 import 'phase.dart'; | |
24 import 'transformer_classifier.dart'; | |
25 | |
26 /// Describes a transform on a set of assets and its relationship to the build | |
27 /// dependency graph. | |
28 /// | |
29 /// Keeps track of whether it's dirty and needs to be run and which assets it | |
30 /// depends on. | |
31 class TransformNode { | |
32 /// The aggregate key for this node. | |
33 final String key; | |
34 | |
35 /// The [TransformerClassifier] that [this] belongs to. | |
36 final TransformerClassifier classifier; | |
37 | |
38 /// The [Phase] that this transform runs in. | |
39 Phase get phase => classifier.phase; | |
40 | |
41 /// The [AggregateTransformer] to apply to this node's inputs. | |
42 final AggregateTransformer transformer; | |
43 | |
44 /// The primary asset nodes this transform runs on. | |
45 final _primaries = new AssetNodeSet(); | |
46 | |
47 /// A string describing the location of [this] in the transformer graph. | |
48 final String _location; | |
49 | |
50 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams. | |
51 final _primarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
52 | |
53 /// The subscription to [phase]'s [Phase.onAsset] stream. | |
54 StreamSubscription<AssetNode> _phaseAssetSubscription; | |
55 | |
56 /// The subscription to [phase]'s [Phase.onStatusChange] stream. | |
57 StreamSubscription<NodeStatus> _phaseStatusSubscription; | |
58 | |
59 /// How far along [this] is in processing its assets. | |
60 NodeStatus get status { | |
61 if (_state == _State.APPLIED || _state == _State.DECLARED) { | |
62 return NodeStatus.IDLE; | |
63 } | |
64 | |
65 if (_declaring && _state != _State.DECLARING && | |
66 _state != _State.NEEDS_DECLARE) { | |
67 return NodeStatus.MATERIALIZING; | |
68 } else { | |
69 return NodeStatus.RUNNING; | |
70 } | |
71 } | |
72 | |
73 /// The [TransformInfo] describing this node. | |
74 /// | |
75 /// [TransformInfo] is the publicly-visible representation of a transform | |
76 /// node. | |
77 TransformInfo get info => new TransformInfo(transformer, | |
78 new AssetId(phase.cascade.package, key)); | |
79 | |
80 /// Whether this is a declaring transform. | |
81 /// | |
82 /// This is usually identical to `transformer is | |
83 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy | |
84 /// transformer emits an error during `declareOutputs` it's treated as though | |
85 /// it wasn't declaring. | |
86 bool get _declaring => transformer is DeclaringAggregateTransformer && | |
87 (_state == _State.DECLARING || _declaredOutputs != null); | |
88 | |
89 /// Whether this transform has been forced since it last finished applying. | |
90 /// | |
91 /// A transform being forced means it should run until it generates outputs | |
92 /// and is no longer dirty. This is always true for non-declaring | |
93 /// transformers, since they always need to eagerly generate outputs. | |
94 bool _forced; | |
95 | |
96 /// The subscriptions to each secondary input's [AssetNode.onStateChange] | |
97 /// stream. | |
98 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
99 | |
100 /// The controllers for the asset nodes emitted by this node. | |
101 final _outputControllers = new Map<AssetId, AssetNodeController>(); | |
102 | |
103 /// The ids of inputs the transformer tried and failed to read last time it | |
104 /// ran. | |
105 final _missingInputs = new Set<AssetId>(); | |
106 | |
107 /// The controllers that are used to pass each primary input through [this] if | |
108 /// it's not consumed or overwritten. | |
109 /// | |
110 /// This needs an intervening controller to ensure that the output can be | |
111 /// marked dirty when determining whether [this] will consume or overwrite it, | |
112 /// and be marked removed if it does. No pass-through controller will exist | |
113 /// for primary inputs that are not being passed through. | |
114 final _passThroughControllers = new Map<AssetId, AssetNodeController>(); | |
115 | |
116 /// The asset node for this transform. | |
117 final _streams = new NodeStreams(); | |
118 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; | |
119 Stream<AssetNode> get onAsset => _streams.onAsset; | |
120 Stream<LogEntry> get onLog => _streams.onLog; | |
121 | |
122 /// The current state of [this]. | |
123 var _state = _State.DECLARED; | |
124 | |
125 /// Whether [this] has been marked as removed. | |
126 bool get _isRemoved => _streams.onAssetController.isClosed; | |
127 | |
128 // If [transformer] is declaring but not lazy and [primary] is available, we | |
129 // can run [apply] even if [force] hasn't been called, since [transformer] | |
130 // should run eagerly if possible. | |
131 bool get _canRunDeclaringEagerly => | |
132 _declaring && transformer is! LazyAggregateTransformer && | |
133 _primaries.every((input) => input.state.isAvailable); | |
134 | |
135 /// Which primary inputs the most recent run of this transform has declared | |
136 /// that it consumes. | |
137 /// | |
138 /// This starts out `null`, indicating that the transform hasn't declared | |
139 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED] | |
140 /// or [_State.DECLARED]. | |
141 Set<AssetId> _consumedPrimaries; | |
142 | |
143 /// The set of output ids that [transformer] declared it would emit. | |
144 /// | |
145 /// This is only non-null if [transformer] is a | |
146 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run | |
147 /// successfully. | |
148 Set<AssetId> _declaredOutputs; | |
149 | |
150 /// The controller for the currently-running | |
151 /// [DeclaringAggregateTransformer.declareOutputs] call's | |
152 /// [DeclaringAggregateTransform]. | |
153 /// | |
154 /// This will be non-`null` when | |
155 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that | |
156 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes | |
157 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise. | |
158 DeclaringAggregateTransformController _declareController; | |
159 | |
160 /// The controller for the currently-running [AggregateTransformer.apply] | |
161 /// call's [AggregateTransform]. | |
162 /// | |
163 /// This will be non-`null` when [AggregateTransform.apply] is running, which | |
164 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or | |
165 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's | |
166 /// [_State.NEEDS_DECLARE], and always `null` otherwise. | |
167 AggregateTransformController _applyController; | |
168 | |
169 /// The number of secondary inputs that have been requested but not yet | |
170 /// produced. | |
171 int _pendingSecondaryInputs = 0; | |
172 | |
173 /// A stopwatch that tracks the total time spent in a transformer's `apply` | |
174 /// function. | |
175 final _timeInTransformer = new Stopwatch(); | |
176 | |
177 /// A stopwatch that tracks the time in a transformer's `apply` function spent | |
178 /// waiting for [getInput] calls to complete. | |
179 final _timeAwaitingInputs = new Stopwatch(); | |
180 | |
181 TransformNode(this.classifier, this.transformer, this.key, this._location) { | |
182 _forced = transformer is! DeclaringAggregateTransformer; | |
183 | |
184 _phaseAssetSubscription = phase.previous.onAsset.listen((node) { | |
185 if (!_missingInputs.contains(node.id)) return; | |
186 if (_forced) node.force(); | |
187 _dirty(); | |
188 }); | |
189 | |
190 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) { | |
191 if (status == NodeStatus.RUNNING) return; | |
192 | |
193 _maybeFinishDeclareController(); | |
194 _maybeFinishApplyController(); | |
195 }); | |
196 | |
197 classifier.onDoneClassifying.listen((_) { | |
198 _maybeFinishDeclareController(); | |
199 _maybeFinishApplyController(); | |
200 }); | |
201 | |
202 _run(); | |
203 } | |
204 | |
205 /// Adds [input] as a primary input for this node. | |
206 void addPrimary(AssetNode input) { | |
207 _primaries.add(input); | |
208 if (_forced) input.force(); | |
209 | |
210 _primarySubscriptions[input.id] = input.onStateChange | |
211 .listen((_) => _onPrimaryStateChange(input)); | |
212 | |
213 if (_state == _State.DECLARING && !_declareController.isDone) { | |
214 // If we're running `declareOutputs` and its id stream isn't closed yet, | |
215 // pass this in as another id. | |
216 _declareController.addId(input.id); | |
217 _maybeFinishDeclareController(); | |
218 } else if (_state == _State.APPLYING) { | |
219 // If we're running `apply`, we need to wait until [input] is available | |
220 // before we pass it into the stream. If it's available now, great; if | |
221 // not, [_onPrimaryStateChange] will handle it. | |
222 if (!input.state.isAvailable) { | |
223 // If we started running eagerly without being forced, abort that run if | |
224 // a new unavailable asset comes in. | |
225 if (input.isLazy && !_forced) _restartRun(); | |
226 return; | |
227 } | |
228 | |
229 _onPrimaryStateChange(input); | |
230 _maybeFinishApplyController(); | |
231 } else { | |
232 // Otherwise, a new input means we'll need to re-run `declareOutputs`. | |
233 _restartRun(); | |
234 } | |
235 } | |
236 | |
237 /// Marks this transform as removed. | |
238 /// | |
239 /// This causes all of the transform's outputs to be marked as removed as | |
240 /// well. Normally this will be automatically done internally based on events | |
241 /// from the primary input, but it's possible for a transform to no longer be | |
242 /// valid even if its primary input still exists. | |
243 void remove() { | |
244 _streams.close(); | |
245 _phaseAssetSubscription.cancel(); | |
246 _phaseStatusSubscription.cancel(); | |
247 if (_declareController != null) _declareController.cancel(); | |
248 if (_applyController != null) _applyController.cancel(); | |
249 _clearSecondarySubscriptions(); | |
250 _clearOutputs(); | |
251 | |
252 for (var subscription in _primarySubscriptions.values) { | |
253 subscription.cancel(); | |
254 } | |
255 _primarySubscriptions.clear(); | |
256 | |
257 for (var controller in _passThroughControllers.values) { | |
258 controller.setRemoved(); | |
259 } | |
260 _passThroughControllers.clear(); | |
261 } | |
262 | |
263 /// If [this] is deferred, ensures that its concrete outputs will be | |
264 /// generated. | |
265 void force() { | |
266 if (_forced || _state == _State.APPLIED) return; | |
267 for (var input in _primaries) { | |
268 input.force(); | |
269 } | |
270 | |
271 _forced = true; | |
272 if (_state == _State.DECLARED) _apply(); | |
273 } | |
274 | |
275 /// Marks this transform as dirty. | |
276 /// | |
277 /// Specifically, this should be called when one of the transform's inputs' | |
278 /// contents change, or when a secondary input is removed. Primary inputs | |
279 /// being added or removed are handled by [addInput] and | |
280 /// [_onPrimaryStateChange]. | |
281 void _dirty() { | |
282 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE || | |
283 _state == _State.NEEDS_APPLY) { | |
284 // If we already know that [_apply] needs to be run, there's nothing to do | |
285 // here. | |
286 return; | |
287 } | |
288 | |
289 if (!_forced && !_canRunDeclaringEagerly) { | |
290 // [forced] should only ever be false for a declaring transformer. | |
291 assert(_declaring); | |
292 | |
293 // If we've finished applying, transition to DECLARED, indicating that we | |
294 // know what outputs [apply] will emit but we're waiting to emit them | |
295 // concretely until [force] is called. If we're still applying, we'll | |
296 // transition to DECLARED once we finish. | |
297 if (_state == _State.APPLIED) _state = _State.DECLARED; | |
298 for (var controller in _outputControllers.values) { | |
299 controller.setLazy(force); | |
300 } | |
301 _emitDeclaredOutputs(); | |
302 return; | |
303 } | |
304 | |
305 if (_state == _State.APPLIED) { | |
306 if (_declaredOutputs != null) _emitDeclaredOutputs(); | |
307 _apply(); | |
308 } else if (_state == _State.DECLARED) { | |
309 _apply(); | |
310 } else { | |
311 _state = _State.NEEDS_APPLY; | |
312 } | |
313 } | |
314 | |
315 /// The callback called when [input]'s state changes. | |
316 void _onPrimaryStateChange(AssetNode input) { | |
317 if (input.state.isRemoved) { | |
318 _primarySubscriptions.remove(input.id); | |
319 | |
320 if (_primaries.isEmpty) { | |
321 // If there are no more primary inputs, there's no more use for this | |
322 // node in the graph. It will be re-created by its | |
323 // [TransformerClassifier] if a new input with [key] is added. | |
324 remove(); | |
325 return; | |
326 } | |
327 | |
328 // Any change to the number of primary inputs requires that we re-run the | |
329 // transformation. | |
330 _restartRun(); | |
331 } else if (input.state.isAvailable) { | |
332 if (_state == _State.DECLARED && _canRunDeclaringEagerly) { | |
333 // If [this] is fully declared but hasn't started applying, this input | |
334 // becoming available may mean that all inputs are available, in which | |
335 // case we can run apply eagerly. | |
336 _apply(); | |
337 return; | |
338 } | |
339 | |
340 // If we're not actively passing concrete assets to the transformer, the | |
341 // distinction between a dirty asset and an available one isn't relevant. | |
342 if (_state != _State.APPLYING) return; | |
343 | |
344 if (_applyController.isDone) { | |
345 // If we get a new asset after we've closed the asset stream, we need to | |
346 // re-run declare and then apply. | |
347 _restartRun(); | |
348 } else { | |
349 // If the new asset comes before the asset stream is done, we can just | |
350 // pass it to the stream. | |
351 _applyController.addInput(input.asset); | |
352 _maybeFinishApplyController(); | |
353 } | |
354 } else { | |
355 if (_forced) input.force(); | |
356 if (_state == _State.APPLYING && !_applyController.addedId(input.id) && | |
357 (_forced || !input.isLazy)) { | |
358 // If the input hasn't yet been added to the transform's input stream, | |
359 // there's no need to consider the transformation dirty. However, if the | |
360 // input is lazy and we're running eagerly, we need to restart the | |
361 // transformation. | |
362 return; | |
363 } | |
364 _dirty(); | |
365 } | |
366 } | |
367 | |
368 /// Run the entire transformation, including both `declareOutputs` (if | |
369 /// applicable) and `apply`. | |
370 void _run() { | |
371 assert(_state != _State.DECLARING); | |
372 assert(_state != _State.APPLYING); | |
373 | |
374 _markOutputsDirty(); | |
375 _declareOutputs(() { | |
376 if (_forced || _canRunDeclaringEagerly) { | |
377 _apply(); | |
378 } else { | |
379 _state = _State.DECLARED; | |
380 _streams.changeStatus(NodeStatus.IDLE); | |
381 } | |
382 }); | |
383 } | |
384 | |
385 /// Restart the entire transformation, including `declareOutputs` if | |
386 /// applicable. | |
387 void _restartRun() { | |
388 if (_state == _State.DECLARED || _state == _State.APPLIED) { | |
389 // If we're currently idle, we can restart the transformation immediately. | |
390 _run(); | |
391 return; | |
392 } | |
393 | |
394 // If we're actively running `declareOutputs` or `apply`, cancel the | |
395 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's | |
396 // method returns, we'll transition to `DECLARING`. | |
397 if (_declareController != null) _declareController.cancel(); | |
398 if (_applyController != null) _applyController.cancel(); | |
399 _state = _State.NEEDS_DECLARE; | |
400 } | |
401 | |
402 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty | |
403 /// assets. | |
404 /// | |
405 /// Calls [callback] when it's finished. This doesn't return a future so that | |
406 /// [callback] is called synchronously if there are no outputs to declare. If | |
407 /// [this] is removed while inputs are being declared, [callback] will not be | |
408 /// called. | |
409 void _declareOutputs(void callback()) { | |
410 if (transformer is! DeclaringAggregateTransformer) { | |
411 callback(); | |
412 return; | |
413 } | |
414 | |
415 _state = _State.DECLARING; | |
416 var controller = new DeclaringAggregateTransformController(this); | |
417 _declareController = controller; | |
418 _streams.onLogPool.add(controller.onLog); | |
419 for (var primary in _primaries) { | |
420 controller.addId(primary.id); | |
421 } | |
422 _maybeFinishDeclareController(); | |
423 | |
424 syncFuture(() { | |
425 return (transformer as DeclaringAggregateTransformer) | |
426 .declareOutputs(controller.transform); | |
427 }).whenComplete(() { | |
428 // Cancel the controller here even if `declareOutputs` wasn't interrupted. | |
429 // Since the declaration is finished, we want to close out the | |
430 // controller's streams. | |
431 controller.cancel(); | |
432 _declareController = null; | |
433 }).then((_) { | |
434 if (_isRemoved) return; | |
435 if (_state == _State.NEEDS_DECLARE) { | |
436 _declareOutputs(callback); | |
437 return; | |
438 } | |
439 | |
440 if (controller.loggedError) { | |
441 // If `declareOutputs` fails, fall back to treating a declaring | |
442 // transformer as though it were eager. | |
443 if (transformer is! LazyAggregateTransformer) _forced = true; | |
444 callback(); | |
445 return; | |
446 } | |
447 | |
448 _consumedPrimaries = controller.consumedPrimaries; | |
449 _declaredOutputs = controller.outputIds; | |
450 var invalidIds = _declaredOutputs | |
451 .where((id) => id.package != phase.cascade.package).toSet(); | |
452 for (var id in invalidIds) { | |
453 _declaredOutputs.remove(id); | |
454 // TODO(nweiz): report this as a warning rather than a failing error. | |
455 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
456 } | |
457 | |
458 for (var primary in _primaries) { | |
459 if (_declaredOutputs.contains(primary.id)) continue; | |
460 _passThrough(primary.id); | |
461 } | |
462 _emitDeclaredOutputs(); | |
463 callback(); | |
464 }).catchError((error, stackTrace) { | |
465 if (_isRemoved) return; | |
466 if (transformer is! LazyAggregateTransformer) _forced = true; | |
467 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
468 callback(); | |
469 }); | |
470 } | |
471 | |
472 /// Emits a dirty asset node for all outputs that were declared by the | |
473 /// transformer. | |
474 /// | |
475 /// This won't emit any outputs for which there already exist output | |
476 /// controllers. It should only be called for transforms that have declared | |
477 /// their outputs. | |
478 void _emitDeclaredOutputs() { | |
479 assert(_declaredOutputs != null); | |
480 for (var id in _declaredOutputs) { | |
481 if (_outputControllers.containsKey(id)) continue; | |
482 var controller = _forced | |
483 ? new AssetNodeController(id, this) | |
484 : new AssetNodeController.lazy(id, force, this); | |
485 _outputControllers[id] = controller; | |
486 _streams.onAssetController.add(controller.node); | |
487 } | |
488 } | |
489 | |
490 //// Mark all emitted and passed-through outputs of this transform as dirty. | |
491 void _markOutputsDirty() { | |
492 for (var controller in _passThroughControllers.values) { | |
493 controller.setDirty(); | |
494 } | |
495 for (var controller in _outputControllers.values) { | |
496 if (_forced) { | |
497 controller.setDirty(); | |
498 } else { | |
499 controller.setLazy(force); | |
500 } | |
501 } | |
502 } | |
503 | |
504 /// Applies this transform. | |
505 void _apply() { | |
506 assert(!_isRemoved); | |
507 | |
508 _markOutputsDirty(); | |
509 _clearSecondarySubscriptions(); | |
510 _state = _State.APPLYING; | |
511 _streams.changeStatus(status); | |
512 _runApply().then((hadError) { | |
513 if (_isRemoved) return; | |
514 | |
515 if (_state == _State.DECLARED) return; | |
516 | |
517 if (_state == _State.NEEDS_DECLARE) { | |
518 _run(); | |
519 return; | |
520 } | |
521 | |
522 // If an input's contents changed while running `apply`, retry unless the | |
523 // transformer is deferred and hasn't been forced. | |
524 if (_state == _State.NEEDS_APPLY) { | |
525 if (_forced || _canRunDeclaringEagerly) { | |
526 _apply(); | |
527 } else { | |
528 _state = _State.DECLARED; | |
529 } | |
530 return; | |
531 } | |
532 | |
533 if (_declaring) _forced = false; | |
534 | |
535 assert(_state == _State.APPLYING); | |
536 if (hadError) { | |
537 _clearOutputs(); | |
538 // If the transformer threw an error, we don't want to emit the | |
539 // pass-through assets in case they'll be overwritten by the | |
540 // transformer. However, if the transformer declared that it wouldn't | |
541 // overwrite or consume a pass-through asset, we can safely emit it. | |
542 if (_declaredOutputs != null) { | |
543 for (var input in _primaries) { | |
544 if (_consumedPrimaries.contains(input.id) || | |
545 _declaredOutputs.contains(input.id)) { | |
546 _consumePrimary(input.id); | |
547 } else { | |
548 _passThrough(input.id); | |
549 } | |
550 } | |
551 } | |
552 } | |
553 | |
554 _state = _State.APPLIED; | |
555 _streams.changeStatus(NodeStatus.IDLE); | |
556 }); | |
557 } | |
558 | |
559 /// Gets the asset for an input [id]. | |
560 /// | |
561 /// If an input with [id] cannot be found, throws an [AssetNotFoundException]. | |
562 Future<Asset> getInput(AssetId id) { | |
563 _timeAwaitingInputs.start(); | |
564 _pendingSecondaryInputs++; | |
565 return phase.previous.getOutput(id).then((node) { | |
566 // Throw if the input isn't found. This ensures the transformer's apply | |
567 // is exited. We'll then catch this and report it through the proper | |
568 // results stream. | |
569 if (node == null) { | |
570 _missingInputs.add(id); | |
571 throw new AssetNotFoundException(id); | |
572 } | |
573 | |
574 _secondarySubscriptions.putIfAbsent(node.id, () { | |
575 return node.onStateChange.listen((_) => _dirty()); | |
576 }); | |
577 | |
578 return node.asset; | |
579 }).whenComplete(() { | |
580 _pendingSecondaryInputs--; | |
581 if (_pendingSecondaryInputs != 0) _timeAwaitingInputs.stop(); | |
582 }); | |
583 } | |
584 | |
585 /// Run [AggregateTransformer.apply]. | |
586 /// | |
587 /// Returns whether or not an error occurred while running the transformer. | |
588 Future<bool> _runApply() { | |
589 var controller = new AggregateTransformController(this); | |
590 _applyController = controller; | |
591 _streams.onLogPool.add(controller.onLog); | |
592 for (var primary in _primaries) { | |
593 if (!primary.state.isAvailable) continue; | |
594 controller.addInput(primary.asset); | |
595 } | |
596 _maybeFinishApplyController(); | |
597 | |
598 return syncFuture(() { | |
599 _timeInTransformer.reset(); | |
600 _timeAwaitingInputs.reset(); | |
601 _timeInTransformer.start(); | |
602 return transformer.apply(controller.transform); | |
603 }).whenComplete(() { | |
604 _timeInTransformer.stop(); | |
605 _timeAwaitingInputs.stop(); | |
606 | |
607 // Cancel the controller here even if `apply` wasn't interrupted. Since | |
608 // the apply is finished, we want to close out the controller's streams. | |
609 controller.cancel(); | |
610 _applyController = null; | |
611 }).then((_) { | |
612 assert(_state != _State.DECLARED); | |
613 assert(_state != _State.DECLARING); | |
614 assert(_state != _State.APPLIED); | |
615 | |
616 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) { | |
617 _state = _State.DECLARED; | |
618 _streams.changeStatus(NodeStatus.IDLE); | |
619 return false; | |
620 } | |
621 | |
622 if (_isRemoved) return false; | |
623 if (_state == _State.NEEDS_APPLY) return false; | |
624 if (_state == _State.NEEDS_DECLARE) return false; | |
625 if (controller.loggedError) return true; | |
626 | |
627 // If the transformer took long enough, log its duration in fine output. | |
628 // That way it's not always visible, but users running with "pub serve | |
629 // --verbose" can see it. | |
630 var ranLong = _timeInTransformer.elapsed > new Duration(seconds: 1); | |
631 var ranLongLocally = | |
632 _timeInTransformer.elapsed - _timeAwaitingInputs.elapsed > | |
633 new Duration(milliseconds: 200); | |
634 | |
635 // Report the transformer's timing information if it spent more than 0.2s | |
636 // doing things other than waiting for its secondary inputs or if it spent | |
637 // more than 1s in total. | |
638 if (ranLongLocally || ranLong) { | |
639 _streams.onLogController.add(new LogEntry( | |
640 info, info.primaryId, LogLevel.FINE, | |
641 "Took ${niceDuration(_timeInTransformer.elapsed)} " | |
642 "(${niceDuration(_timeAwaitingInputs.elapsed)} awaiting " | |
643 "secondary inputs).", | |
644 null)); | |
645 } | |
646 | |
647 _handleApplyResults(controller); | |
648 return false; | |
649 }).catchError((error, stackTrace) { | |
650 // If the transform became dirty while processing, ignore any errors from | |
651 // it. | |
652 if (_state == _State.NEEDS_APPLY || _isRemoved) return false; | |
653 | |
654 // Catch all transformer errors and pipe them to the results stream. This | |
655 // is so a broken transformer doesn't take down the whole graph. | |
656 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
657 return true; | |
658 }); | |
659 } | |
660 | |
661 /// Handle the results of running [Transformer.apply]. | |
662 /// | |
663 /// [controller] should be the controller for the [AggegateTransform] passed | |
664 /// to [AggregateTransformer.apply]. | |
665 void _handleApplyResults(AggregateTransformController controller) { | |
666 _consumedPrimaries = controller.consumedPrimaries; | |
667 | |
668 var newOutputs = controller.outputs; | |
669 // Any ids that are for a different package are invalid. | |
670 var invalidIds = newOutputs | |
671 .map((asset) => asset.id) | |
672 .where((id) => id.package != phase.cascade.package) | |
673 .toSet(); | |
674 for (var id in invalidIds) { | |
675 newOutputs.removeId(id); | |
676 // TODO(nweiz): report this as a warning rather than a failing error. | |
677 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
678 } | |
679 | |
680 // Remove outputs that used to exist but don't anymore. | |
681 for (var id in _outputControllers.keys.toList()) { | |
682 if (newOutputs.containsId(id)) continue; | |
683 _outputControllers.remove(id).setRemoved(); | |
684 } | |
685 | |
686 // Emit or stop emitting pass-through assets between removing and adding | |
687 // outputs to ensure there are no collisions. | |
688 for (var id in _primaries.map((node) => node.id)) { | |
689 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) { | |
690 _consumePrimary(id); | |
691 } else { | |
692 _passThrough(id); | |
693 } | |
694 } | |
695 | |
696 // Store any new outputs or new contents for existing outputs. | |
697 for (var asset in newOutputs) { | |
698 var controller = _outputControllers[asset.id]; | |
699 if (controller != null) { | |
700 controller.setAvailable(asset); | |
701 } else { | |
702 var controller = new AssetNodeController.available(asset, this); | |
703 _outputControllers[asset.id] = controller; | |
704 _streams.onAssetController.add(controller.node); | |
705 } | |
706 } | |
707 } | |
708 | |
709 /// Cancels all subscriptions to secondary input nodes. | |
710 void _clearSecondarySubscriptions() { | |
711 _missingInputs.clear(); | |
712 for (var subscription in _secondarySubscriptions.values) { | |
713 subscription.cancel(); | |
714 } | |
715 _secondarySubscriptions.clear(); | |
716 } | |
717 | |
718 /// Removes all output assets. | |
719 void _clearOutputs() { | |
720 // Remove all the previously-emitted assets. | |
721 for (var controller in _outputControllers.values) { | |
722 controller.setRemoved(); | |
723 } | |
724 _outputControllers.clear(); | |
725 } | |
726 | |
727 /// Emit the pass-through node for the primary input [id] if it's not being | |
728 /// emitted already. | |
729 void _passThrough(AssetId id) { | |
730 assert(!_outputControllers.containsKey(id)); | |
731 | |
732 if (_consumedPrimaries.contains(id)) return; | |
733 var controller = _passThroughControllers[id]; | |
734 var primary = _primaries[id]; | |
735 if (controller == null) { | |
736 controller = new AssetNodeController.from(primary); | |
737 _passThroughControllers[id] = controller; | |
738 _streams.onAssetController.add(controller.node); | |
739 } else if (primary.state.isDirty) { | |
740 controller.setDirty(); | |
741 } else if (!controller.node.state.isAvailable) { | |
742 controller.setAvailable(primary.asset); | |
743 } | |
744 } | |
745 | |
746 /// Stops emitting the pass-through node for the primary input [id] if it's | |
747 /// being emitted. | |
748 void _consumePrimary(AssetId id) { | |
749 var controller = _passThroughControllers.remove(id); | |
750 if (controller == null) return; | |
751 controller.setRemoved(); | |
752 } | |
753 | |
754 /// If `declareOutputs` is running and all previous phases have declared their | |
755 /// outputs, mark [_declareController] as done. | |
756 void _maybeFinishDeclareController() { | |
757 if (_declareController == null) return; | |
758 if (classifier.isClassifying) return; | |
759 if (phase.previous.status == NodeStatus.RUNNING) return; | |
760 _declareController.done(); | |
761 } | |
762 | |
763 /// If `apply` is running, all previous phases have declared their outputs, | |
764 /// and all primary inputs are available and thus have been passed to the | |
765 /// transformer, mark [_applyController] as done. | |
766 void _maybeFinishApplyController() { | |
767 if (_applyController == null) return; | |
768 if (classifier.isClassifying) return; | |
769 if (_primaries.any((input) => !input.state.isAvailable)) return; | |
770 if (phase.previous.status == NodeStatus.RUNNING) return; | |
771 _applyController.done(); | |
772 } | |
773 | |
774 BarbackException _wrapException(error, StackTrace stackTrace) { | |
775 if (error is! AssetNotFoundException) { | |
776 return new TransformerException(info, error, stackTrace); | |
777 } else { | |
778 return new MissingInputException(info, error.id); | |
779 } | |
780 } | |
781 | |
782 String toString() => | |
783 "transform node in $_location for $transformer on ${info.primaryId} " | |
784 "($_state, $status, ${_forced ? '' : 'un'}forced)"; | |
785 } | |
786 | |
787 /// The enum of states that [TransformNode] can be in. | |
788 class _State { | |
789 /// The transform is running [DeclaringAggregateTransformer.declareOutputs]. | |
790 /// | |
791 /// If the set of primary inputs changes while in this state, it will | |
792 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this | |
793 /// state when `declareOutputs` finishes running, it will transition to | |
794 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are | |
795 /// available, and [DECLARED] otherwise. | |
796 /// | |
797 /// Non-declaring transformers will transition out of this state and into | |
798 /// [APPLYING] immediately. | |
799 static const DECLARING = const _State._("declaring outputs"); | |
800 | |
801 /// The transform is running [AggregateTransformer.declareOutputs] or | |
802 /// [AggregateTransform.apply], but a primary input was added or removed after | |
803 /// it started, so it will need to re-run `declareOutputs`. | |
804 /// | |
805 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs` | |
806 /// or `apply` finishes running. | |
807 static const NEEDS_DECLARE = const _State._("needs declare"); | |
808 | |
809 /// The transform is deferred and has run | |
810 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced. | |
811 /// | |
812 /// The [TransformNode] will transition to [APPLYING] when one of the outputs | |
813 /// has been forced or if the transformer is non-lazy and all of its primary | |
814 /// inputs become available. | |
815 static const DECLARED = const _State._("declared"); | |
816 | |
817 /// The transform is running [AggregateTransformer.apply]. | |
818 /// | |
819 /// If an input's contents change or a secondary input is added or removed | |
820 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY]. | |
821 /// If a primary input is added or removed, it will transition to | |
822 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes | |
823 /// running, it will transition to [APPLIED]. | |
824 static const APPLYING = const _State._("applying"); | |
825 | |
826 /// The transform is running [AggregateTransformer.apply], but an input's | |
827 /// contents changed or a secondary input was added or removed after it | |
828 /// started, so it will need to re-run `apply`. | |
829 /// | |
830 /// If a primary input is added or removed while in this state, the | |
831 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this | |
832 /// state when `apply` finishes running, it will transition to [APPLYING]. | |
833 static const NEEDS_APPLY = const _State._("needs apply"); | |
834 | |
835 /// The transform has finished running [AggregateTransformer.apply], whether | |
836 /// or not it emitted an error. | |
837 /// | |
838 /// If an input's contents change or a secondary input is added or removed, | |
839 /// the [TransformNode] will transition to [DECLARED] if the transform is | |
840 /// declaring and [APPLYING] otherwise. If a primary input is added or | |
841 /// removed, this will transition to [DECLARING]. | |
842 static const APPLIED = const _State._("applied"); | |
843 | |
844 final String name; | |
845 | |
846 const _State._(this.name); | |
847 | |
848 String toString() => name; | |
849 } | |
OLD | NEW |