Chromium Code Reviews

Side by Side Diff: barback/lib/src/graph/transform_node.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff |
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library barback.graph.transform_node;
6
7 import 'dart:async';
8
9 import '../asset/asset.dart';
10 import '../asset/asset_id.dart';
11 import '../asset/asset_node.dart';
12 import '../asset/asset_node_set.dart';
13 import '../errors.dart';
14 import '../log.dart';
15 import '../transformer/aggregate_transform.dart';
16 import '../transformer/aggregate_transformer.dart';
17 import '../transformer/declaring_aggregate_transform.dart';
18 import '../transformer/declaring_aggregate_transformer.dart';
19 import '../transformer/lazy_aggregate_transformer.dart';
20 import '../utils.dart';
21 import 'node_status.dart';
22 import 'node_streams.dart';
23 import 'phase.dart';
24 import 'transformer_classifier.dart';
25
26 /// Every `_applyLogDuration`, we will issue a fine log entry letting the user
27 /// know that the transform is still executing.
28 const _applyLogDuration = const Duration(seconds: 10);
29
30 /// Describes a transform on a set of assets and its relationship to the build
31 /// dependency graph.
32 ///
33 /// Keeps track of whether it's dirty and needs to be run and which assets it
34 /// depends on.
35 class TransformNode {
36 /// The aggregate key for this node.
37 final String key;
38
39 /// The [TransformerClassifier] that [this] belongs to.
40 final TransformerClassifier classifier;
41
42 /// The [Phase] that this transform runs in.
43 Phase get phase => classifier.phase;
44
45 /// The [AggregateTransformer] to apply to this node's inputs.
46 final AggregateTransformer transformer;
47
48 /// The primary asset nodes this transform runs on.
49 final _primaries = new AssetNodeSet();
50
51 /// A string describing the location of [this] in the transformer graph.
52 final String _location;
53
54 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams.
55 final _primarySubscriptions = new Map<AssetId, StreamSubscription>();
56
57 /// The subscription to [phase]'s [Phase.onAsset] stream.
58 StreamSubscription<AssetNode> _phaseAssetSubscription;
59
60 /// The subscription to [phase]'s [Phase.onStatusChange] stream.
61 StreamSubscription<NodeStatus> _phaseStatusSubscription;
62
63 /// How far along [this] is in processing its assets.
64 NodeStatus get status {
65 if (_state == _State.APPLIED || _state == _State.DECLARED) {
66 return NodeStatus.IDLE;
67 }
68
69 if (_declaring && _state != _State.DECLARING &&
70 _state != _State.NEEDS_DECLARE) {
71 return NodeStatus.MATERIALIZING;
72 } else {
73 return NodeStatus.RUNNING;
74 }
75 }
76
77 /// The [TransformInfo] describing this node.
78 ///
79 /// [TransformInfo] is the publicly-visible representation of a transform
80 /// node.
81 TransformInfo get info => new TransformInfo(transformer,
82 new AssetId(phase.cascade.package, key));
83
84 /// Whether this is a declaring transform.
85 ///
86 /// This is usually identical to `transformer is
87 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy
88 /// transformer emits an error during `declareOutputs` it's treated as though
89 /// it wasn't declaring.
90 bool get _declaring => transformer is DeclaringAggregateTransformer &&
91 (_state == _State.DECLARING || _declaredOutputs != null);
92
93 /// Whether this transform has been forced since it last finished applying.
94 ///
95 /// A transform being forced means it should run until it generates outputs
96 /// and is no longer dirty. This is always true for non-declaring
97 /// transformers, since they always need to eagerly generate outputs.
98 bool _forced;
99
100 /// The subscriptions to each secondary input's [AssetNode.onStateChange]
101 /// stream.
102 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>();
103
104 /// The subscriptions to the [AssetCascade.onAsset] stream for cascades that
105 /// might generate assets in [_missingInputs].
106 final _missingExternalInputSubscriptions =
107 new Map<String, StreamSubscription>();
108
109 /// The controllers for the asset nodes emitted by this node.
110 final _outputControllers = new Map<AssetId, AssetNodeController>();
111
112 /// The ids of inputs the transformer tried and failed to read last time it
113 /// ran.
114 final _missingInputs = new Set<AssetId>();
115
116 /// The controllers that are used to pass each primary input through [this] if
117 /// it's not consumed or overwritten.
118 ///
119 /// This needs an intervening controller to ensure that the output can be
120 /// marked dirty when determining whether [this] will consume or overwrite it,
121 /// and be marked removed if it does. No pass-through controller will exist
122 /// for primary inputs that are not being passed through.
123 final _passThroughControllers = new Map<AssetId, AssetNodeController>();
124
125 /// The asset node for this transform.
126 final _streams = new NodeStreams();
127 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
128 Stream<AssetNode> get onAsset => _streams.onAsset;
129 Stream<LogEntry> get onLog => _streams.onLog;
130
131 /// The current state of [this].
132 var _state = _State.DECLARED;
133
134 /// Whether [this] has been marked as removed.
135 bool get _isRemoved => _streams.onAssetController.isClosed;
136
137 // If [transformer] is declaring but not lazy and [primary] is available, we
138 // can run [apply] even if [force] hasn't been called, since [transformer]
139 // should run eagerly if possible.
140 bool get _canRunDeclaringEagerly =>
141 _declaring && transformer is! LazyAggregateTransformer &&
142 _primaries.every((input) => input.state.isAvailable);
143
144 /// Which primary inputs the most recent run of this transform has declared
145 /// that it consumes.
146 ///
147 /// This starts out `null`, indicating that the transform hasn't declared
148 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED]
149 /// or [_State.DECLARED].
150 Set<AssetId> _consumedPrimaries;
151
152 /// The set of output ids that [transformer] declared it would emit.
153 ///
154 /// This is only non-null if [transformer] is a
155 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run
156 /// successfully.
157 Set<AssetId> _declaredOutputs;
158
159 /// The controller for the currently-running
160 /// [DeclaringAggregateTransformer.declareOutputs] call's
161 /// [DeclaringAggregateTransform].
162 ///
163 /// This will be non-`null` when
164 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that
165 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes
166 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise.
167 DeclaringAggregateTransformController _declareController;
168
169 /// The controller for the currently-running [AggregateTransformer.apply]
170 /// call's [AggregateTransform].
171 ///
172 /// This will be non-`null` when [AggregateTransform.apply] is running, which
173 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or
174 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's
175 /// [_State.NEEDS_DECLARE], and always `null` otherwise.
176 AggregateTransformController _applyController;
177
178 /// Map to track pending requests for secondary inputs.
179 ///
180 /// Keys are the secondary inputs that have been requested but not yet
181 /// produced. Values are the number of requests for that input.
182 final _pendingSecondaryInputs = <AssetId, int>{};
183
184 /// A stopwatch that tracks the total time spent in a transformer's `apply`
185 /// function.
186 final _timeInTransformer = new Stopwatch();
187
188 /// A stopwatch that tracks the time in a transformer's `apply` function spent
189 /// waiting for [getInput] calls to complete.
190 final _timeAwaitingInputs = new Stopwatch();
191
192 TransformNode(this.classifier, this.transformer, this.key, this._location) {
193 _forced = transformer is! DeclaringAggregateTransformer;
194
195 _phaseAssetSubscription = phase.previous.onAsset.listen((node) {
196 if (!_missingInputs.contains(node.id)) return;
197 if (_forced) node.force();
198 _dirty();
199 });
200
201 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) {
202 if (status == NodeStatus.RUNNING) return;
203
204 _maybeFinishDeclareController();
205 _maybeFinishApplyController();
206 });
207
208 classifier.onDoneClassifying.listen((_) {
209 _maybeFinishDeclareController();
210 _maybeFinishApplyController();
211 });
212
213 _run();
214 }
215
216 /// Adds [input] as a primary input for this node.
217 void addPrimary(AssetNode input) {
218 _primaries.add(input);
219 if (_forced) input.force();
220
221 _primarySubscriptions[input.id] = input.onStateChange
222 .listen((_) => _onPrimaryStateChange(input));
223
224 if (_state == _State.DECLARING && !_declareController.isDone) {
225 // If we're running `declareOutputs` and its id stream isn't closed yet,
226 // pass this in as another id.
227 _declareController.addId(input.id);
228 _maybeFinishDeclareController();
229 } else if (_state == _State.APPLYING) {
230 // If we're running `apply`, we need to wait until [input] is available
231 // before we pass it into the stream. If it's available now, great; if
232 // not, [_onPrimaryStateChange] will handle it.
233 if (!input.state.isAvailable) {
234 // If we started running eagerly without being forced, abort that run if
235 // a new unavailable asset comes in.
236 if (input.isLazy && !_forced) _restartRun();
237 return;
238 }
239
240 _onPrimaryStateChange(input);
241 _maybeFinishApplyController();
242 } else {
243 // Otherwise, a new input means we'll need to re-run `declareOutputs`.
244 _restartRun();
245 }
246 }
247
248 /// Marks this transform as removed.
249 ///
250 /// This causes all of the transform's outputs to be marked as removed as
251 /// well. Normally this will be automatically done internally based on events
252 /// from the primary input, but it's possible for a transform to no longer be
253 /// valid even if its primary input still exists.
254 void remove() {
255 _streams.close();
256 _phaseAssetSubscription.cancel();
257 _phaseStatusSubscription.cancel();
258 if (_declareController != null) _declareController.cancel();
259 if (_applyController != null) _applyController.cancel();
260 _clearSecondarySubscriptions();
261 _clearOutputs();
262
263 for (var subscription in _primarySubscriptions.values) {
264 subscription.cancel();
265 }
266 _primarySubscriptions.clear();
267
268 for (var controller in _passThroughControllers.values) {
269 controller.setRemoved();
270 }
271 _passThroughControllers.clear();
272 }
273
274 /// If [this] is deferred, ensures that its concrete outputs will be
275 /// generated.
276 void force() {
277 if (_forced || _state == _State.APPLIED) return;
278 for (var input in _primaries) {
279 input.force();
280 }
281
282 _forced = true;
283 if (_state == _State.DECLARED) _apply();
284 }
285
286 /// Marks this transform as dirty.
287 ///
288 /// Specifically, this should be called when one of the transform's inputs'
289 /// contents change, or when a secondary input is removed. Primary inputs
290 /// being added or removed are handled by [addInput] and
291 /// [_onPrimaryStateChange].
292 void _dirty() {
293 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE ||
294 _state == _State.NEEDS_APPLY) {
295 // If we already know that [_apply] needs to be run, there's nothing to do
296 // here.
297 return;
298 }
299
300 if (!_forced && !_canRunDeclaringEagerly) {
301 // [forced] should only ever be false for a declaring transformer.
302 assert(_declaring);
303
304 // If we've finished applying, transition to DECLARED, indicating that we
305 // know what outputs [apply] will emit but we're waiting to emit them
306 // concretely until [force] is called. If we're still applying, we'll
307 // transition to DECLARED once we finish.
308 if (_state == _State.APPLIED) _state = _State.DECLARED;
309 for (var controller in _outputControllers.values) {
310 controller.setLazy(force);
311 }
312 _emitDeclaredOutputs();
313 return;
314 }
315
316 if (_state == _State.APPLIED) {
317 if (_declaredOutputs != null) _emitDeclaredOutputs();
318 _apply();
319 } else if (_state == _State.DECLARED) {
320 _apply();
321 } else {
322 _state = _State.NEEDS_APPLY;
323 }
324 }
325
326 /// The callback called when [input]'s state changes.
327 void _onPrimaryStateChange(AssetNode input) {
328 if (input.state.isRemoved) {
329 _primarySubscriptions.remove(input.id);
330
331 if (_primaries.isEmpty) {
332 // If there are no more primary inputs, there's no more use for this
333 // node in the graph. It will be re-created by its
334 // [TransformerClassifier] if a new input with [key] is added.
335 remove();
336 return;
337 }
338
339 // Any change to the number of primary inputs requires that we re-run the
340 // transformation.
341 _restartRun();
342 } else if (input.state.isAvailable) {
343 if (_state == _State.DECLARED) {
344 // If we're passing through this input and its contents don't matter,
345 // update the pass-through controller.
346 var controller = _passThroughControllers[input.id];
347 if (controller != null) controller.setAvailable(input.asset);
348 }
349
350 if (_state == _State.DECLARED && _canRunDeclaringEagerly) {
351 // If [this] is fully declared but hasn't started applying, this input
352 // becoming available may mean that all inputs are available, in which
353 // case we can run apply eagerly.
354 _apply();
355 return;
356 }
357
358 // If we're not actively passing concrete assets to the transformer, the
359 // distinction between a dirty asset and an available one isn't relevant.
360 if (_state != _State.APPLYING) return;
361
362 if (_applyController.isDone) {
363 // If we get a new asset after we've closed the asset stream, we need to
364 // re-run declare and then apply.
365 _restartRun();
366 } else {
367 // If the new asset comes before the asset stream is done, we can just
368 // pass it to the stream.
369 _applyController.addInput(input.asset);
370 _maybeFinishApplyController();
371 }
372 } else {
373 if (_forced) input.force();
374
375 var controller = _passThroughControllers[input.id];
376 if (controller != null) controller.setDirty();
377
378 if (_state == _State.APPLYING && !_applyController.addedId(input.id) &&
379 (_forced || !input.isLazy)) {
380 // If the input hasn't yet been added to the transform's input stream,
381 // there's no need to consider the transformation dirty. However, if the
382 // input is lazy and we're running eagerly, we need to restart the
383 // transformation.
384 return;
385 }
386 _dirty();
387 }
388 }
389
390 /// Run the entire transformation, including both `declareOutputs` (if
391 /// applicable) and `apply`.
392 void _run() {
393 assert(_state != _State.DECLARING);
394 assert(_state != _State.APPLYING);
395
396 _markOutputsDirty();
397 _declareOutputs(() {
398 if (_forced || _canRunDeclaringEagerly) {
399 _apply();
400 } else {
401 _state = _State.DECLARED;
402 _streams.changeStatus(NodeStatus.IDLE);
403 }
404 });
405 }
406
407 /// Restart the entire transformation, including `declareOutputs` if
408 /// applicable.
409 void _restartRun() {
410 if (_state == _State.DECLARED || _state == _State.APPLIED) {
411 // If we're currently idle, we can restart the transformation immediately.
412 _run();
413 return;
414 }
415
416 // If we're actively running `declareOutputs` or `apply`, cancel the
417 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's
418 // method returns, we'll transition to `DECLARING`.
419 if (_declareController != null) _declareController.cancel();
420 if (_applyController != null) _applyController.cancel();
421 _state = _State.NEEDS_DECLARE;
422 }
423
424 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty
425 /// assets.
426 ///
427 /// Calls [callback] when it's finished. This doesn't return a future so that
428 /// [callback] is called synchronously if there are no outputs to declare. If
429 /// [this] is removed while inputs are being declared, [callback] will not be
430 /// called.
431 void _declareOutputs(void callback()) {
432 if (transformer is! DeclaringAggregateTransformer) {
433 callback();
434 return;
435 }
436
437 _state = _State.DECLARING;
438 var controller = new DeclaringAggregateTransformController(this);
439 _declareController = controller;
440 _streams.onLogPool.add(controller.onLog);
441 for (var primary in _primaries) {
442 controller.addId(primary.id);
443 }
444 _maybeFinishDeclareController();
445
446 syncFuture(() {
447 return (transformer as DeclaringAggregateTransformer)
448 .declareOutputs(controller.transform);
449 }).whenComplete(() {
450 // Cancel the controller here even if `declareOutputs` wasn't interrupted.
451 // Since the declaration is finished, we want to close out the
452 // controller's streams.
453 controller.cancel();
454 _declareController = null;
455 }).then((_) {
456 if (_isRemoved) return;
457 if (_state == _State.NEEDS_DECLARE) {
458 _declareOutputs(callback);
459 return;
460 }
461
462 if (controller.loggedError) {
463 // If `declareOutputs` fails, fall back to treating a declaring
464 // transformer as though it were eager.
465 if (transformer is! LazyAggregateTransformer) _forced = true;
466 callback();
467 return;
468 }
469
470 _consumedPrimaries = controller.consumedPrimaries;
471 _declaredOutputs = controller.outputIds;
472 var invalidIds = _declaredOutputs
473 .where((id) => id.package != phase.cascade.package).toSet();
474 for (var id in invalidIds) {
475 _declaredOutputs.remove(id);
476 // TODO(nweiz): report this as a warning rather than a failing error.
477 phase.cascade.reportError(new InvalidOutputException(info, id));
478 }
479
480 for (var primary in _primaries) {
481 if (_declaredOutputs.contains(primary.id)) continue;
482 _passThrough(primary.id);
483 }
484 _emitDeclaredOutputs();
485 callback();
486 }).catchError((error, stackTrace) {
487 if (_isRemoved) return;
488 if (transformer is! LazyAggregateTransformer) _forced = true;
489 phase.cascade.reportError(_wrapException(error, stackTrace));
490 callback();
491 });
492 }
493
494 /// Emits a dirty asset node for all outputs that were declared by the
495 /// transformer.
496 ///
497 /// This won't emit any outputs for which there already exist output
498 /// controllers. It should only be called for transforms that have declared
499 /// their outputs.
500 void _emitDeclaredOutputs() {
501 assert(_declaredOutputs != null);
502 for (var id in _declaredOutputs) {
503 if (_outputControllers.containsKey(id)) continue;
504 var controller = _forced
505 ? new AssetNodeController(id, this)
506 : new AssetNodeController.lazy(id, force, this);
507 _outputControllers[id] = controller;
508 _streams.onAssetController.add(controller.node);
509 }
510 }
511
512 //// Mark all emitted and passed-through outputs of this transform as dirty.
513 void _markOutputsDirty() {
514 for (var controller in _passThroughControllers.values) {
515 controller.setDirty();
516 }
517 for (var controller in _outputControllers.values) {
518 if (_forced) {
519 controller.setDirty();
520 } else {
521 controller.setLazy(force);
522 }
523 }
524 }
525
526 /// Applies this transform.
527 void _apply() {
528 assert(!_isRemoved);
529
530 _markOutputsDirty();
531 _clearSecondarySubscriptions();
532 _state = _State.APPLYING;
533 _streams.changeStatus(status);
534 _runApply().then((hadError) {
535 if (_isRemoved) return;
536
537 if (_state == _State.DECLARED) return;
538
539 if (_state == _State.NEEDS_DECLARE) {
540 _run();
541 return;
542 }
543
544 // If an input's contents changed while running `apply`, retry unless the
545 // transformer is deferred and hasn't been forced.
546 if (_state == _State.NEEDS_APPLY) {
547 if (_forced || _canRunDeclaringEagerly) {
548 _apply();
549 } else {
550 _state = _State.DECLARED;
551 }
552 return;
553 }
554
555 if (_declaring) _forced = false;
556
557 assert(_state == _State.APPLYING);
558 if (hadError) {
559 _clearOutputs();
560 // If the transformer threw an error, we don't want to emit the
561 // pass-through assets in case they'll be overwritten by the
562 // transformer. However, if the transformer declared that it wouldn't
563 // overwrite or consume a pass-through asset, we can safely emit it.
564 if (_declaredOutputs != null) {
565 for (var input in _primaries) {
566 if (_consumedPrimaries.contains(input.id) ||
567 _declaredOutputs.contains(input.id)) {
568 _consumePrimary(input.id);
569 } else {
570 _passThrough(input.id);
571 }
572 }
573 }
574 }
575
576 _state = _State.APPLIED;
577 _streams.changeStatus(NodeStatus.IDLE);
578 });
579 }
580
581 /// Gets the asset for an input [id].
582 ///
583 /// If an input with [id] cannot be found, throws an [AssetNotFoundException].
584 Future<Asset> getInput(AssetId id) {
585 _timeAwaitingInputs.start();
586 _pendingSecondaryInputs[id] = _pendingSecondaryInputs.containsKey(id)
587 ? _pendingSecondaryInputs[id] + 1
588 : 1;
589 return phase.previous.getOutput(id).then((node) {
590 // Throw if the input isn't found. This ensures the transformer's apply
591 // is exited. We'll then catch this and report it through the proper
592 // results stream.
593 if (node == null) {
594 _missingInputs.add(id);
595
596 // If this id is for an asset in another package, subscribe to that
597 // package's asset cascade so when it starts emitting the id we know to
598 // re-run the transformer.
599 if (id.package != phase.cascade.package) {
600 var stream = phase.cascade.graph.onAssetFor(id.package);
601 if (stream != null) {
602 _missingExternalInputSubscriptions.putIfAbsent(id.package, () {
603 return stream.listen((node) {
604 if (!_missingInputs.contains(node.id)) return;
605 if (_forced) node.force();
606 _dirty();
607 });
608 });
609 }
610 }
611
612 throw new AssetNotFoundException(id);
613 }
614
615 _secondarySubscriptions.putIfAbsent(node.id, () {
616 return node.onStateChange.listen((_) => _dirty());
617 });
618
619 return node.asset;
620 }).whenComplete(() {
621 assert(_pendingSecondaryInputs.containsKey(id));
622 if (_pendingSecondaryInputs[id] == 1) {
623 _pendingSecondaryInputs.remove(id);
624 } else {
625 _pendingSecondaryInputs[id]--;
626 }
627 if (_pendingSecondaryInputs.isEmpty) _timeAwaitingInputs.stop();
628 });
629 }
630
631 /// Run [AggregateTransformer.apply].
632 ///
633 /// Returns whether or not an error occurred while running the transformer.
634 Future<bool> _runApply() {
635 var controller = new AggregateTransformController(this);
636 _applyController = controller;
637 _streams.onLogPool.add(controller.onLog);
638 for (var primary in _primaries) {
639 if (!primary.state.isAvailable) continue;
640 controller.addInput(primary.asset);
641 }
642 _maybeFinishApplyController();
643
644 var transformCounterTimer;
645
646 return syncFuture(() {
647 _timeInTransformer.reset();
648 _timeAwaitingInputs.reset();
649 _timeInTransformer.start();
650
651 transformCounterTimer = new Timer.periodic(_applyLogDuration, (_) {
652 if (_streams.onLogController.isClosed ||
653 !_timeInTransformer.isRunning) {
654 return;
655 }
656
657 var message = new StringBuffer("Not yet complete after "
658 "${niceDuration(_timeInTransformer.elapsed)}");
659 if (_pendingSecondaryInputs.isNotEmpty) {
660 message.write(", waiting on input(s) "
661 "${_pendingSecondaryInputs.keys.join(", ")}");
662 }
663 _streams.onLogController.add(new LogEntry(
664 info,
665 info.primaryId,
666 LogLevel.FINE,
667 message.toString(),
668 null));
669 });
670
671 return transformer.apply(controller.transform);
672 }).whenComplete(() {
673 transformCounterTimer.cancel();
674 _timeInTransformer.stop();
675 _timeAwaitingInputs.stop();
676
677 // Cancel the controller here even if `apply` wasn't interrupted. Since
678 // the apply is finished, we want to close out the controller's streams.
679 controller.cancel();
680 _applyController = null;
681 }).then((_) {
682 assert(_state != _State.DECLARED);
683 assert(_state != _State.DECLARING);
684 assert(_state != _State.APPLIED);
685
686 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) {
687 _state = _State.DECLARED;
688 _streams.changeStatus(NodeStatus.IDLE);
689 return false;
690 }
691
692 if (_isRemoved) return false;
693 if (_state == _State.NEEDS_APPLY) return false;
694 if (_state == _State.NEEDS_DECLARE) return false;
695 if (controller.loggedError) return true;
696
697 // If the transformer took long enough, log its duration in fine output.
698 // That way it's not always visible, but users running with "pub serve
699 // --verbose" can see it.
700 var ranLong = _timeInTransformer.elapsed > new Duration(seconds: 1);
701 var ranLongLocally =
702 _timeInTransformer.elapsed - _timeAwaitingInputs.elapsed >
703 new Duration(milliseconds: 200);
704
705 // Report the transformer's timing information if it spent more than 0.2s
706 // doing things other than waiting for its secondary inputs or if it spent
707 // more than 1s in total.
708 if (ranLongLocally || ranLong) {
709 _streams.onLogController.add(new LogEntry(
710 info, info.primaryId, LogLevel.FINE,
711 "Took ${niceDuration(_timeInTransformer.elapsed)} "
712 "(${niceDuration(_timeAwaitingInputs.elapsed)} awaiting "
713 "secondary inputs).",
714 null));
715 }
716
717 _handleApplyResults(controller);
718 return false;
719 }).catchError((error, stackTrace) {
720 // If the transform became dirty while processing, ignore any errors from
721 // it.
722 if (_state == _State.NEEDS_APPLY || _isRemoved) return false;
723
724 // Catch all transformer errors and pipe them to the results stream. This
725 // is so a broken transformer doesn't take down the whole graph.
726 phase.cascade.reportError(_wrapException(error, stackTrace));
727 return true;
728 });
729 }
730
731 /// Handle the results of running [Transformer.apply].
732 ///
733 /// [controller] should be the controller for the [AggegateTransform] passed
734 /// to [AggregateTransformer.apply].
735 void _handleApplyResults(AggregateTransformController controller) {
736 _consumedPrimaries = controller.consumedPrimaries;
737
738 var newOutputs = controller.outputs;
739 // Any ids that are for a different package are invalid.
740 var invalidIds = newOutputs
741 .map((asset) => asset.id)
742 .where((id) => id.package != phase.cascade.package)
743 .toSet();
744 for (var id in invalidIds) {
745 newOutputs.removeId(id);
746 // TODO(nweiz): report this as a warning rather than a failing error.
747 phase.cascade.reportError(new InvalidOutputException(info, id));
748 }
749
750 // Remove outputs that used to exist but don't anymore.
751 for (var id in _outputControllers.keys.toList()) {
752 if (newOutputs.containsId(id)) continue;
753 _outputControllers.remove(id).setRemoved();
754 }
755
756 // Emit or stop emitting pass-through assets between removing and adding
757 // outputs to ensure there are no collisions.
758 for (var id in _primaries.map((node) => node.id)) {
759 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) {
760 _consumePrimary(id);
761 } else {
762 _passThrough(id);
763 }
764 }
765
766 // Store any new outputs or new contents for existing outputs.
767 for (var asset in newOutputs) {
768 var controller = _outputControllers[asset.id];
769 if (controller != null) {
770 controller.setAvailable(asset);
771 } else {
772 var controller = new AssetNodeController.available(asset, this);
773 _outputControllers[asset.id] = controller;
774 _streams.onAssetController.add(controller.node);
775 }
776 }
777 }
778
779 /// Cancels all subscriptions to secondary input nodes and other cascades.
780 void _clearSecondarySubscriptions() {
781 _missingInputs.clear();
782 for (var subscription in _secondarySubscriptions.values) {
783 subscription.cancel();
784 }
785 for (var subscription in _missingExternalInputSubscriptions.values) {
786 subscription.cancel();
787 }
788 _secondarySubscriptions.clear();
789 _missingExternalInputSubscriptions.clear();
790 }
791
792 /// Removes all output assets.
793 void _clearOutputs() {
794 // Remove all the previously-emitted assets.
795 for (var controller in _outputControllers.values) {
796 controller.setRemoved();
797 }
798 _outputControllers.clear();
799 }
800
801 /// Emit the pass-through node for the primary input [id] if it's not being
802 /// emitted already.
803 void _passThrough(AssetId id) {
804 assert(!_outputControllers.containsKey(id));
805
806 if (_consumedPrimaries.contains(id)) return;
807 var controller = _passThroughControllers[id];
808 var primary = _primaries[id];
809 if (controller == null) {
810 controller = new AssetNodeController.from(primary);
811 _passThroughControllers[id] = controller;
812 _streams.onAssetController.add(controller.node);
813 } else if (primary.state.isDirty) {
814 controller.setDirty();
815 } else if (!controller.node.state.isAvailable) {
816 controller.setAvailable(primary.asset);
817 }
818 }
819
820 /// Stops emitting the pass-through node for the primary input [id] if it's
821 /// being emitted.
822 void _consumePrimary(AssetId id) {
823 var controller = _passThroughControllers.remove(id);
824 if (controller == null) return;
825 controller.setRemoved();
826 }
827
828 /// If `declareOutputs` is running and all previous phases have declared their
829 /// outputs, mark [_declareController] as done.
830 void _maybeFinishDeclareController() {
831 if (_declareController == null) return;
832 if (classifier.isClassifying) return;
833 if (phase.previous.status == NodeStatus.RUNNING) return;
834 _declareController.done();
835 }
836
837 /// If `apply` is running, all previous phases have declared their outputs,
838 /// and all primary inputs are available and thus have been passed to the
839 /// transformer, mark [_applyController] as done.
840 void _maybeFinishApplyController() {
841 if (_applyController == null) return;
842 if (classifier.isClassifying) return;
843 if (_primaries.any((input) => !input.state.isAvailable)) return;
844 if (phase.previous.status == NodeStatus.RUNNING) return;
845 _applyController.done();
846 }
847
848 BarbackException _wrapException(error, StackTrace stackTrace) {
849 if (error is! AssetNotFoundException) {
850 return new TransformerException(info, error, stackTrace);
851 } else {
852 return new MissingInputException(info, error.id);
853 }
854 }
855
856 String toString() =>
857 "transform node in $_location for $transformer on ${info.primaryId} "
858 "($_state, $status, ${_forced ? '' : 'un'}forced)";
859 }
860
861 /// The enum of states that [TransformNode] can be in.
862 class _State {
863 /// The transform is running [DeclaringAggregateTransformer.declareOutputs].
864 ///
865 /// If the set of primary inputs changes while in this state, it will
866 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this
867 /// state when `declareOutputs` finishes running, it will transition to
868 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are
869 /// available, and [DECLARED] otherwise.
870 ///
871 /// Non-declaring transformers will transition out of this state and into
872 /// [APPLYING] immediately.
873 static const DECLARING = const _State._("declaring outputs");
874
875 /// The transform is running [AggregateTransformer.declareOutputs] or
876 /// [AggregateTransform.apply], but a primary input was added or removed after
877 /// it started, so it will need to re-run `declareOutputs`.
878 ///
879 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs`
880 /// or `apply` finishes running.
881 static const NEEDS_DECLARE = const _State._("needs declare");
882
883 /// The transform is deferred and has run
884 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced.
885 ///
886 /// The [TransformNode] will transition to [APPLYING] when one of the outputs
887 /// has been forced or if the transformer is non-lazy and all of its primary
888 /// inputs become available.
889 static const DECLARED = const _State._("declared");
890
891 /// The transform is running [AggregateTransformer.apply].
892 ///
893 /// If an input's contents change or a secondary input is added or removed
894 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY].
895 /// If a primary input is added or removed, it will transition to
896 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes
897 /// running, it will transition to [APPLIED].
898 static const APPLYING = const _State._("applying");
899
900 /// The transform is running [AggregateTransformer.apply], but an input's
901 /// contents changed or a secondary input was added or removed after it
902 /// started, so it will need to re-run `apply`.
903 ///
904 /// If a primary input is added or removed while in this state, the
905 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this
906 /// state when `apply` finishes running, it will transition to [APPLYING].
907 static const NEEDS_APPLY = const _State._("needs apply");
908
909 /// The transform has finished running [AggregateTransformer.apply], whether
910 /// or not it emitted an error.
911 ///
912 /// If an input's contents change or a secondary input is added or removed,
913 /// the [TransformNode] will transition to [DECLARED] if the transform is
914 /// declaring and [APPLYING] otherwise. If a primary input is added or
915 /// removed, this will transition to [DECLARING].
916 static const APPLIED = const _State._("applied");
917
918 final String name;
919
920 const _State._(this.name);
921
922 String toString() => name;
923 }
OLDNEW
« no previous file with comments | « barback/lib/src/graph/static_asset_cascade.dart ('k') | barback/lib/src/graph/transformer_classifier.dart » ('j') | no next file with comments »

Powered by Google App Engine