Index: pkg/barback/lib/src/graph/transform_node.dart |
diff --git a/pkg/barback/lib/src/graph/transform_node.dart b/pkg/barback/lib/src/graph/transform_node.dart |
index 75a76a26e22714d8411211ecf35f00762cc65d99..5ef4b8108363930ad4b13acd4852057677bd5a89 100644 |
--- a/pkg/barback/lib/src/graph/transform_node.dart |
+++ b/pkg/barback/lib/src/graph/transform_node.dart |
@@ -9,15 +9,15 @@ import 'dart:async'; |
import '../asset/asset.dart'; |
import '../asset/asset_id.dart'; |
import '../asset/asset_node.dart'; |
+import '../asset/asset_node_set.dart'; |
import '../errors.dart'; |
import '../log.dart'; |
import '../transformer/aggregate_transform.dart'; |
+import '../transformer/aggregate_transformer.dart'; |
import '../transformer/declaring_aggregate_transform.dart'; |
-import '../transformer/declaring_transform.dart'; |
-import '../transformer/declaring_transformer.dart'; |
-import '../transformer/lazy_transformer.dart'; |
-import '../transformer/transform.dart'; |
-import '../transformer/transformer.dart'; |
+import '../transformer/declaring_aggregate_transformer.dart'; |
+import '../transformer/lazy_aggregate_transformer.dart'; |
+import '../utils.dart'; |
import 'node_status.dart'; |
import 'node_streams.dart'; |
import 'phase.dart'; |
@@ -34,20 +34,23 @@ class TransformNode { |
/// The [Phase] that this transform runs in. |
final Phase phase; |
- /// The [Transformer] to apply to this node's inputs. |
- final Transformer transformer; |
+ /// The [AggregateTransformer] to apply to this node's inputs. |
+ final AggregateTransformer transformer; |
- /// The node for the primary asset this transform depends on. |
- final AssetNode primary; |
+ /// The primary asset nodes this transform runs on. |
+ final _primaries = new AssetNodeSet(); |
/// A string describing the location of [this] in the transformer graph. |
final String _location; |
- /// The subscription to [primary]'s [AssetNode.onStateChange] stream. |
- StreamSubscription _primarySubscription; |
+ /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams. |
+ final _primarySubscriptions = new Map<AssetId, StreamSubscription>(); |
/// The subscription to [phase]'s [Phase.onAsset] stream. |
- StreamSubscription<AssetNode> _phaseSubscription; |
+ StreamSubscription<AssetNode> _phaseAssetSubscription; |
+ |
+ /// The subscription to [phase]'s [Phase.onStatusChange] stream. |
+ StreamSubscription<NodeStatus> _phaseStatusSubscription; |
/// How far along [this] is in processing its assets. |
NodeStatus get status { |
@@ -55,19 +58,28 @@ class TransformNode { |
return NodeStatus.IDLE; |
} |
- if (_declaring && _state != _State.DECLARING) { |
+ if (_declaring && _state != _State.DECLARING && |
+ _state != _State.NEEDS_DECLARE) { |
return NodeStatus.MATERIALIZING; |
} else { |
return NodeStatus.RUNNING; |
} |
} |
+ /// The [TransformInfo] describing this node. |
+ /// |
+ /// [TransformInfo] is the publicly-visible representation of a transform |
+ /// node. |
+ TransformInfo get info => new TransformInfo(transformer, |
+ new AssetId(phase.cascade.package, key)); |
+ |
/// Whether this is a declaring transform. |
/// |
- /// This is usually identical to `transformer is DeclaringTransformer`, but if |
- /// a declaring and non-lazy transformer emits an error during |
- /// `declareOutputs` it's treated as though it wasn't declaring. |
- bool get _declaring => transformer is DeclaringTransformer && |
+ /// This is usually identical to `transformer is |
+ /// DeclaringAggregateTransformer`, but if a declaring and non-lazy |
+ /// transformer emits an error during `declareOutputs` it's treated as though |
+ /// it wasn't declaring. |
+ bool get _declaring => transformer is DeclaringAggregateTransformer && |
(_state == _State.DECLARING || _declaredOutputs != null); |
/// Whether this transform has been forced since it last finished applying. |
@@ -77,8 +89,9 @@ class TransformNode { |
/// transformers, since they always need to eagerly generate outputs. |
bool _forced; |
- /// The subscriptions to each input's [AssetNode.onStateChange] stream. |
- final _inputSubscriptions = new Map<AssetId, StreamSubscription>(); |
+ /// The subscriptions to each secondary input's [AssetNode.onStateChange] |
+ /// stream. |
+ final _secondarySubscriptions = new Map<AssetId, StreamSubscription>(); |
/// The controllers for the asset nodes emitted by this node. |
final _outputControllers = new Map<AssetId, AssetNodeController>(); |
@@ -87,14 +100,14 @@ class TransformNode { |
/// ran. |
final _missingInputs = new Set<AssetId>(); |
- /// The controller that's used to pass [primary] through [this] if it's not |
- /// consumed or overwritten. |
+ /// The controllers that are used to pass each primary input through [this] if |
+ /// it's not consumed or overwritten. |
/// |
/// This needs an intervening controller to ensure that the output can be |
/// marked dirty when determining whether [this] will consume or overwrite it, |
- /// and be marked removed if it does. [_passThroughController] will be null |
- /// if the asset is not being passed through. |
- AssetNodeController _passThroughController; |
+ /// and be marked removed if it does. No pass-through controller will exist |
+ /// for primary inputs that are not being passed through. |
+ final _passThroughControllers = new Map<AssetId, AssetNodeController>(); |
/// The asset node for this transform. |
final _streams = new NodeStreams(); |
@@ -103,7 +116,7 @@ class TransformNode { |
Stream<LogEntry> get onLog => _streams.onLog; |
/// The current state of [this]. |
- var _state = _State.DECLARING; |
+ var _state = _State.DECLARED; |
/// Whether [this] has been marked as removed. |
bool get _isRemoved => _streams.onAssetController.isClosed; |
@@ -112,60 +125,87 @@ class TransformNode { |
// can run [apply] even if [force] hasn't been called, since [transformer] |
// should run eagerly if possible. |
bool get _canRunDeclaringEagerly => |
- _declaring && transformer is! LazyTransformer && |
- primary.state.isAvailable; |
+ _declaring && transformer is! LazyAggregateTransformer && |
+ _primaries.every((input) => input.state.isAvailable); |
- /// Whether the most recent run of this transform has declared that it |
- /// consumes the primary input. |
+ /// Which primary inputs the most recent run of this transform has declared |
+ /// that it consumes. |
/// |
- /// Defaults to `false`. This is not meaningful unless [_state] is |
- /// [_State.APPLIED] or [_State.DECLARED]. |
- bool _consumePrimary = false; |
+ /// This starts out `null`, indicating that the transform hasn't declared |
+ /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED] |
+ /// or [_State.DECLARED]. |
+ Set<AssetId> _consumedPrimaries; |
/// The set of output ids that [transformer] declared it would emit. |
/// |
- /// This is only non-null if [transformer] is a [DeclaringTransformer] and its |
- /// [declareOutputs] has been run successfully. |
+ /// This is only non-null if [transformer] is a |
+ /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run |
+ /// successfully. |
Set<AssetId> _declaredOutputs; |
- TransformNode(this.phase, Transformer transformer, AssetNode primary, |
- this._location) |
- : key = primary.id.path, |
- transformer = transformer, |
- primary = primary { |
- _forced = transformer is! DeclaringTransformer; |
- if (_forced) primary.force(); |
+ /// The controller for the currently-running |
+ /// [DeclaringAggregateTransformer.declareOutputs] call's |
+ /// [DeclaringAggregateTransform]. |
+ /// |
+ /// This will be non-`null` when |
+ /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that |
+ /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes |
+ /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise. |
+ DeclaringAggregateTransformController _declareController; |
+ |
+ /// The controller for the currently-running [AggregateTransformer.apply] |
+ /// call's [AggregateTransform]. |
+ /// |
+ /// This will be non-`null` when [AggregateTransform.apply] is running, which |
+ /// means that it's always non-`null` when [_state] is [_State.APPLYING] or |
+ /// [_State.NEEDS_APPLY], sometimes non-`null` when it's |
+ /// [_State.NEEDS_DECLARE], and always `null` otherwise. |
+ AggregateTransformController _applyController; |
- _primarySubscription = primary.onStateChange.listen((state) { |
- if (state.isRemoved) { |
- remove(); |
- } else { |
- if (_forced) primary.force(); |
- _dirty(); |
- } |
- }); |
+ TransformNode(this.phase, this.transformer, this.key, this._location) { |
+ _forced = transformer is! DeclaringAggregateTransformer; |
- _phaseSubscription = phase.previous.onAsset.listen((node) { |
+ _phaseAssetSubscription = phase.previous.onAsset.listen((node) { |
if (!_missingInputs.contains(node.id)) return; |
if (_forced) node.force(); |
_dirty(); |
}); |
- _declareOutputs().then((_) { |
- if (_forced || _canRunDeclaringEagerly) { |
- _apply(); |
- } else { |
- _state = _State.DECLARED; |
- _streams.changeStatus(NodeStatus.IDLE); |
- } |
+ _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) { |
+ if (status == NodeStatus.RUNNING) return; |
+ |
+ _maybeFinishDeclareController(); |
+ _maybeFinishApplyController(); |
}); |
+ |
+ _run(); |
} |
- /// The [TransformInfo] describing this node. |
- /// |
- /// [TransformInfo] is the publicly-visible representation of a transform |
- /// node. |
- TransformInfo get info => new TransformInfo(transformer, primary.id); |
+ /// Adds [input] as a primary input for this node. |
+ void addPrimary(AssetNode input) { |
+ _primaries.add(input); |
+ if (_forced) input.force(); |
+ |
+ _primarySubscriptions[input.id] = input.onStateChange |
+ .listen((_) => _onPrimaryStateChange(input)); |
+ |
+ if (_state == _State.DECLARING && !_declareController.isDone) { |
+ // If we're running `declareOutputs` and its id stream isn't closed yet, |
+ // pass this in as another id. |
+ _declareController.addId(input.id); |
+ _maybeFinishDeclareController(); |
+ } else if (_state == _State.APPLYING) { |
+ // If we're running `apply`, we need to wait until [input] is available |
+ // before we pass it into the stream. If it's available now, great; if |
+ // not, [_onPrimaryStateChange] will handle it. |
+ if (!input.state.isAvailable) return; |
+ _onPrimaryStateChange(input); |
+ _maybeFinishApplyController(); |
+ } else { |
+ // Otherwise, a new input means we'll need to re-run `declareOutputs`. |
+ _restartRun(); |
+ } |
+ } |
/// Marks this transform as removed. |
/// |
@@ -175,41 +215,58 @@ class TransformNode { |
/// valid even if its primary input still exists. |
void remove() { |
_streams.close(); |
- _primarySubscription.cancel(); |
- _phaseSubscription.cancel(); |
- _clearInputSubscriptions(); |
+ _phaseAssetSubscription.cancel(); |
+ _phaseStatusSubscription.cancel(); |
+ if (_declareController != null) _declareController.cancel(); |
+ if (_applyController != null) _applyController.cancel(); |
+ _clearSecondarySubscriptions(); |
_clearOutputs(); |
- if (_passThroughController != null) { |
- _passThroughController.setRemoved(); |
- _passThroughController = null; |
+ |
+ for (var subscription in _primarySubscriptions.values) { |
+ subscription.cancel(); |
+ } |
+ _primarySubscriptions.clear(); |
+ |
+ for (var controller in _passThroughControllers.values) { |
+ controller.setRemoved(); |
} |
+ _passThroughControllers.clear(); |
} |
/// If [this] is deferred, ensures that its concrete outputs will be |
/// generated. |
void force() { |
if (_forced || _state == _State.APPLIED) return; |
- primary.force(); |
+ for (var input in _primaries) { |
+ input.force(); |
+ } |
+ |
_forced = true; |
- if (_state == _State.DECLARED) _dirty(); |
+ if (_state == _State.DECLARED) _apply(); |
} |
/// Marks this transform as dirty. |
/// |
- /// This causes all of the transform's outputs to be marked as dirty as well. |
+ /// Specifically, this should be called when one of the transform's inputs' |
+ /// contents change, or when a secondary input is removed. Primary inputs |
+ /// being added or removed are handled by [addInput] and |
+ /// [_onPrimaryStateChange]. |
void _dirty() { |
- // If we're in the process of running [declareOutputs], we already know that |
- // [apply] needs to be run so there's nothing we need to mark as dirty. |
- if (_state == _State.DECLARING) return; |
+ if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE || |
+ _state == _State.NEEDS_APPLY) { |
+ // If we already know that [_apply] needs to be run, there's nothing to do |
+ // here. |
+ return; |
+ } |
if (!_forced && !_canRunDeclaringEagerly) { |
// [forced] should only ever be false for a declaring transformer. |
assert(_declaring); |
- // If we've finished applying, transition to MATERIALIZING, indicating |
- // that we know what outputs [apply] will emit but we're waiting to emit |
- // them concretely until [force] is called. If we're still applying, we'll |
- // transition to MATERIALIZING once we finish. |
+ // If we've finished applying, transition to DECLARED, indicating that we |
+ // know what outputs [apply] will emit but we're waiting to emit them |
+ // concretely until [force] is called. If we're still applying, we'll |
+ // transition to DECLARED once we finish. |
if (_state == _State.APPLIED) _state = _State.DECLARED; |
for (var controller in _outputControllers.values) { |
controller.setLazy(force); |
@@ -218,11 +275,6 @@ class TransformNode { |
return; |
} |
- if (_passThroughController != null) _passThroughController.setDirty(); |
- for (var controller in _outputControllers.values) { |
- controller.setDirty(); |
- } |
- |
if (_state == _State.APPLIED) { |
if (_declaredOutputs != null) _emitDeclaredOutputs(); |
_apply(); |
@@ -233,21 +285,136 @@ class TransformNode { |
} |
} |
+ /// The callback called when [input]'s state changes. |
+ void _onPrimaryStateChange(AssetNode input) { |
+ if (input.state.isRemoved) { |
+ _primarySubscriptions.remove(input.id); |
+ |
+ if (_primaries.isEmpty) { |
+ // If there are no more primary inputs, there's no more use for this |
+ // node in the graph. It will be re-created by its |
+ // [TransformerClassifier] if a new input with [key] is added. |
+ remove(); |
+ return; |
+ } |
+ |
+ // Any change to the number of primary inputs requires that we re-run the |
+ // transformation. |
+ _restartRun(); |
+ } else if (input.state.isAvailable) { |
+ if (_state == _State.DECLARED && _canRunDeclaringEagerly) { |
+ // If [this] is fully declared but hasn't started applying, this input |
+ // becoming available may mean that all inputs are available, in which |
+ // case we can run apply eagerly. |
+ _apply(); |
+ return; |
+ } |
+ |
+ // If we're not actively passing concrete assets to the transformer, the |
+ // distinction between a dirty asset and an available one isn't relevant. |
+ if (_state != _State.APPLYING) return; |
+ |
+ if (_applyController.isDone) { |
+ // If we get a new asset after we've closed the asset stream, we need to |
+ // re-run declare and then apply. |
+ _restartRun(); |
+ } else { |
+ // If the new asset comes before the asset stream is done, we can just |
+ // pass it to the stream. |
+ _applyController.addInput(input.asset); |
+ _maybeFinishApplyController(); |
+ } |
+ } else { |
+ if (_forced) input.force(); |
+ if (_state == _State.APPLYING && !_applyController.addedId(input.id)) { |
+ // If the input hasn't yet been added to the transform's input stream, |
+ // there's no need to consider the transformation dirty. |
+ return; |
+ } |
+ _dirty(); |
+ } |
+ } |
+ |
+ /// Run the entire transformation, including both `declareOutputs` (if |
+ /// applicable) and `apply`. |
+ void _run() { |
+ assert(_state != _State.DECLARING); |
+ assert(_state != _State.APPLYING); |
+ |
+ _markOutputsDirty(); |
+ _declareOutputs(() { |
+ if (_forced || _canRunDeclaringEagerly) { |
+ _apply(); |
+ } else { |
+ _state = _State.DECLARED; |
+ _streams.changeStatus(NodeStatus.IDLE); |
+ } |
+ }); |
+ } |
+ |
+ /// Restart the entire transformation, including `declareOutputs` if |
+ /// applicable. |
+ void _restartRun() { |
+ if (_state == _State.DECLARED || _state == _State.APPLIED) { |
+ // If we're currently idle, we can restart the transformation immediately. |
+ _run(); |
+ return; |
+ } |
+ |
+ // If we're actively running `declareOutputs` or `apply`, cancel the |
+ // transforms and transition to `NEEDS_DECLARE`. Once the transformer's |
+ // method returns, we'll transition to `DECLARING`. |
+ if (_declareController != null) _declareController.cancel(); |
+ if (_applyController != null) _applyController.cancel(); |
+ _state = _State.NEEDS_DECLARE; |
+ } |
+ |
/// Runs [transform.declareOutputs] and emits the resulting assets as dirty |
/// assets. |
- Future _declareOutputs() { |
- if (transformer is! DeclaringTransformer) return new Future.value(); |
+ /// |
+ /// Calls [callback] when it's finished. This doesn't return a future so that |
+ /// [callback] is called synchronously if there are no outputs to declare. If |
+ /// [this] is removed while inputs are being declared, [callback] will not be |
+ /// called. |
+ void _declareOutputs(void callback()) { |
+ if (transformer is! DeclaringAggregateTransformer) { |
+ callback(); |
+ return; |
+ } |
+ _state = _State.DECLARING; |
var controller = new DeclaringAggregateTransformController(this); |
+ _declareController = controller; |
_streams.onLogPool.add(controller.onLog); |
- controller.idController.add(primary.id); |
- return newDeclaringTransform(controller.transform).then((transform) { |
- return (transformer as DeclaringTransformer).declareOutputs(transform); |
+ for (var primary in _primaries) { |
+ controller.addId(primary.id); |
+ } |
+ |
+ syncFuture(() { |
+ return (transformer as DeclaringAggregateTransformer) |
+ .declareOutputs(controller.transform); |
+ }).whenComplete(() { |
+ // Cancel the controller here even if `declareOutputs` wasn't interrupted. |
+ // Since the declaration is finished, we want to close out the |
+ // controller's streams. |
+ controller.cancel(); |
+ _declareController = null; |
}).then((_) { |
if (_isRemoved) return; |
- if (controller.loggedError) return; |
+ if (_state == _State.NEEDS_DECLARE) { |
+ _declareOutputs(callback); |
+ return; |
+ } |
- _consumePrimary = controller.consumedPrimaries.contains(primary.id); |
+ if (controller.loggedError) { |
+ // If `declareOutputs` fails, fall back to treating a declaring |
+ // transformer as though it were eager. |
+ if (transformer is! LazyAggregateTransformer) _forced = true; |
+ callback(); |
+ return; |
+ } |
+ |
+ _consumedPrimaries = controller.consumedPrimaries; |
_declaredOutputs = controller.outputIds; |
var invalidIds = _declaredOutputs |
.where((id) => id.package != phase.cascade.package).toSet(); |
@@ -257,12 +424,17 @@ class TransformNode { |
phase.cascade.reportError(new InvalidOutputException(info, id)); |
} |
- if (!_declaredOutputs.contains(primary.id)) _emitPassThrough(); |
+ for (var primary in _primaries) { |
+ if (_declaredOutputs.contains(primary.id)) continue; |
+ _passThrough(primary.id); |
+ } |
_emitDeclaredOutputs(); |
+ callback(); |
}).catchError((error, stackTrace) { |
if (_isRemoved) return; |
- if (transformer is! LazyTransformer) _forced = true; |
+ if (transformer is! LazyAggregateTransformer) _forced = true; |
phase.cascade.reportError(_wrapException(error, stackTrace)); |
+ callback(); |
}); |
} |
@@ -284,13 +456,26 @@ class TransformNode { |
} |
} |
+ //// Mark all emitted and passed-through outputs of this transform as dirty. |
+ void _markOutputsDirty() { |
+ for (var controller in _passThroughControllers.values) { |
+ controller.setDirty(); |
+ } |
+ for (var controller in _outputControllers.values) { |
+ if (_forced) { |
+ controller.setDirty(); |
+ } else { |
+ controller.setLazy(force); |
+ } |
+ } |
+ } |
+ |
/// Applies this transform. |
void _apply() { |
assert(!_isRemoved); |
- // Clear input subscriptions here as well as in [_process] because [_apply] |
- // may be restarted independently if only a secondary input changes. |
- _clearInputSubscriptions(); |
+ _markOutputsDirty(); |
+ _clearSecondarySubscriptions(); |
_state = _State.APPLYING; |
_streams.changeStatus(status); |
_runApply().then((hadError) { |
@@ -298,8 +483,19 @@ class TransformNode { |
if (_state == _State.DECLARED) return; |
+ if (_state == _State.NEEDS_DECLARE) { |
+ _run(); |
+ return; |
+ } |
+ |
+ // If an input's contents changed while running `apply`, retry unless the |
+ // transformer is deferred and hasn't been forced. |
if (_state == _State.NEEDS_APPLY) { |
- _apply(); |
+ if (_forced || _canRunDeclaringEagerly) { |
+ _apply(); |
+ } else { |
+ _state = _State.DECLARED; |
+ } |
return; |
} |
@@ -309,14 +505,18 @@ class TransformNode { |
if (hadError) { |
_clearOutputs(); |
// If the transformer threw an error, we don't want to emit the |
- // pass-through asset in case it will be overwritten by the transformer. |
- // However, if the transformer declared that it wouldn't overwrite or |
- // consume the pass-through asset, we can safely emit it. |
- if (_declaredOutputs != null && !_consumePrimary && |
- !_declaredOutputs.contains(primary.id)) { |
- _emitPassThrough(); |
- } else { |
- _dontEmitPassThrough(); |
+ // pass-through assets in case they'll be overwritten by the |
+ // transformer. However, if the transformer declared that it wouldn't |
+ // overwrite or consume a pass-through asset, we can safely emit it. |
+ if (_declaredOutputs != null) { |
+ for (var input in _primaries) { |
+ if (_consumedPrimaries.contains(input.id) || |
+ _declaredOutputs.contains(input.id)) { |
+ _consumePrimary(input.id); |
+ } else { |
+ _passThrough(input.id); |
+ } |
+ } |
} |
} |
@@ -338,30 +538,39 @@ class TransformNode { |
throw new AssetNotFoundException(id); |
} |
- _inputSubscriptions.putIfAbsent(node.id, () { |
- return node.onStateChange.listen((state) => _dirty()); |
+ _secondarySubscriptions.putIfAbsent(node.id, () { |
+ return node.onStateChange.listen((_) => _dirty()); |
}); |
return node.asset; |
}); |
} |
- /// Run [Transformer.apply] as soon as [primary] is available. |
+ /// Run [AggregateTransformer.apply]. |
/// |
/// Returns whether or not an error occurred while running the transformer. |
Future<bool> _runApply() { |
var controller = new AggregateTransformController(this); |
+ _applyController = controller; |
_streams.onLogPool.add(controller.onLog); |
+ for (var primary in _primaries) { |
+ if (!primary.state.isAvailable) continue; |
+ controller.addInput(primary.asset); |
+ } |
- return primary.whenAvailable((_) { |
- if (_isRemoved) return null; |
- _state = _State.APPLYING; |
- controller.inputController.add(primary.asset); |
- return newTransform(controller.transform).then((transform) { |
- return transformer.apply(transform); |
- }); |
+ return syncFuture(() { |
+ return transformer.apply(controller.transform); |
+ }).whenComplete(() { |
+ // Cancel the controller here even if `apply` wasn't interrupted. Since |
+ // the apply is finished, we want to close out the controller's streams. |
+ controller.cancel(); |
+ _applyController = null; |
}).then((_) { |
- if (!_forced && !primary.state.isAvailable) { |
+ assert(_state != _State.DECLARED); |
+ assert(_state != _State.DECLARING); |
+ assert(_state != _State.APPLIED); |
+ |
+ if (!_forced && _primaries.any((node) => !node.state.isAvailable)) { |
_state = _State.DECLARED; |
_streams.changeStatus(NodeStatus.IDLE); |
return false; |
@@ -369,7 +578,7 @@ class TransformNode { |
if (_isRemoved) return false; |
if (_state == _State.NEEDS_APPLY) return false; |
- if (_state == _State.DECLARING) return false; |
+ if (_state == _State.NEEDS_DECLARE) return false; |
if (controller.loggedError) return true; |
_handleApplyResults(controller); |
return false; |
@@ -390,7 +599,7 @@ class TransformNode { |
/// [controller] should be the controller for the [AggegateTransform] passed |
/// to [AggregateTransformer.apply]. |
void _handleApplyResults(AggregateTransformController controller) { |
- _consumePrimary = controller.consumedPrimaries.contains(primary.id); |
+ _consumedPrimaries = controller.consumedPrimaries; |
var newOutputs = controller.outputs; |
// Any ids that are for a different package are invalid. |
@@ -410,12 +619,14 @@ class TransformNode { |
_outputControllers.remove(id).setRemoved(); |
} |
- // Emit or stop emitting the pass-through asset between removing and |
- // adding outputs to ensure there are no collisions. |
- if (!_consumePrimary && !newOutputs.containsId(primary.id)) { |
- _emitPassThrough(); |
- } else { |
- _dontEmitPassThrough(); |
+ // Emit or stop emitting pass-through assets between removing and adding |
+ // outputs to ensure there are no collisions. |
+ for (var id in _primaries.map((node) => node.id)) { |
+ if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) { |
+ _consumePrimary(id); |
+ } else { |
+ _passThrough(id); |
+ } |
} |
// Store any new outputs or new contents for existing outputs. |
@@ -432,12 +643,12 @@ class TransformNode { |
} |
/// Cancels all subscriptions to secondary input nodes. |
- void _clearInputSubscriptions() { |
+ void _clearSecondarySubscriptions() { |
_missingInputs.clear(); |
- for (var subscription in _inputSubscriptions.values) { |
+ for (var subscription in _secondarySubscriptions.values) { |
subscription.cancel(); |
} |
- _inputSubscriptions.clear(); |
+ _secondarySubscriptions.clear(); |
} |
/// Removes all output assets. |
@@ -449,26 +660,49 @@ class TransformNode { |
_outputControllers.clear(); |
} |
- /// Emit the pass-through asset if it's not being emitted already. |
- void _emitPassThrough() { |
- assert(!_outputControllers.containsKey(primary.id)); |
- |
- if (_consumePrimary) return; |
- if (_passThroughController == null) { |
- _passThroughController = new AssetNodeController.from(primary); |
- _streams.onAssetController.add(_passThroughController.node); |
+ /// Emit the pass-through node for the primary input [id] if it's not being |
+ /// emitted already. |
+ void _passThrough(AssetId id) { |
+ assert(!_outputControllers.containsKey(id)); |
+ |
+ if (_consumedPrimaries.contains(id)) return; |
+ var controller = _passThroughControllers[id]; |
+ var primary = _primaries[id]; |
+ if (controller == null) { |
+ controller = new AssetNodeController.from(primary); |
+ _passThroughControllers[id] = controller; |
+ _streams.onAssetController.add(controller.node); |
} else if (primary.state.isDirty) { |
- _passThroughController.setDirty(); |
- } else if (!_passThroughController.node.state.isAvailable) { |
- _passThroughController.setAvailable(primary.asset); |
+ controller.setDirty(); |
+ } else if (!controller.node.state.isAvailable) { |
+ controller.setAvailable(primary.asset); |
} |
} |
- /// Stop emitting the pass-through asset if it's being emitted already. |
- void _dontEmitPassThrough() { |
- if (_passThroughController == null) return; |
- _passThroughController.setRemoved(); |
- _passThroughController = null; |
+ /// Stops emitting the pass-through node for the primary input [id] if it's |
+ /// being emitted. |
+ void _consumePrimary(AssetId id) { |
+ var controller = _passThroughControllers.remove(id); |
+ if (controller == null) return; |
+ controller.setRemoved(); |
+ } |
+ |
+ /// If `declareOutputs` is running and all previous phases have declared their |
+ /// outputs, mark [_declareController] as done. |
+ void _maybeFinishDeclareController() { |
+ if (_declareController == null) return; |
+ if (phase.previous.status == NodeStatus.RUNNING) return; |
+ _declareController.done(); |
+ } |
+ |
+ /// If `apply` is running, all previous phases have declared their outputs, |
+ /// and all primary inputs are available and thus have been passed to the |
+ /// transformer, mark [_applyController] as done. |
+ void _maybeFinishApplyController() { |
+ if (_applyController == null) return; |
+ if (_primaries.any((input) => !input.state.isAvailable)) return; |
+ if (phase.previous.status == NodeStatus.RUNNING) return; |
+ _applyController.done(); |
} |
BarbackException _wrapException(error, StackTrace stackTrace) { |
@@ -479,62 +713,67 @@ class TransformNode { |
} |
} |
- /// Emit a warning about the transformer on [id]. |
- void _warn(String message) { |
- _streams.onLogController.add( |
- new LogEntry(info, primary.id, LogLevel.WARNING, message, null)); |
- } |
- |
String toString() => |
- "transform node in $_location for $transformer on $primary ($_state, " |
- "$status, ${_forced ? '' : 'un'}forced)"; |
+ "transform node in $_location for $transformer on ${info.primaryId} " |
+ "($_state, $status, ${_forced ? '' : 'un'}forced)"; |
} |
/// The enum of states that [TransformNode] can be in. |
class _State { |
- /// The transform is running [DeclaringTransformer.declareOutputs]. |
+ /// The transform is running [DeclaringAggregateTransformer.declareOutputs]. |
/// |
- /// This is the initial state of the transformer, and it will only occur once |
- /// since [DeclaringTransformer.declareOutputs] is independent of the contents |
- /// of the primary input. Once the method finishes running, this will |
- /// transition to [APPLYING] if the transform is non-lazy and the input is |
+ /// If the set of primary inputs changes while in this state, it will |
+ /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this |
+ /// state when `declareOutputs` finishes running, it will transition to |
+ /// [APPLYING] if the transform is non-lazy and all of its primary inputs are |
/// available, and [DECLARED] otherwise. |
/// |
/// Non-declaring transformers will transition out of this state and into |
/// [APPLYING] immediately. |
- static final DECLARING = const _State._("declaring outputs"); |
+ static const DECLARING = const _State._("declaring outputs"); |
- /// The transform is deferred and has run |
- /// [DeclaringTransformer.declareOutputs] but hasn't yet been forced. |
+ /// The transform is running [AggregateTransformer.declareOutputs] or |
+ /// [AggregateTransform.apply], but a primary input was added or removed after |
+ /// it started, so it will need to re-run `declareOutputs`. |
/// |
- /// This will transition to [APPLYING] when one of the outputs has been |
- /// forced. |
- static final DECLARED = const _State._("declared"); |
+ /// The [TransformNode] will transition to [DECLARING] once `declareOutputs` |
+ /// or `apply` finishes running. |
+ static const NEEDS_DECLARE = const _State._("needs declare"); |
- /// The transform is running [Transformer.apply]. |
+ /// The transform is deferred and has run |
+ /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced. |
/// |
- /// If an input changes while in this state, it will transition to |
- /// [NEEDS_APPLY]. If the [TransformNode] is still in this state when |
- /// [Transformer.apply] finishes running, it will transition to [APPLIED]. |
- static final APPLYING = const _State._("applying"); |
+ /// The [TransformNode] will transition to [APPLYING] when one of the outputs |
+ /// has been forced or if the transformer is non-lazy and all of its primary |
+ /// inputs become available. |
+ static const DECLARED = const _State._("declared"); |
- /// The transform is running [Transformer.apply], but an input changed after |
- /// it started, so it will need to re-run [Transformer.apply]. |
+ /// The transform is running [AggregateTransformer.apply]. |
/// |
- /// This will transition to [APPLYING] once [Transformer.apply] finishes |
- /// running. |
- static final NEEDS_APPLY = const _State._("needs apply"); |
- |
- /// The transform has finished running [Transformer.apply], whether or not it |
- /// emitted an error. |
+ /// If an input's contents change or a secondary input is added or removed |
+ /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY]. |
+ /// If a primary input is added or removed, it will transition to |
+ /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes |
+ /// running, it will transition to [APPLIED]. |
+ static const APPLYING = const _State._("applying"); |
+ |
+ /// The transform is running [AggregateTransformer.apply], but an input's |
+ /// contents changed or a secondary input was added or removed after it |
+ /// started, so it will need to re-run `apply`. |
/// |
- /// If the transformer is deferred, the [TransformNode] can also be in this |
- /// state when [Transformer.declareOutputs] has been run but |
- /// [Transformer.apply] has not. |
+ /// If a primary input is added or removed while in this state, the |
+ /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this |
+ /// state when `apply` finishes running, it will transition to [APPLYING]. |
+ static const NEEDS_APPLY = const _State._("needs apply"); |
+ |
+ /// The transform has finished running [AggregateTransformer.apply], whether |
+ /// or not it emitted an error. |
/// |
- /// If an input changes, this will transition to [DECLARED] if the transform |
- /// is deferred and [APPLYING] otherwise. |
- static final APPLIED = const _State._("applied"); |
+ /// If an input's contents change or a secondary input is added or removed, |
+ /// the [TransformNode] will transition to [DECLARED] if the transform is |
+ /// declaring and [APPLYING] otherwise. If a primary input is added or |
+ /// removed, this will transition to [DECLARING]. |
+ static const APPLIED = const _State._("applied"); |
final String name; |