OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.graph.transform_node; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import '../asset/asset.dart'; | |
10 import '../asset/asset_id.dart'; | |
11 import '../asset/asset_node.dart'; | |
12 import '../asset/asset_node_set.dart'; | |
13 import '../errors.dart'; | |
14 import '../log.dart'; | |
15 import '../transformer/aggregate_transform.dart'; | |
16 import '../transformer/aggregate_transformer.dart'; | |
17 import '../transformer/declaring_aggregate_transform.dart'; | |
18 import '../transformer/declaring_aggregate_transformer.dart'; | |
19 import '../transformer/lazy_aggregate_transformer.dart'; | |
20 import '../utils.dart'; | |
21 import 'node_status.dart'; | |
22 import 'node_streams.dart'; | |
23 import 'phase.dart'; | |
24 import 'transformer_classifier.dart'; | |
25 | |
26 /// Every `_applyLogDuration`, we will issue a fine log entry letting the user | |
27 /// know that the transform is still executing. | |
28 const _applyLogDuration = const Duration(seconds: 10); | |
29 | |
30 /// Describes a transform on a set of assets and its relationship to the build | |
31 /// dependency graph. | |
32 /// | |
33 /// Keeps track of whether it's dirty and needs to be run and which assets it | |
34 /// depends on. | |
35 class TransformNode { | |
36 /// The aggregate key for this node. | |
37 final String key; | |
38 | |
39 /// The [TransformerClassifier] that [this] belongs to. | |
40 final TransformerClassifier classifier; | |
41 | |
42 /// The [Phase] that this transform runs in. | |
43 Phase get phase => classifier.phase; | |
44 | |
45 /// The [AggregateTransformer] to apply to this node's inputs. | |
46 final AggregateTransformer transformer; | |
47 | |
48 /// The primary asset nodes this transform runs on. | |
49 final _primaries = new AssetNodeSet(); | |
50 | |
51 /// A string describing the location of [this] in the transformer graph. | |
52 final String _location; | |
53 | |
54 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams. | |
55 final _primarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
56 | |
57 /// The subscription to [phase]'s [Phase.onAsset] stream. | |
58 StreamSubscription<AssetNode> _phaseAssetSubscription; | |
59 | |
60 /// The subscription to [phase]'s [Phase.onStatusChange] stream. | |
61 StreamSubscription<NodeStatus> _phaseStatusSubscription; | |
62 | |
63 /// How far along [this] is in processing its assets. | |
64 NodeStatus get status { | |
65 if (_state == _State.APPLIED || _state == _State.DECLARED) { | |
66 return NodeStatus.IDLE; | |
67 } | |
68 | |
69 if (_declaring && _state != _State.DECLARING && | |
70 _state != _State.NEEDS_DECLARE) { | |
71 return NodeStatus.MATERIALIZING; | |
72 } else { | |
73 return NodeStatus.RUNNING; | |
74 } | |
75 } | |
76 | |
77 /// The [TransformInfo] describing this node. | |
78 /// | |
79 /// [TransformInfo] is the publicly-visible representation of a transform | |
80 /// node. | |
81 TransformInfo get info => new TransformInfo(transformer, | |
82 new AssetId(phase.cascade.package, key)); | |
83 | |
84 /// Whether this is a declaring transform. | |
85 /// | |
86 /// This is usually identical to `transformer is | |
87 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy | |
88 /// transformer emits an error during `declareOutputs` it's treated as though | |
89 /// it wasn't declaring. | |
90 bool get _declaring => transformer is DeclaringAggregateTransformer && | |
91 (_state == _State.DECLARING || _declaredOutputs != null); | |
92 | |
93 /// Whether this transform has been forced since it last finished applying. | |
94 /// | |
95 /// A transform being forced means it should run until it generates outputs | |
96 /// and is no longer dirty. This is always true for non-declaring | |
97 /// transformers, since they always need to eagerly generate outputs. | |
98 bool _forced; | |
99 | |
100 /// The subscriptions to each secondary input's [AssetNode.onStateChange] | |
101 /// stream. | |
102 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
103 | |
104 /// The subscriptions to the [AssetCascade.onAsset] stream for cascades that | |
105 /// might generate assets in [_missingInputs]. | |
106 final _missingExternalInputSubscriptions = | |
107 new Map<String, StreamSubscription>(); | |
108 | |
109 /// The controllers for the asset nodes emitted by this node. | |
110 final _outputControllers = new Map<AssetId, AssetNodeController>(); | |
111 | |
112 /// The ids of inputs the transformer tried and failed to read last time it | |
113 /// ran. | |
114 final _missingInputs = new Set<AssetId>(); | |
115 | |
116 /// The controllers that are used to pass each primary input through [this] if | |
117 /// it's not consumed or overwritten. | |
118 /// | |
119 /// This needs an intervening controller to ensure that the output can be | |
120 /// marked dirty when determining whether [this] will consume or overwrite it, | |
121 /// and be marked removed if it does. No pass-through controller will exist | |
122 /// for primary inputs that are not being passed through. | |
123 final _passThroughControllers = new Map<AssetId, AssetNodeController>(); | |
124 | |
125 /// The asset node for this transform. | |
126 final _streams = new NodeStreams(); | |
127 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; | |
128 Stream<AssetNode> get onAsset => _streams.onAsset; | |
129 Stream<LogEntry> get onLog => _streams.onLog; | |
130 | |
131 /// The current state of [this]. | |
132 var _state = _State.DECLARED; | |
133 | |
134 /// Whether [this] has been marked as removed. | |
135 bool get _isRemoved => _streams.onAssetController.isClosed; | |
136 | |
137 // If [transformer] is declaring but not lazy and [primary] is available, we | |
138 // can run [apply] even if [force] hasn't been called, since [transformer] | |
139 // should run eagerly if possible. | |
140 bool get _canRunDeclaringEagerly => | |
141 _declaring && transformer is! LazyAggregateTransformer && | |
142 _primaries.every((input) => input.state.isAvailable); | |
143 | |
144 /// Which primary inputs the most recent run of this transform has declared | |
145 /// that it consumes. | |
146 /// | |
147 /// This starts out `null`, indicating that the transform hasn't declared | |
148 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED] | |
149 /// or [_State.DECLARED]. | |
150 Set<AssetId> _consumedPrimaries; | |
151 | |
152 /// The set of output ids that [transformer] declared it would emit. | |
153 /// | |
154 /// This is only non-null if [transformer] is a | |
155 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run | |
156 /// successfully. | |
157 Set<AssetId> _declaredOutputs; | |
158 | |
159 /// The controller for the currently-running | |
160 /// [DeclaringAggregateTransformer.declareOutputs] call's | |
161 /// [DeclaringAggregateTransform]. | |
162 /// | |
163 /// This will be non-`null` when | |
164 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that | |
165 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes | |
166 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise. | |
167 DeclaringAggregateTransformController _declareController; | |
168 | |
169 /// The controller for the currently-running [AggregateTransformer.apply] | |
170 /// call's [AggregateTransform]. | |
171 /// | |
172 /// This will be non-`null` when [AggregateTransform.apply] is running, which | |
173 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or | |
174 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's | |
175 /// [_State.NEEDS_DECLARE], and always `null` otherwise. | |
176 AggregateTransformController _applyController; | |
177 | |
178 /// Map to track pending requests for secondary inputs. | |
179 /// | |
180 /// Keys are the secondary inputs that have been requested but not yet | |
181 /// produced. Values are the number of requests for that input. | |
182 final _pendingSecondaryInputs = <AssetId, int>{}; | |
183 | |
184 /// A stopwatch that tracks the total time spent in a transformer's `apply` | |
185 /// function. | |
186 final _timeInTransformer = new Stopwatch(); | |
187 | |
188 /// A stopwatch that tracks the time in a transformer's `apply` function spent | |
189 /// waiting for [getInput] calls to complete. | |
190 final _timeAwaitingInputs = new Stopwatch(); | |
191 | |
192 TransformNode(this.classifier, this.transformer, this.key, this._location) { | |
193 _forced = transformer is! DeclaringAggregateTransformer; | |
194 | |
195 _phaseAssetSubscription = phase.previous.onAsset.listen((node) { | |
196 if (!_missingInputs.contains(node.id)) return; | |
197 if (_forced) node.force(); | |
198 _dirty(); | |
199 }); | |
200 | |
201 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) { | |
202 if (status == NodeStatus.RUNNING) return; | |
203 | |
204 _maybeFinishDeclareController(); | |
205 _maybeFinishApplyController(); | |
206 }); | |
207 | |
208 classifier.onDoneClassifying.listen((_) { | |
209 _maybeFinishDeclareController(); | |
210 _maybeFinishApplyController(); | |
211 }); | |
212 | |
213 _run(); | |
214 } | |
215 | |
216 /// Adds [input] as a primary input for this node. | |
217 void addPrimary(AssetNode input) { | |
218 _primaries.add(input); | |
219 if (_forced) input.force(); | |
220 | |
221 _primarySubscriptions[input.id] = input.onStateChange | |
222 .listen((_) => _onPrimaryStateChange(input)); | |
223 | |
224 if (_state == _State.DECLARING && !_declareController.isDone) { | |
225 // If we're running `declareOutputs` and its id stream isn't closed yet, | |
226 // pass this in as another id. | |
227 _declareController.addId(input.id); | |
228 _maybeFinishDeclareController(); | |
229 } else if (_state == _State.APPLYING) { | |
230 // If we're running `apply`, we need to wait until [input] is available | |
231 // before we pass it into the stream. If it's available now, great; if | |
232 // not, [_onPrimaryStateChange] will handle it. | |
233 if (!input.state.isAvailable) { | |
234 // If we started running eagerly without being forced, abort that run if | |
235 // a new unavailable asset comes in. | |
236 if (input.isLazy && !_forced) _restartRun(); | |
237 return; | |
238 } | |
239 | |
240 _onPrimaryStateChange(input); | |
241 _maybeFinishApplyController(); | |
242 } else { | |
243 // Otherwise, a new input means we'll need to re-run `declareOutputs`. | |
244 _restartRun(); | |
245 } | |
246 } | |
247 | |
248 /// Marks this transform as removed. | |
249 /// | |
250 /// This causes all of the transform's outputs to be marked as removed as | |
251 /// well. Normally this will be automatically done internally based on events | |
252 /// from the primary input, but it's possible for a transform to no longer be | |
253 /// valid even if its primary input still exists. | |
254 void remove() { | |
255 _streams.close(); | |
256 _phaseAssetSubscription.cancel(); | |
257 _phaseStatusSubscription.cancel(); | |
258 if (_declareController != null) _declareController.cancel(); | |
259 if (_applyController != null) _applyController.cancel(); | |
260 _clearSecondarySubscriptions(); | |
261 _clearOutputs(); | |
262 | |
263 for (var subscription in _primarySubscriptions.values) { | |
264 subscription.cancel(); | |
265 } | |
266 _primarySubscriptions.clear(); | |
267 | |
268 for (var controller in _passThroughControllers.values) { | |
269 controller.setRemoved(); | |
270 } | |
271 _passThroughControllers.clear(); | |
272 } | |
273 | |
274 /// If [this] is deferred, ensures that its concrete outputs will be | |
275 /// generated. | |
276 void force() { | |
277 if (_forced || _state == _State.APPLIED) return; | |
278 for (var input in _primaries) { | |
279 input.force(); | |
280 } | |
281 | |
282 _forced = true; | |
283 if (_state == _State.DECLARED) _apply(); | |
284 } | |
285 | |
286 /// Marks this transform as dirty. | |
287 /// | |
288 /// Specifically, this should be called when one of the transform's inputs' | |
289 /// contents change, or when a secondary input is removed. Primary inputs | |
290 /// being added or removed are handled by [addInput] and | |
291 /// [_onPrimaryStateChange]. | |
292 void _dirty() { | |
293 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE || | |
294 _state == _State.NEEDS_APPLY) { | |
295 // If we already know that [_apply] needs to be run, there's nothing to do | |
296 // here. | |
297 return; | |
298 } | |
299 | |
300 if (!_forced && !_canRunDeclaringEagerly) { | |
301 // [forced] should only ever be false for a declaring transformer. | |
302 assert(_declaring); | |
303 | |
304 // If we've finished applying, transition to DECLARED, indicating that we | |
305 // know what outputs [apply] will emit but we're waiting to emit them | |
306 // concretely until [force] is called. If we're still applying, we'll | |
307 // transition to DECLARED once we finish. | |
308 if (_state == _State.APPLIED) _state = _State.DECLARED; | |
309 for (var controller in _outputControllers.values) { | |
310 controller.setLazy(force); | |
311 } | |
312 _emitDeclaredOutputs(); | |
313 return; | |
314 } | |
315 | |
316 if (_state == _State.APPLIED) { | |
317 if (_declaredOutputs != null) _emitDeclaredOutputs(); | |
318 _apply(); | |
319 } else if (_state == _State.DECLARED) { | |
320 _apply(); | |
321 } else { | |
322 _state = _State.NEEDS_APPLY; | |
323 } | |
324 } | |
325 | |
326 /// The callback called when [input]'s state changes. | |
327 void _onPrimaryStateChange(AssetNode input) { | |
328 if (input.state.isRemoved) { | |
329 _primarySubscriptions.remove(input.id); | |
330 | |
331 if (_primaries.isEmpty) { | |
332 // If there are no more primary inputs, there's no more use for this | |
333 // node in the graph. It will be re-created by its | |
334 // [TransformerClassifier] if a new input with [key] is added. | |
335 remove(); | |
336 return; | |
337 } | |
338 | |
339 // Any change to the number of primary inputs requires that we re-run the | |
340 // transformation. | |
341 _restartRun(); | |
342 } else if (input.state.isAvailable) { | |
343 if (_state == _State.DECLARED) { | |
344 // If we're passing through this input and its contents don't matter, | |
345 // update the pass-through controller. | |
346 var controller = _passThroughControllers[input.id]; | |
347 if (controller != null) controller.setAvailable(input.asset); | |
348 } | |
349 | |
350 if (_state == _State.DECLARED && _canRunDeclaringEagerly) { | |
351 // If [this] is fully declared but hasn't started applying, this input | |
352 // becoming available may mean that all inputs are available, in which | |
353 // case we can run apply eagerly. | |
354 _apply(); | |
355 return; | |
356 } | |
357 | |
358 // If we're not actively passing concrete assets to the transformer, the | |
359 // distinction between a dirty asset and an available one isn't relevant. | |
360 if (_state != _State.APPLYING) return; | |
361 | |
362 if (_applyController.isDone) { | |
363 // If we get a new asset after we've closed the asset stream, we need to | |
364 // re-run declare and then apply. | |
365 _restartRun(); | |
366 } else { | |
367 // If the new asset comes before the asset stream is done, we can just | |
368 // pass it to the stream. | |
369 _applyController.addInput(input.asset); | |
370 _maybeFinishApplyController(); | |
371 } | |
372 } else { | |
373 if (_forced) input.force(); | |
374 | |
375 var controller = _passThroughControllers[input.id]; | |
376 if (controller != null) controller.setDirty(); | |
377 | |
378 if (_state == _State.APPLYING && !_applyController.addedId(input.id) && | |
379 (_forced || !input.isLazy)) { | |
380 // If the input hasn't yet been added to the transform's input stream, | |
381 // there's no need to consider the transformation dirty. However, if the | |
382 // input is lazy and we're running eagerly, we need to restart the | |
383 // transformation. | |
384 return; | |
385 } | |
386 _dirty(); | |
387 } | |
388 } | |
389 | |
390 /// Run the entire transformation, including both `declareOutputs` (if | |
391 /// applicable) and `apply`. | |
392 void _run() { | |
393 assert(_state != _State.DECLARING); | |
394 assert(_state != _State.APPLYING); | |
395 | |
396 _markOutputsDirty(); | |
397 _declareOutputs(() { | |
398 if (_forced || _canRunDeclaringEagerly) { | |
399 _apply(); | |
400 } else { | |
401 _state = _State.DECLARED; | |
402 _streams.changeStatus(NodeStatus.IDLE); | |
403 } | |
404 }); | |
405 } | |
406 | |
407 /// Restart the entire transformation, including `declareOutputs` if | |
408 /// applicable. | |
409 void _restartRun() { | |
410 if (_state == _State.DECLARED || _state == _State.APPLIED) { | |
411 // If we're currently idle, we can restart the transformation immediately. | |
412 _run(); | |
413 return; | |
414 } | |
415 | |
416 // If we're actively running `declareOutputs` or `apply`, cancel the | |
417 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's | |
418 // method returns, we'll transition to `DECLARING`. | |
419 if (_declareController != null) _declareController.cancel(); | |
420 if (_applyController != null) _applyController.cancel(); | |
421 _state = _State.NEEDS_DECLARE; | |
422 } | |
423 | |
424 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty | |
425 /// assets. | |
426 /// | |
427 /// Calls [callback] when it's finished. This doesn't return a future so that | |
428 /// [callback] is called synchronously if there are no outputs to declare. If | |
429 /// [this] is removed while inputs are being declared, [callback] will not be | |
430 /// called. | |
431 void _declareOutputs(void callback()) { | |
432 if (transformer is! DeclaringAggregateTransformer) { | |
433 callback(); | |
434 return; | |
435 } | |
436 | |
437 _state = _State.DECLARING; | |
438 var controller = new DeclaringAggregateTransformController(this); | |
439 _declareController = controller; | |
440 _streams.onLogPool.add(controller.onLog); | |
441 for (var primary in _primaries) { | |
442 controller.addId(primary.id); | |
443 } | |
444 _maybeFinishDeclareController(); | |
445 | |
446 syncFuture(() { | |
447 return (transformer as DeclaringAggregateTransformer) | |
448 .declareOutputs(controller.transform); | |
449 }).whenComplete(() { | |
450 // Cancel the controller here even if `declareOutputs` wasn't interrupted. | |
451 // Since the declaration is finished, we want to close out the | |
452 // controller's streams. | |
453 controller.cancel(); | |
454 _declareController = null; | |
455 }).then((_) { | |
456 if (_isRemoved) return; | |
457 if (_state == _State.NEEDS_DECLARE) { | |
458 _declareOutputs(callback); | |
459 return; | |
460 } | |
461 | |
462 if (controller.loggedError) { | |
463 // If `declareOutputs` fails, fall back to treating a declaring | |
464 // transformer as though it were eager. | |
465 if (transformer is! LazyAggregateTransformer) _forced = true; | |
466 callback(); | |
467 return; | |
468 } | |
469 | |
470 _consumedPrimaries = controller.consumedPrimaries; | |
471 _declaredOutputs = controller.outputIds; | |
472 var invalidIds = _declaredOutputs | |
473 .where((id) => id.package != phase.cascade.package).toSet(); | |
474 for (var id in invalidIds) { | |
475 _declaredOutputs.remove(id); | |
476 // TODO(nweiz): report this as a warning rather than a failing error. | |
477 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
478 } | |
479 | |
480 for (var primary in _primaries) { | |
481 if (_declaredOutputs.contains(primary.id)) continue; | |
482 _passThrough(primary.id); | |
483 } | |
484 _emitDeclaredOutputs(); | |
485 callback(); | |
486 }).catchError((error, stackTrace) { | |
487 if (_isRemoved) return; | |
488 if (transformer is! LazyAggregateTransformer) _forced = true; | |
489 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
490 callback(); | |
491 }); | |
492 } | |
493 | |
494 /// Emits a dirty asset node for all outputs that were declared by the | |
495 /// transformer. | |
496 /// | |
497 /// This won't emit any outputs for which there already exist output | |
498 /// controllers. It should only be called for transforms that have declared | |
499 /// their outputs. | |
500 void _emitDeclaredOutputs() { | |
501 assert(_declaredOutputs != null); | |
502 for (var id in _declaredOutputs) { | |
503 if (_outputControllers.containsKey(id)) continue; | |
504 var controller = _forced | |
505 ? new AssetNodeController(id, this) | |
506 : new AssetNodeController.lazy(id, force, this); | |
507 _outputControllers[id] = controller; | |
508 _streams.onAssetController.add(controller.node); | |
509 } | |
510 } | |
511 | |
512 //// Mark all emitted and passed-through outputs of this transform as dirty. | |
513 void _markOutputsDirty() { | |
514 for (var controller in _passThroughControllers.values) { | |
515 controller.setDirty(); | |
516 } | |
517 for (var controller in _outputControllers.values) { | |
518 if (_forced) { | |
519 controller.setDirty(); | |
520 } else { | |
521 controller.setLazy(force); | |
522 } | |
523 } | |
524 } | |
525 | |
526 /// Applies this transform. | |
527 void _apply() { | |
528 assert(!_isRemoved); | |
529 | |
530 _markOutputsDirty(); | |
531 _clearSecondarySubscriptions(); | |
532 _state = _State.APPLYING; | |
533 _streams.changeStatus(status); | |
534 _runApply().then((hadError) { | |
535 if (_isRemoved) return; | |
536 | |
537 if (_state == _State.DECLARED) return; | |
538 | |
539 if (_state == _State.NEEDS_DECLARE) { | |
540 _run(); | |
541 return; | |
542 } | |
543 | |
544 // If an input's contents changed while running `apply`, retry unless the | |
545 // transformer is deferred and hasn't been forced. | |
546 if (_state == _State.NEEDS_APPLY) { | |
547 if (_forced || _canRunDeclaringEagerly) { | |
548 _apply(); | |
549 } else { | |
550 _state = _State.DECLARED; | |
551 } | |
552 return; | |
553 } | |
554 | |
555 if (_declaring) _forced = false; | |
556 | |
557 assert(_state == _State.APPLYING); | |
558 if (hadError) { | |
559 _clearOutputs(); | |
560 // If the transformer threw an error, we don't want to emit the | |
561 // pass-through assets in case they'll be overwritten by the | |
562 // transformer. However, if the transformer declared that it wouldn't | |
563 // overwrite or consume a pass-through asset, we can safely emit it. | |
564 if (_declaredOutputs != null) { | |
565 for (var input in _primaries) { | |
566 if (_consumedPrimaries.contains(input.id) || | |
567 _declaredOutputs.contains(input.id)) { | |
568 _consumePrimary(input.id); | |
569 } else { | |
570 _passThrough(input.id); | |
571 } | |
572 } | |
573 } | |
574 } | |
575 | |
576 _state = _State.APPLIED; | |
577 _streams.changeStatus(NodeStatus.IDLE); | |
578 }); | |
579 } | |
580 | |
581 /// Gets the asset for an input [id]. | |
582 /// | |
583 /// If an input with [id] cannot be found, throws an [AssetNotFoundException]. | |
584 Future<Asset> getInput(AssetId id) { | |
585 _timeAwaitingInputs.start(); | |
586 _pendingSecondaryInputs[id] = _pendingSecondaryInputs.containsKey(id) | |
587 ? _pendingSecondaryInputs[id] + 1 | |
588 : 1; | |
589 return phase.previous.getOutput(id).then((node) { | |
590 // Throw if the input isn't found. This ensures the transformer's apply | |
591 // is exited. We'll then catch this and report it through the proper | |
592 // results stream. | |
593 if (node == null) { | |
594 _missingInputs.add(id); | |
595 | |
596 // If this id is for an asset in another package, subscribe to that | |
597 // package's asset cascade so when it starts emitting the id we know to | |
598 // re-run the transformer. | |
599 if (id.package != phase.cascade.package) { | |
600 var stream = phase.cascade.graph.onAssetFor(id.package); | |
601 if (stream != null) { | |
602 _missingExternalInputSubscriptions.putIfAbsent(id.package, () { | |
603 return stream.listen((node) { | |
604 if (!_missingInputs.contains(node.id)) return; | |
605 if (_forced) node.force(); | |
606 _dirty(); | |
607 }); | |
608 }); | |
609 } | |
610 } | |
611 | |
612 throw new AssetNotFoundException(id); | |
613 } | |
614 | |
615 _secondarySubscriptions.putIfAbsent(node.id, () { | |
616 return node.onStateChange.listen((_) => _dirty()); | |
617 }); | |
618 | |
619 return node.asset; | |
620 }).whenComplete(() { | |
621 assert(_pendingSecondaryInputs.containsKey(id)); | |
622 if (_pendingSecondaryInputs[id] == 1) { | |
623 _pendingSecondaryInputs.remove(id); | |
624 } else { | |
625 _pendingSecondaryInputs[id]--; | |
626 } | |
627 if (_pendingSecondaryInputs.isEmpty) _timeAwaitingInputs.stop(); | |
628 }); | |
629 } | |
630 | |
631 /// Run [AggregateTransformer.apply]. | |
632 /// | |
633 /// Returns whether or not an error occurred while running the transformer. | |
634 Future<bool> _runApply() { | |
635 var controller = new AggregateTransformController(this); | |
636 _applyController = controller; | |
637 _streams.onLogPool.add(controller.onLog); | |
638 for (var primary in _primaries) { | |
639 if (!primary.state.isAvailable) continue; | |
640 controller.addInput(primary.asset); | |
641 } | |
642 _maybeFinishApplyController(); | |
643 | |
644 var transformCounterTimer; | |
645 | |
646 return syncFuture(() { | |
647 _timeInTransformer.reset(); | |
648 _timeAwaitingInputs.reset(); | |
649 _timeInTransformer.start(); | |
650 | |
651 transformCounterTimer = new Timer.periodic(_applyLogDuration, (_) { | |
652 if (_streams.onLogController.isClosed || | |
653 !_timeInTransformer.isRunning) { | |
654 return; | |
655 } | |
656 | |
657 var message = new StringBuffer("Not yet complete after " | |
658 "${niceDuration(_timeInTransformer.elapsed)}"); | |
659 if (_pendingSecondaryInputs.isNotEmpty) { | |
660 message.write(", waiting on input(s) " | |
661 "${_pendingSecondaryInputs.keys.join(", ")}"); | |
662 } | |
663 _streams.onLogController.add(new LogEntry( | |
664 info, | |
665 info.primaryId, | |
666 LogLevel.FINE, | |
667 message.toString(), | |
668 null)); | |
669 }); | |
670 | |
671 return transformer.apply(controller.transform); | |
672 }).whenComplete(() { | |
673 transformCounterTimer.cancel(); | |
674 _timeInTransformer.stop(); | |
675 _timeAwaitingInputs.stop(); | |
676 | |
677 // Cancel the controller here even if `apply` wasn't interrupted. Since | |
678 // the apply is finished, we want to close out the controller's streams. | |
679 controller.cancel(); | |
680 _applyController = null; | |
681 }).then((_) { | |
682 assert(_state != _State.DECLARED); | |
683 assert(_state != _State.DECLARING); | |
684 assert(_state != _State.APPLIED); | |
685 | |
686 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) { | |
687 _state = _State.DECLARED; | |
688 _streams.changeStatus(NodeStatus.IDLE); | |
689 return false; | |
690 } | |
691 | |
692 if (_isRemoved) return false; | |
693 if (_state == _State.NEEDS_APPLY) return false; | |
694 if (_state == _State.NEEDS_DECLARE) return false; | |
695 if (controller.loggedError) return true; | |
696 | |
697 // If the transformer took long enough, log its duration in fine output. | |
698 // That way it's not always visible, but users running with "pub serve | |
699 // --verbose" can see it. | |
700 var ranLong = _timeInTransformer.elapsed > new Duration(seconds: 1); | |
701 var ranLongLocally = | |
702 _timeInTransformer.elapsed - _timeAwaitingInputs.elapsed > | |
703 new Duration(milliseconds: 200); | |
704 | |
705 // Report the transformer's timing information if it spent more than 0.2s | |
706 // doing things other than waiting for its secondary inputs or if it spent | |
707 // more than 1s in total. | |
708 if (ranLongLocally || ranLong) { | |
709 _streams.onLogController.add(new LogEntry( | |
710 info, info.primaryId, LogLevel.FINE, | |
711 "Took ${niceDuration(_timeInTransformer.elapsed)} " | |
712 "(${niceDuration(_timeAwaitingInputs.elapsed)} awaiting " | |
713 "secondary inputs).", | |
714 null)); | |
715 } | |
716 | |
717 _handleApplyResults(controller); | |
718 return false; | |
719 }).catchError((error, stackTrace) { | |
720 // If the transform became dirty while processing, ignore any errors from | |
721 // it. | |
722 if (_state == _State.NEEDS_APPLY || _isRemoved) return false; | |
723 | |
724 // Catch all transformer errors and pipe them to the results stream. This | |
725 // is so a broken transformer doesn't take down the whole graph. | |
726 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
727 return true; | |
728 }); | |
729 } | |
730 | |
731 /// Handle the results of running [Transformer.apply]. | |
732 /// | |
733 /// [controller] should be the controller for the [AggegateTransform] passed | |
734 /// to [AggregateTransformer.apply]. | |
735 void _handleApplyResults(AggregateTransformController controller) { | |
736 _consumedPrimaries = controller.consumedPrimaries; | |
737 | |
738 var newOutputs = controller.outputs; | |
739 // Any ids that are for a different package are invalid. | |
740 var invalidIds = newOutputs | |
741 .map((asset) => asset.id) | |
742 .where((id) => id.package != phase.cascade.package) | |
743 .toSet(); | |
744 for (var id in invalidIds) { | |
745 newOutputs.removeId(id); | |
746 // TODO(nweiz): report this as a warning rather than a failing error. | |
747 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
748 } | |
749 | |
750 // Remove outputs that used to exist but don't anymore. | |
751 for (var id in _outputControllers.keys.toList()) { | |
752 if (newOutputs.containsId(id)) continue; | |
753 _outputControllers.remove(id).setRemoved(); | |
754 } | |
755 | |
756 // Emit or stop emitting pass-through assets between removing and adding | |
757 // outputs to ensure there are no collisions. | |
758 for (var id in _primaries.map((node) => node.id)) { | |
759 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) { | |
760 _consumePrimary(id); | |
761 } else { | |
762 _passThrough(id); | |
763 } | |
764 } | |
765 | |
766 // Store any new outputs or new contents for existing outputs. | |
767 for (var asset in newOutputs) { | |
768 var controller = _outputControllers[asset.id]; | |
769 if (controller != null) { | |
770 controller.setAvailable(asset); | |
771 } else { | |
772 var controller = new AssetNodeController.available(asset, this); | |
773 _outputControllers[asset.id] = controller; | |
774 _streams.onAssetController.add(controller.node); | |
775 } | |
776 } | |
777 } | |
778 | |
779 /// Cancels all subscriptions to secondary input nodes and other cascades. | |
780 void _clearSecondarySubscriptions() { | |
781 _missingInputs.clear(); | |
782 for (var subscription in _secondarySubscriptions.values) { | |
783 subscription.cancel(); | |
784 } | |
785 for (var subscription in _missingExternalInputSubscriptions.values) { | |
786 subscription.cancel(); | |
787 } | |
788 _secondarySubscriptions.clear(); | |
789 _missingExternalInputSubscriptions.clear(); | |
790 } | |
791 | |
792 /// Removes all output assets. | |
793 void _clearOutputs() { | |
794 // Remove all the previously-emitted assets. | |
795 for (var controller in _outputControllers.values) { | |
796 controller.setRemoved(); | |
797 } | |
798 _outputControllers.clear(); | |
799 } | |
800 | |
801 /// Emit the pass-through node for the primary input [id] if it's not being | |
802 /// emitted already. | |
803 void _passThrough(AssetId id) { | |
804 assert(!_outputControllers.containsKey(id)); | |
805 | |
806 if (_consumedPrimaries.contains(id)) return; | |
807 var controller = _passThroughControllers[id]; | |
808 var primary = _primaries[id]; | |
809 if (controller == null) { | |
810 controller = new AssetNodeController.from(primary); | |
811 _passThroughControllers[id] = controller; | |
812 _streams.onAssetController.add(controller.node); | |
813 } else if (primary.state.isDirty) { | |
814 controller.setDirty(); | |
815 } else if (!controller.node.state.isAvailable) { | |
816 controller.setAvailable(primary.asset); | |
817 } | |
818 } | |
819 | |
820 /// Stops emitting the pass-through node for the primary input [id] if it's | |
821 /// being emitted. | |
822 void _consumePrimary(AssetId id) { | |
823 var controller = _passThroughControllers.remove(id); | |
824 if (controller == null) return; | |
825 controller.setRemoved(); | |
826 } | |
827 | |
828 /// If `declareOutputs` is running and all previous phases have declared their | |
829 /// outputs, mark [_declareController] as done. | |
830 void _maybeFinishDeclareController() { | |
831 if (_declareController == null) return; | |
832 if (classifier.isClassifying) return; | |
833 if (phase.previous.status == NodeStatus.RUNNING) return; | |
834 _declareController.done(); | |
835 } | |
836 | |
837 /// If `apply` is running, all previous phases have declared their outputs, | |
838 /// and all primary inputs are available and thus have been passed to the | |
839 /// transformer, mark [_applyController] as done. | |
840 void _maybeFinishApplyController() { | |
841 if (_applyController == null) return; | |
842 if (classifier.isClassifying) return; | |
843 if (_primaries.any((input) => !input.state.isAvailable)) return; | |
844 if (phase.previous.status == NodeStatus.RUNNING) return; | |
845 _applyController.done(); | |
846 } | |
847 | |
848 BarbackException _wrapException(error, StackTrace stackTrace) { | |
849 if (error is! AssetNotFoundException) { | |
850 return new TransformerException(info, error, stackTrace); | |
851 } else { | |
852 return new MissingInputException(info, error.id); | |
853 } | |
854 } | |
855 | |
856 String toString() => | |
857 "transform node in $_location for $transformer on ${info.primaryId} " | |
858 "($_state, $status, ${_forced ? '' : 'un'}forced)"; | |
859 } | |
860 | |
861 /// The enum of states that [TransformNode] can be in. | |
862 class _State { | |
863 /// The transform is running [DeclaringAggregateTransformer.declareOutputs]. | |
864 /// | |
865 /// If the set of primary inputs changes while in this state, it will | |
866 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this | |
867 /// state when `declareOutputs` finishes running, it will transition to | |
868 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are | |
869 /// available, and [DECLARED] otherwise. | |
870 /// | |
871 /// Non-declaring transformers will transition out of this state and into | |
872 /// [APPLYING] immediately. | |
873 static const DECLARING = const _State._("declaring outputs"); | |
874 | |
875 /// The transform is running [AggregateTransformer.declareOutputs] or | |
876 /// [AggregateTransform.apply], but a primary input was added or removed after | |
877 /// it started, so it will need to re-run `declareOutputs`. | |
878 /// | |
879 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs` | |
880 /// or `apply` finishes running. | |
881 static const NEEDS_DECLARE = const _State._("needs declare"); | |
882 | |
883 /// The transform is deferred and has run | |
884 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced. | |
885 /// | |
886 /// The [TransformNode] will transition to [APPLYING] when one of the outputs | |
887 /// has been forced or if the transformer is non-lazy and all of its primary | |
888 /// inputs become available. | |
889 static const DECLARED = const _State._("declared"); | |
890 | |
891 /// The transform is running [AggregateTransformer.apply]. | |
892 /// | |
893 /// If an input's contents change or a secondary input is added or removed | |
894 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY]. | |
895 /// If a primary input is added or removed, it will transition to | |
896 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes | |
897 /// running, it will transition to [APPLIED]. | |
898 static const APPLYING = const _State._("applying"); | |
899 | |
900 /// The transform is running [AggregateTransformer.apply], but an input's | |
901 /// contents changed or a secondary input was added or removed after it | |
902 /// started, so it will need to re-run `apply`. | |
903 /// | |
904 /// If a primary input is added or removed while in this state, the | |
905 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this | |
906 /// state when `apply` finishes running, it will transition to [APPLYING]. | |
907 static const NEEDS_APPLY = const _State._("needs apply"); | |
908 | |
909 /// The transform has finished running [AggregateTransformer.apply], whether | |
910 /// or not it emitted an error. | |
911 /// | |
912 /// If an input's contents change or a secondary input is added or removed, | |
913 /// the [TransformNode] will transition to [DECLARED] if the transform is | |
914 /// declaring and [APPLYING] otherwise. If a primary input is added or | |
915 /// removed, this will transition to [DECLARING]. | |
916 static const APPLIED = const _State._("applied"); | |
917 | |
918 final String name; | |
919 | |
920 const _State._(this.name); | |
921 | |
922 String toString() => name; | |
923 } | |
OLD | NEW |