Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Unified Diff: pkg/barback/lib/src/graph/transform_node.dart

Issue 267393009: Transition barback's infrastructure to an aggregate-based model. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/barback/lib/src/errors.dart ('k') | pkg/barback/lib/src/graph/transformer_classifier.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/barback/lib/src/graph/transform_node.dart
diff --git a/pkg/barback/lib/src/graph/transform_node.dart b/pkg/barback/lib/src/graph/transform_node.dart
index 75a76a26e22714d8411211ecf35f00762cc65d99..5ef4b8108363930ad4b13acd4852057677bd5a89 100644
--- a/pkg/barback/lib/src/graph/transform_node.dart
+++ b/pkg/barback/lib/src/graph/transform_node.dart
@@ -9,15 +9,15 @@ import 'dart:async';
import '../asset/asset.dart';
import '../asset/asset_id.dart';
import '../asset/asset_node.dart';
+import '../asset/asset_node_set.dart';
import '../errors.dart';
import '../log.dart';
import '../transformer/aggregate_transform.dart';
+import '../transformer/aggregate_transformer.dart';
import '../transformer/declaring_aggregate_transform.dart';
-import '../transformer/declaring_transform.dart';
-import '../transformer/declaring_transformer.dart';
-import '../transformer/lazy_transformer.dart';
-import '../transformer/transform.dart';
-import '../transformer/transformer.dart';
+import '../transformer/declaring_aggregate_transformer.dart';
+import '../transformer/lazy_aggregate_transformer.dart';
+import '../utils.dart';
import 'node_status.dart';
import 'node_streams.dart';
import 'phase.dart';
@@ -34,20 +34,23 @@ class TransformNode {
/// The [Phase] that this transform runs in.
final Phase phase;
- /// The [Transformer] to apply to this node's inputs.
- final Transformer transformer;
+ /// The [AggregateTransformer] to apply to this node's inputs.
+ final AggregateTransformer transformer;
- /// The node for the primary asset this transform depends on.
- final AssetNode primary;
+ /// The primary asset nodes this transform runs on.
+ final _primaries = new AssetNodeSet();
/// A string describing the location of [this] in the transformer graph.
final String _location;
- /// The subscription to [primary]'s [AssetNode.onStateChange] stream.
- StreamSubscription _primarySubscription;
+ /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams.
+ final _primarySubscriptions = new Map<AssetId, StreamSubscription>();
/// The subscription to [phase]'s [Phase.onAsset] stream.
- StreamSubscription<AssetNode> _phaseSubscription;
+ StreamSubscription<AssetNode> _phaseAssetSubscription;
+
+ /// The subscription to [phase]'s [Phase.onStatusChange] stream.
+ StreamSubscription<NodeStatus> _phaseStatusSubscription;
/// How far along [this] is in processing its assets.
NodeStatus get status {
@@ -55,19 +58,28 @@ class TransformNode {
return NodeStatus.IDLE;
}
- if (_declaring && _state != _State.DECLARING) {
+ if (_declaring && _state != _State.DECLARING &&
+ _state != _State.NEEDS_DECLARE) {
return NodeStatus.MATERIALIZING;
} else {
return NodeStatus.RUNNING;
}
}
+ /// The [TransformInfo] describing this node.
+ ///
+ /// [TransformInfo] is the publicly-visible representation of a transform
+ /// node.
+ TransformInfo get info => new TransformInfo(transformer,
+ new AssetId(phase.cascade.package, key));
+
/// Whether this is a declaring transform.
///
- /// This is usually identical to `transformer is DeclaringTransformer`, but if
- /// a declaring and non-lazy transformer emits an error during
- /// `declareOutputs` it's treated as though it wasn't declaring.
- bool get _declaring => transformer is DeclaringTransformer &&
+ /// This is usually identical to `transformer is
+ /// DeclaringAggregateTransformer`, but if a declaring and non-lazy
+ /// transformer emits an error during `declareOutputs` it's treated as though
+ /// it wasn't declaring.
+ bool get _declaring => transformer is DeclaringAggregateTransformer &&
(_state == _State.DECLARING || _declaredOutputs != null);
/// Whether this transform has been forced since it last finished applying.
@@ -77,8 +89,9 @@ class TransformNode {
/// transformers, since they always need to eagerly generate outputs.
bool _forced;
- /// The subscriptions to each input's [AssetNode.onStateChange] stream.
- final _inputSubscriptions = new Map<AssetId, StreamSubscription>();
+ /// The subscriptions to each secondary input's [AssetNode.onStateChange]
+ /// stream.
+ final _secondarySubscriptions = new Map<AssetId, StreamSubscription>();
/// The controllers for the asset nodes emitted by this node.
final _outputControllers = new Map<AssetId, AssetNodeController>();
@@ -87,14 +100,14 @@ class TransformNode {
/// ran.
final _missingInputs = new Set<AssetId>();
- /// The controller that's used to pass [primary] through [this] if it's not
- /// consumed or overwritten.
+ /// The controllers that are used to pass each primary input through [this] if
+ /// it's not consumed or overwritten.
///
/// This needs an intervening controller to ensure that the output can be
/// marked dirty when determining whether [this] will consume or overwrite it,
- /// and be marked removed if it does. [_passThroughController] will be null
- /// if the asset is not being passed through.
- AssetNodeController _passThroughController;
+ /// and be marked removed if it does. No pass-through controller will exist
+ /// for primary inputs that are not being passed through.
+ final _passThroughControllers = new Map<AssetId, AssetNodeController>();
/// The asset node for this transform.
final _streams = new NodeStreams();
@@ -103,7 +116,7 @@ class TransformNode {
Stream<LogEntry> get onLog => _streams.onLog;
/// The current state of [this].
- var _state = _State.DECLARING;
+ var _state = _State.DECLARED;
/// Whether [this] has been marked as removed.
bool get _isRemoved => _streams.onAssetController.isClosed;
@@ -112,60 +125,87 @@ class TransformNode {
// can run [apply] even if [force] hasn't been called, since [transformer]
// should run eagerly if possible.
bool get _canRunDeclaringEagerly =>
- _declaring && transformer is! LazyTransformer &&
- primary.state.isAvailable;
+ _declaring && transformer is! LazyAggregateTransformer &&
+ _primaries.every((input) => input.state.isAvailable);
- /// Whether the most recent run of this transform has declared that it
- /// consumes the primary input.
+ /// Which primary inputs the most recent run of this transform has declared
+ /// that it consumes.
///
- /// Defaults to `false`. This is not meaningful unless [_state] is
- /// [_State.APPLIED] or [_State.DECLARED].
- bool _consumePrimary = false;
+ /// This starts out `null`, indicating that the transform hasn't declared
+ /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED]
+ /// or [_State.DECLARED].
+ Set<AssetId> _consumedPrimaries;
/// The set of output ids that [transformer] declared it would emit.
///
- /// This is only non-null if [transformer] is a [DeclaringTransformer] and its
- /// [declareOutputs] has been run successfully.
+ /// This is only non-null if [transformer] is a
+ /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run
+ /// successfully.
Set<AssetId> _declaredOutputs;
- TransformNode(this.phase, Transformer transformer, AssetNode primary,
- this._location)
- : key = primary.id.path,
- transformer = transformer,
- primary = primary {
- _forced = transformer is! DeclaringTransformer;
- if (_forced) primary.force();
+ /// The controller for the currently-running
+ /// [DeclaringAggregateTransformer.declareOutputs] call's
+ /// [DeclaringAggregateTransform].
+ ///
+ /// This will be non-`null` when
+ /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that
+ /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes
+ /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise.
+ DeclaringAggregateTransformController _declareController;
+
+ /// The controller for the currently-running [AggregateTransformer.apply]
+ /// call's [AggregateTransform].
+ ///
+ /// This will be non-`null` when [AggregateTransform.apply] is running, which
+ /// means that it's always non-`null` when [_state] is [_State.APPLYING] or
+ /// [_State.NEEDS_APPLY], sometimes non-`null` when it's
+ /// [_State.NEEDS_DECLARE], and always `null` otherwise.
+ AggregateTransformController _applyController;
- _primarySubscription = primary.onStateChange.listen((state) {
- if (state.isRemoved) {
- remove();
- } else {
- if (_forced) primary.force();
- _dirty();
- }
- });
+ TransformNode(this.phase, this.transformer, this.key, this._location) {
+ _forced = transformer is! DeclaringAggregateTransformer;
- _phaseSubscription = phase.previous.onAsset.listen((node) {
+ _phaseAssetSubscription = phase.previous.onAsset.listen((node) {
if (!_missingInputs.contains(node.id)) return;
if (_forced) node.force();
_dirty();
});
- _declareOutputs().then((_) {
- if (_forced || _canRunDeclaringEagerly) {
- _apply();
- } else {
- _state = _State.DECLARED;
- _streams.changeStatus(NodeStatus.IDLE);
- }
+ _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) {
+ if (status == NodeStatus.RUNNING) return;
+
+ _maybeFinishDeclareController();
+ _maybeFinishApplyController();
});
+
+ _run();
}
- /// The [TransformInfo] describing this node.
- ///
- /// [TransformInfo] is the publicly-visible representation of a transform
- /// node.
- TransformInfo get info => new TransformInfo(transformer, primary.id);
+ /// Adds [input] as a primary input for this node.
+ void addPrimary(AssetNode input) {
+ _primaries.add(input);
+ if (_forced) input.force();
+
+ _primarySubscriptions[input.id] = input.onStateChange
+ .listen((_) => _onPrimaryStateChange(input));
+
+ if (_state == _State.DECLARING && !_declareController.isDone) {
+ // If we're running `declareOutputs` and its id stream isn't closed yet,
+ // pass this in as another id.
+ _declareController.addId(input.id);
+ _maybeFinishDeclareController();
+ } else if (_state == _State.APPLYING) {
+ // If we're running `apply`, we need to wait until [input] is available
+ // before we pass it into the stream. If it's available now, great; if
+ // not, [_onPrimaryStateChange] will handle it.
+ if (!input.state.isAvailable) return;
+ _onPrimaryStateChange(input);
+ _maybeFinishApplyController();
+ } else {
+ // Otherwise, a new input means we'll need to re-run `declareOutputs`.
+ _restartRun();
+ }
+ }
/// Marks this transform as removed.
///
@@ -175,41 +215,58 @@ class TransformNode {
/// valid even if its primary input still exists.
void remove() {
_streams.close();
- _primarySubscription.cancel();
- _phaseSubscription.cancel();
- _clearInputSubscriptions();
+ _phaseAssetSubscription.cancel();
+ _phaseStatusSubscription.cancel();
+ if (_declareController != null) _declareController.cancel();
+ if (_applyController != null) _applyController.cancel();
+ _clearSecondarySubscriptions();
_clearOutputs();
- if (_passThroughController != null) {
- _passThroughController.setRemoved();
- _passThroughController = null;
+
+ for (var subscription in _primarySubscriptions.values) {
+ subscription.cancel();
+ }
+ _primarySubscriptions.clear();
+
+ for (var controller in _passThroughControllers.values) {
+ controller.setRemoved();
}
+ _passThroughControllers.clear();
}
/// If [this] is deferred, ensures that its concrete outputs will be
/// generated.
void force() {
if (_forced || _state == _State.APPLIED) return;
- primary.force();
+ for (var input in _primaries) {
+ input.force();
+ }
+
_forced = true;
- if (_state == _State.DECLARED) _dirty();
+ if (_state == _State.DECLARED) _apply();
}
/// Marks this transform as dirty.
///
- /// This causes all of the transform's outputs to be marked as dirty as well.
+ /// Specifically, this should be called when one of the transform's inputs'
+ /// contents change, or when a secondary input is removed. Primary inputs
+ /// being added or removed are handled by [addInput] and
+ /// [_onPrimaryStateChange].
void _dirty() {
- // If we're in the process of running [declareOutputs], we already know that
- // [apply] needs to be run so there's nothing we need to mark as dirty.
- if (_state == _State.DECLARING) return;
+ if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE ||
+ _state == _State.NEEDS_APPLY) {
+ // If we already know that [_apply] needs to be run, there's nothing to do
+ // here.
+ return;
+ }
if (!_forced && !_canRunDeclaringEagerly) {
// [forced] should only ever be false for a declaring transformer.
assert(_declaring);
- // If we've finished applying, transition to MATERIALIZING, indicating
- // that we know what outputs [apply] will emit but we're waiting to emit
- // them concretely until [force] is called. If we're still applying, we'll
- // transition to MATERIALIZING once we finish.
+ // If we've finished applying, transition to DECLARED, indicating that we
+ // know what outputs [apply] will emit but we're waiting to emit them
+ // concretely until [force] is called. If we're still applying, we'll
+ // transition to DECLARED once we finish.
if (_state == _State.APPLIED) _state = _State.DECLARED;
for (var controller in _outputControllers.values) {
controller.setLazy(force);
@@ -218,11 +275,6 @@ class TransformNode {
return;
}
- if (_passThroughController != null) _passThroughController.setDirty();
- for (var controller in _outputControllers.values) {
- controller.setDirty();
- }
-
if (_state == _State.APPLIED) {
if (_declaredOutputs != null) _emitDeclaredOutputs();
_apply();
@@ -233,21 +285,136 @@ class TransformNode {
}
}
+ /// The callback called when [input]'s state changes.
+ void _onPrimaryStateChange(AssetNode input) {
+ if (input.state.isRemoved) {
+ _primarySubscriptions.remove(input.id);
+
+ if (_primaries.isEmpty) {
+ // If there are no more primary inputs, there's no more use for this
+ // node in the graph. It will be re-created by its
+ // [TransformerClassifier] if a new input with [key] is added.
+ remove();
+ return;
+ }
+
+ // Any change to the number of primary inputs requires that we re-run the
+ // transformation.
+ _restartRun();
+ } else if (input.state.isAvailable) {
+ if (_state == _State.DECLARED && _canRunDeclaringEagerly) {
+ // If [this] is fully declared but hasn't started applying, this input
+ // becoming available may mean that all inputs are available, in which
+ // case we can run apply eagerly.
+ _apply();
+ return;
+ }
+
+ // If we're not actively passing concrete assets to the transformer, the
+ // distinction between a dirty asset and an available one isn't relevant.
+ if (_state != _State.APPLYING) return;
+
+ if (_applyController.isDone) {
+ // If we get a new asset after we've closed the asset stream, we need to
+ // re-run declare and then apply.
+ _restartRun();
+ } else {
+ // If the new asset comes before the asset stream is done, we can just
+ // pass it to the stream.
+ _applyController.addInput(input.asset);
+ _maybeFinishApplyController();
+ }
+ } else {
+ if (_forced) input.force();
+ if (_state == _State.APPLYING && !_applyController.addedId(input.id)) {
+ // If the input hasn't yet been added to the transform's input stream,
+ // there's no need to consider the transformation dirty.
+ return;
+ }
+ _dirty();
+ }
+ }
+
+ /// Run the entire transformation, including both `declareOutputs` (if
+ /// applicable) and `apply`.
+ void _run() {
+ assert(_state != _State.DECLARING);
+ assert(_state != _State.APPLYING);
+
+ _markOutputsDirty();
+ _declareOutputs(() {
+ if (_forced || _canRunDeclaringEagerly) {
+ _apply();
+ } else {
+ _state = _State.DECLARED;
+ _streams.changeStatus(NodeStatus.IDLE);
+ }
+ });
+ }
+
+ /// Restart the entire transformation, including `declareOutputs` if
+ /// applicable.
+ void _restartRun() {
+ if (_state == _State.DECLARED || _state == _State.APPLIED) {
+ // If we're currently idle, we can restart the transformation immediately.
+ _run();
+ return;
+ }
+
+ // If we're actively running `declareOutputs` or `apply`, cancel the
+ // transforms and transition to `NEEDS_DECLARE`. Once the transformer's
+ // method returns, we'll transition to `DECLARING`.
+ if (_declareController != null) _declareController.cancel();
+ if (_applyController != null) _applyController.cancel();
+ _state = _State.NEEDS_DECLARE;
+ }
+
/// Runs [transform.declareOutputs] and emits the resulting assets as dirty
/// assets.
- Future _declareOutputs() {
- if (transformer is! DeclaringTransformer) return new Future.value();
+ ///
+ /// Calls [callback] when it's finished. This doesn't return a future so that
+ /// [callback] is called synchronously if there are no outputs to declare. If
+ /// [this] is removed while inputs are being declared, [callback] will not be
+ /// called.
+ void _declareOutputs(void callback()) {
+ if (transformer is! DeclaringAggregateTransformer) {
+ callback();
+ return;
+ }
+ _state = _State.DECLARING;
var controller = new DeclaringAggregateTransformController(this);
+ _declareController = controller;
_streams.onLogPool.add(controller.onLog);
- controller.idController.add(primary.id);
- return newDeclaringTransform(controller.transform).then((transform) {
- return (transformer as DeclaringTransformer).declareOutputs(transform);
+ for (var primary in _primaries) {
+ controller.addId(primary.id);
+ }
+
+ syncFuture(() {
+ return (transformer as DeclaringAggregateTransformer)
+ .declareOutputs(controller.transform);
+ }).whenComplete(() {
+ // Cancel the controller here even if `declareOutputs` wasn't interrupted.
+ // Since the declaration is finished, we want to close out the
+ // controller's streams.
+ controller.cancel();
+ _declareController = null;
}).then((_) {
if (_isRemoved) return;
- if (controller.loggedError) return;
+ if (_state == _State.NEEDS_DECLARE) {
+ _declareOutputs(callback);
+ return;
+ }
- _consumePrimary = controller.consumedPrimaries.contains(primary.id);
+ if (controller.loggedError) {
+ // If `declareOutputs` fails, fall back to treating a declaring
+ // transformer as though it were eager.
+ if (transformer is! LazyAggregateTransformer) _forced = true;
+ callback();
+ return;
+ }
+
+ _consumedPrimaries = controller.consumedPrimaries;
_declaredOutputs = controller.outputIds;
var invalidIds = _declaredOutputs
.where((id) => id.package != phase.cascade.package).toSet();
@@ -257,12 +424,17 @@ class TransformNode {
phase.cascade.reportError(new InvalidOutputException(info, id));
}
- if (!_declaredOutputs.contains(primary.id)) _emitPassThrough();
+ for (var primary in _primaries) {
+ if (_declaredOutputs.contains(primary.id)) continue;
+ _passThrough(primary.id);
+ }
_emitDeclaredOutputs();
+ callback();
}).catchError((error, stackTrace) {
if (_isRemoved) return;
- if (transformer is! LazyTransformer) _forced = true;
+ if (transformer is! LazyAggregateTransformer) _forced = true;
phase.cascade.reportError(_wrapException(error, stackTrace));
+ callback();
});
}
@@ -284,13 +456,26 @@ class TransformNode {
}
}
+ //// Mark all emitted and passed-through outputs of this transform as dirty.
+ void _markOutputsDirty() {
+ for (var controller in _passThroughControllers.values) {
+ controller.setDirty();
+ }
+ for (var controller in _outputControllers.values) {
+ if (_forced) {
+ controller.setDirty();
+ } else {
+ controller.setLazy(force);
+ }
+ }
+ }
+
/// Applies this transform.
void _apply() {
assert(!_isRemoved);
- // Clear input subscriptions here as well as in [_process] because [_apply]
- // may be restarted independently if only a secondary input changes.
- _clearInputSubscriptions();
+ _markOutputsDirty();
+ _clearSecondarySubscriptions();
_state = _State.APPLYING;
_streams.changeStatus(status);
_runApply().then((hadError) {
@@ -298,8 +483,19 @@ class TransformNode {
if (_state == _State.DECLARED) return;
+ if (_state == _State.NEEDS_DECLARE) {
+ _run();
+ return;
+ }
+
+ // If an input's contents changed while running `apply`, retry unless the
+ // transformer is deferred and hasn't been forced.
if (_state == _State.NEEDS_APPLY) {
- _apply();
+ if (_forced || _canRunDeclaringEagerly) {
+ _apply();
+ } else {
+ _state = _State.DECLARED;
+ }
return;
}
@@ -309,14 +505,18 @@ class TransformNode {
if (hadError) {
_clearOutputs();
// If the transformer threw an error, we don't want to emit the
- // pass-through asset in case it will be overwritten by the transformer.
- // However, if the transformer declared that it wouldn't overwrite or
- // consume the pass-through asset, we can safely emit it.
- if (_declaredOutputs != null && !_consumePrimary &&
- !_declaredOutputs.contains(primary.id)) {
- _emitPassThrough();
- } else {
- _dontEmitPassThrough();
+ // pass-through assets in case they'll be overwritten by the
+ // transformer. However, if the transformer declared that it wouldn't
+ // overwrite or consume a pass-through asset, we can safely emit it.
+ if (_declaredOutputs != null) {
+ for (var input in _primaries) {
+ if (_consumedPrimaries.contains(input.id) ||
+ _declaredOutputs.contains(input.id)) {
+ _consumePrimary(input.id);
+ } else {
+ _passThrough(input.id);
+ }
+ }
}
}
@@ -338,30 +538,39 @@ class TransformNode {
throw new AssetNotFoundException(id);
}
- _inputSubscriptions.putIfAbsent(node.id, () {
- return node.onStateChange.listen((state) => _dirty());
+ _secondarySubscriptions.putIfAbsent(node.id, () {
+ return node.onStateChange.listen((_) => _dirty());
});
return node.asset;
});
}
- /// Run [Transformer.apply] as soon as [primary] is available.
+ /// Run [AggregateTransformer.apply].
///
/// Returns whether or not an error occurred while running the transformer.
Future<bool> _runApply() {
var controller = new AggregateTransformController(this);
+ _applyController = controller;
_streams.onLogPool.add(controller.onLog);
+ for (var primary in _primaries) {
+ if (!primary.state.isAvailable) continue;
+ controller.addInput(primary.asset);
+ }
- return primary.whenAvailable((_) {
- if (_isRemoved) return null;
- _state = _State.APPLYING;
- controller.inputController.add(primary.asset);
- return newTransform(controller.transform).then((transform) {
- return transformer.apply(transform);
- });
+ return syncFuture(() {
+ return transformer.apply(controller.transform);
+ }).whenComplete(() {
+ // Cancel the controller here even if `apply` wasn't interrupted. Since
+ // the apply is finished, we want to close out the controller's streams.
+ controller.cancel();
+ _applyController = null;
}).then((_) {
- if (!_forced && !primary.state.isAvailable) {
+ assert(_state != _State.DECLARED);
+ assert(_state != _State.DECLARING);
+ assert(_state != _State.APPLIED);
+
+ if (!_forced && _primaries.any((node) => !node.state.isAvailable)) {
_state = _State.DECLARED;
_streams.changeStatus(NodeStatus.IDLE);
return false;
@@ -369,7 +578,7 @@ class TransformNode {
if (_isRemoved) return false;
if (_state == _State.NEEDS_APPLY) return false;
- if (_state == _State.DECLARING) return false;
+ if (_state == _State.NEEDS_DECLARE) return false;
if (controller.loggedError) return true;
_handleApplyResults(controller);
return false;
@@ -390,7 +599,7 @@ class TransformNode {
/// [controller] should be the controller for the [AggegateTransform] passed
/// to [AggregateTransformer.apply].
void _handleApplyResults(AggregateTransformController controller) {
- _consumePrimary = controller.consumedPrimaries.contains(primary.id);
+ _consumedPrimaries = controller.consumedPrimaries;
var newOutputs = controller.outputs;
// Any ids that are for a different package are invalid.
@@ -410,12 +619,14 @@ class TransformNode {
_outputControllers.remove(id).setRemoved();
}
- // Emit or stop emitting the pass-through asset between removing and
- // adding outputs to ensure there are no collisions.
- if (!_consumePrimary && !newOutputs.containsId(primary.id)) {
- _emitPassThrough();
- } else {
- _dontEmitPassThrough();
+ // Emit or stop emitting pass-through assets between removing and adding
+ // outputs to ensure there are no collisions.
+ for (var id in _primaries.map((node) => node.id)) {
+ if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) {
+ _consumePrimary(id);
+ } else {
+ _passThrough(id);
+ }
}
// Store any new outputs or new contents for existing outputs.
@@ -432,12 +643,12 @@ class TransformNode {
}
/// Cancels all subscriptions to secondary input nodes.
- void _clearInputSubscriptions() {
+ void _clearSecondarySubscriptions() {
_missingInputs.clear();
- for (var subscription in _inputSubscriptions.values) {
+ for (var subscription in _secondarySubscriptions.values) {
subscription.cancel();
}
- _inputSubscriptions.clear();
+ _secondarySubscriptions.clear();
}
/// Removes all output assets.
@@ -449,26 +660,49 @@ class TransformNode {
_outputControllers.clear();
}
- /// Emit the pass-through asset if it's not being emitted already.
- void _emitPassThrough() {
- assert(!_outputControllers.containsKey(primary.id));
-
- if (_consumePrimary) return;
- if (_passThroughController == null) {
- _passThroughController = new AssetNodeController.from(primary);
- _streams.onAssetController.add(_passThroughController.node);
+ /// Emit the pass-through node for the primary input [id] if it's not being
+ /// emitted already.
+ void _passThrough(AssetId id) {
+ assert(!_outputControllers.containsKey(id));
+
+ if (_consumedPrimaries.contains(id)) return;
+ var controller = _passThroughControllers[id];
+ var primary = _primaries[id];
+ if (controller == null) {
+ controller = new AssetNodeController.from(primary);
+ _passThroughControllers[id] = controller;
+ _streams.onAssetController.add(controller.node);
} else if (primary.state.isDirty) {
- _passThroughController.setDirty();
- } else if (!_passThroughController.node.state.isAvailable) {
- _passThroughController.setAvailable(primary.asset);
+ controller.setDirty();
+ } else if (!controller.node.state.isAvailable) {
+ controller.setAvailable(primary.asset);
}
}
- /// Stop emitting the pass-through asset if it's being emitted already.
- void _dontEmitPassThrough() {
- if (_passThroughController == null) return;
- _passThroughController.setRemoved();
- _passThroughController = null;
+ /// Stops emitting the pass-through node for the primary input [id] if it's
+ /// being emitted.
+ void _consumePrimary(AssetId id) {
+ var controller = _passThroughControllers.remove(id);
+ if (controller == null) return;
+ controller.setRemoved();
+ }
+
+ /// If `declareOutputs` is running and all previous phases have declared their
+ /// outputs, mark [_declareController] as done.
+ void _maybeFinishDeclareController() {
+ if (_declareController == null) return;
+ if (phase.previous.status == NodeStatus.RUNNING) return;
+ _declareController.done();
+ }
+
+ /// If `apply` is running, all previous phases have declared their outputs,
+ /// and all primary inputs are available and thus have been passed to the
+ /// transformer, mark [_applyController] as done.
+ void _maybeFinishApplyController() {
+ if (_applyController == null) return;
+ if (_primaries.any((input) => !input.state.isAvailable)) return;
+ if (phase.previous.status == NodeStatus.RUNNING) return;
+ _applyController.done();
}
BarbackException _wrapException(error, StackTrace stackTrace) {
@@ -479,62 +713,67 @@ class TransformNode {
}
}
- /// Emit a warning about the transformer on [id].
- void _warn(String message) {
- _streams.onLogController.add(
- new LogEntry(info, primary.id, LogLevel.WARNING, message, null));
- }
-
String toString() =>
- "transform node in $_location for $transformer on $primary ($_state, "
- "$status, ${_forced ? '' : 'un'}forced)";
+ "transform node in $_location for $transformer on ${info.primaryId} "
+ "($_state, $status, ${_forced ? '' : 'un'}forced)";
}
/// The enum of states that [TransformNode] can be in.
class _State {
- /// The transform is running [DeclaringTransformer.declareOutputs].
+ /// The transform is running [DeclaringAggregateTransformer.declareOutputs].
///
- /// This is the initial state of the transformer, and it will only occur once
- /// since [DeclaringTransformer.declareOutputs] is independent of the contents
- /// of the primary input. Once the method finishes running, this will
- /// transition to [APPLYING] if the transform is non-lazy and the input is
+ /// If the set of primary inputs changes while in this state, it will
+ /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this
+ /// state when `declareOutputs` finishes running, it will transition to
+ /// [APPLYING] if the transform is non-lazy and all of its primary inputs are
/// available, and [DECLARED] otherwise.
///
/// Non-declaring transformers will transition out of this state and into
/// [APPLYING] immediately.
- static final DECLARING = const _State._("declaring outputs");
+ static const DECLARING = const _State._("declaring outputs");
- /// The transform is deferred and has run
- /// [DeclaringTransformer.declareOutputs] but hasn't yet been forced.
+ /// The transform is running [AggregateTransformer.declareOutputs] or
+ /// [AggregateTransform.apply], but a primary input was added or removed after
+ /// it started, so it will need to re-run `declareOutputs`.
///
- /// This will transition to [APPLYING] when one of the outputs has been
- /// forced.
- static final DECLARED = const _State._("declared");
+ /// The [TransformNode] will transition to [DECLARING] once `declareOutputs`
+ /// or `apply` finishes running.
+ static const NEEDS_DECLARE = const _State._("needs declare");
- /// The transform is running [Transformer.apply].
+ /// The transform is deferred and has run
+ /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced.
///
- /// If an input changes while in this state, it will transition to
- /// [NEEDS_APPLY]. If the [TransformNode] is still in this state when
- /// [Transformer.apply] finishes running, it will transition to [APPLIED].
- static final APPLYING = const _State._("applying");
+ /// The [TransformNode] will transition to [APPLYING] when one of the outputs
+ /// has been forced or if the transformer is non-lazy and all of its primary
+ /// inputs become available.
+ static const DECLARED = const _State._("declared");
- /// The transform is running [Transformer.apply], but an input changed after
- /// it started, so it will need to re-run [Transformer.apply].
+ /// The transform is running [AggregateTransformer.apply].
///
- /// This will transition to [APPLYING] once [Transformer.apply] finishes
- /// running.
- static final NEEDS_APPLY = const _State._("needs apply");
-
- /// The transform has finished running [Transformer.apply], whether or not it
- /// emitted an error.
+ /// If an input's contents change or a secondary input is added or removed
+ /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY].
+ /// If a primary input is added or removed, it will transition to
+ /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes
+ /// running, it will transition to [APPLIED].
+ static const APPLYING = const _State._("applying");
+
+ /// The transform is running [AggregateTransformer.apply], but an input's
+ /// contents changed or a secondary input was added or removed after it
+ /// started, so it will need to re-run `apply`.
///
- /// If the transformer is deferred, the [TransformNode] can also be in this
- /// state when [Transformer.declareOutputs] has been run but
- /// [Transformer.apply] has not.
+ /// If a primary input is added or removed while in this state, the
+ /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this
+ /// state when `apply` finishes running, it will transition to [APPLYING].
+ static const NEEDS_APPLY = const _State._("needs apply");
+
+ /// The transform has finished running [AggregateTransformer.apply], whether
+ /// or not it emitted an error.
///
- /// If an input changes, this will transition to [DECLARED] if the transform
- /// is deferred and [APPLYING] otherwise.
- static final APPLIED = const _State._("applied");
+ /// If an input's contents change or a secondary input is added or removed,
+ /// the [TransformNode] will transition to [DECLARED] if the transform is
+ /// declaring and [APPLYING] otherwise. If a primary input is added or
+ /// removed, this will transition to [DECLARING].
+ static const APPLIED = const _State._("applied");
final String name;
« no previous file with comments | « pkg/barback/lib/src/errors.dart ('k') | pkg/barback/lib/src/graph/transformer_classifier.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698