Index: pkg/barback/lib/src/transform_node.dart |
diff --git a/pkg/barback/lib/src/transform_node.dart b/pkg/barback/lib/src/transform_node.dart |
index e31d5e8f350d5ec04f298fd33d9d5658ce8cf8e5..ecd8d8f677f6b481fc6ca1b586bab96a51542974 100644 |
--- a/pkg/barback/lib/src/transform_node.dart |
+++ b/pkg/barback/lib/src/transform_node.dart |
@@ -10,6 +10,7 @@ import 'asset.dart'; |
import 'asset_id.dart'; |
import 'asset_node.dart'; |
import 'declaring_transform.dart'; |
+import 'declaring_transformer.dart'; |
import 'errors.dart'; |
import 'lazy_transformer.dart'; |
import 'log.dart'; |
@@ -44,7 +45,7 @@ class TransformNode { |
StreamSubscription<AssetNode> _phaseSubscription; |
/// Whether [this] is dirty and still has more processing to do. |
- bool get isDirty => !_state.isDone; |
+ bool get isDirty => _state != _State.NOT_PRIMARY && _state != _State.APPLIED; |
/// Whether [transformer] is lazy and this transform has yet to be forced. |
bool _isLazy; |
@@ -55,6 +56,8 @@ class TransformNode { |
/// The controllers for the asset nodes emitted by this node. |
final _outputControllers = new Map<AssetId, AssetNodeController>(); |
+ /// The ids of inputs the transformer tried and failed to read last time it |
+ /// ran. |
final _missingInputs = new Set<AssetId>(); |
/// The controller that's used to pass [primary] through [this] if it's not |
@@ -89,8 +92,11 @@ class TransformNode { |
Stream<LogEntry> get onLog => _onLogPool.stream; |
final _onLogPool = new StreamPool<LogEntry>.broadcast(); |
+ /// A controller for log entries emitted by this node. |
+ final _onLogController = new StreamController<LogEntry>.broadcast(sync: true); |
+ |
/// The current state of [this]. |
- var _state = _TransformNodeState.PROCESSING; |
+ var _state = _State.COMPUTING_IS_PRIMARY; |
/// Whether [this] has been marked as removed. |
bool get _isRemoved => _onAssetController.isClosed; |
@@ -99,26 +105,34 @@ class TransformNode { |
/// consumes the primary input. |
/// |
/// Defaults to `false`. This is not meaningful unless [_state] is |
- /// [_TransformNodeState.APPLIED]. |
+ /// [_State.APPLIED]. |
bool _consumePrimary = false; |
+ /// The set of output ids that [transformer] declared it would emit. |
+ /// |
+ /// This is only non-null if [transformer] is a [DeclaringTransformer] and its |
+ /// [declareOutputs] has been run successfully. |
+ Set<AssetId> _declaredOutputs; |
+ |
TransformNode(this.phase, Transformer transformer, this.primary, |
this._location) |
: transformer = transformer, |
_isLazy = transformer is LazyTransformer { |
+ _onLogPool.add(_onLogController.stream); |
+ |
_primarySubscription = primary.onStateChange.listen((state) { |
if (state.isRemoved) { |
remove(); |
} else { |
- _dirty(primaryChanged: true); |
+ _dirty(); |
} |
}); |
_phaseSubscription = phase.previous.onAsset.listen((node) { |
- if (_missingInputs.contains(node.id)) _dirty(primaryChanged: false); |
+ if (_missingInputs.contains(node.id)) _dirty(); |
}); |
- _process(); |
+ _isPrimary(); |
} |
/// The [TransformInfo] describing this node. |
@@ -134,6 +148,7 @@ class TransformNode { |
/// from the primary input, but it's possible for a transform to no longer be |
/// valid even if its primary input still exists. |
void remove() { |
+ _onLogController.close(); |
_onAssetController.close(); |
_onDoneController.close(); |
_primarySubscription.cancel(); |
@@ -153,49 +168,39 @@ class TransformNode { |
// transform's outputs have gone unused, we switch it back to lazy mode. |
if (!_isLazy) return; |
_isLazy = false; |
- _dirty(primaryChanged: false); |
+ _dirty(); |
} |
/// Marks this transform as dirty. |
/// |
/// This causes all of the transform's outputs to be marked as dirty as well. |
- /// [primaryChanged] should be true if and only if [this] was set dirty |
- /// because [primary] changed. |
- void _dirty({bool primaryChanged: false}) { |
- if (!primaryChanged && _state.isNotPrimary) return; |
+ void _dirty() { |
+ if (_state == _State.NOT_PRIMARY) { |
+ _emitPassThrough(); |
+ return; |
+ } |
+ if (_state == _State.COMPUTING_IS_PRIMARY || _isLazy) return; |
if (_passThroughController != null) _passThroughController.setDirty(); |
for (var controller in _outputControllers.values) { |
controller.setDirty(); |
} |
- if (_state.isDone) { |
- if (primaryChanged) { |
- _process(); |
- } else { |
- _apply(); |
- } |
- } else if (primaryChanged) { |
- _state = _TransformNodeState.NEEDS_IS_PRIMARY; |
- } else if (!_state.needsIsPrimary) { |
- _state = _TransformNodeState.NEEDS_APPLY; |
+ if (_state == _State.APPLIED) { |
+ _apply(); |
+ } else { |
+ _state = _State.NEEDS_APPLY; |
} |
} |
- /// Determines whether [primary] is primary for [transformer], and if so runs |
- /// [transformer.apply]. |
- void _process() { |
- // Clear all the old input subscriptions. If an input is re-used, we'll |
- // re-subscribe. |
- _clearInputSubscriptions(); |
- _state = _TransformNodeState.PROCESSING; |
- primary.whenAvailable((_) { |
- _state = _TransformNodeState.PROCESSING; |
- return transformer.isPrimary(primary.asset); |
- }).catchError((error, stackTrace) { |
- // If the transform became dirty while processing, ignore any errors from |
- // it. |
- if (_state.needsIsPrimary || _isRemoved) return false; |
+ /// Runs [transformer.isPrimary] and adjusts [this]'s state according to the |
+ /// result. |
+ /// |
+ /// This will also run [_declareOutputs] and/or [_apply] as appropriate. |
+ void _isPrimary() { |
+ syncFuture(() => transformer.isPrimary(primary.id)) |
+ .catchError((error, stackTrace) { |
+ if (_isRemoved) return false; |
// Catch all transformer errors and pipe them to the results stream. This |
// is so a broken transformer doesn't take down the whole graph. |
@@ -203,62 +208,96 @@ class TransformNode { |
return false; |
}).then((isPrimary) { |
+ if (_isRemoved) return null; |
+ if (isPrimary) { |
+ return _declareOutputs().then((_) { |
+ if (_isRemoved) return; |
+ if (_isLazy) { |
+ _state = _State.APPLIED; |
+ _onDoneController.add(null); |
+ } else { |
+ _apply(); |
+ } |
+ }); |
+ } |
+ |
+ _emitPassThrough(); |
+ _state = _State.NOT_PRIMARY; |
+ _onDoneController.add(null); |
+ }); |
+ } |
+ |
+ /// Runs [transform.declareOutputs] and emits the resulting assets as dirty |
+ /// assets. |
+ Future _declareOutputs() { |
+ if (transformer is! DeclaringTransformer) return new Future.value(); |
+ |
+ var controller = new DeclaringTransformController(this); |
+ return syncFuture(() { |
+ return (transformer as DeclaringTransformer) |
+ .declareOutputs(controller.transform); |
+ }).then((_) { |
if (_isRemoved) return; |
- if (_state.needsIsPrimary) { |
- _process(); |
- } else if (isPrimary) { |
- _apply(); |
- } else { |
- _clearOutputs(); |
- _emitPassThrough(); |
- _state = _TransformNodeState.NOT_PRIMARY; |
- _onDoneController.add(null); |
+ if (controller.loggedError) return; |
+ |
+ _consumePrimary = controller.consumePrimary; |
+ _declaredOutputs = controller.outputIds; |
+ var invalidIds = _declaredOutputs |
+ .where((id) => id.package != phase.cascade.package).toSet(); |
+ for (var id in invalidIds) { |
+ _declaredOutputs.remove(id); |
+ // TODO(nweiz): report this as a warning rather than a failing error. |
+ phase.cascade.reportError(new InvalidOutputException(info, id)); |
+ } |
+ |
+ if (!_declaredOutputs.contains(primary.id)) _emitPassThrough(); |
+ |
+ for (var id in _declaredOutputs) { |
+ var controller = transformer is LazyTransformer |
+ ? new AssetNodeController.lazy(id, force, this) |
+ : new AssetNodeController(id, this); |
+ _outputControllers[id] = controller; |
+ _onAssetController.add(controller.node); |
} |
+ }).catchError((error, stackTrace) { |
+ if (_isRemoved) return; |
+ phase.cascade.reportError(_wrapException(error, stackTrace)); |
}); |
} |
/// Applies this transform. |
void _apply() { |
- assert(!_onAssetController.isClosed); |
+ assert(!_isRemoved && !_isLazy); |
// Clear input subscriptions here as well as in [_process] because [_apply] |
// may be restarted independently if only a secondary input changes. |
_clearInputSubscriptions(); |
- _state = _TransformNodeState.PROCESSING; |
- primary.whenAvailable((_) { |
- if (_state.needsIsPrimary) return null; |
- _state = _TransformNodeState.PROCESSING; |
- // TODO(nweiz): If [transformer] is a [DeclaringTransformer] but not a |
- // [LazyTransformer], we can get some mileage out of doing a declarative |
- // first so we know how to hook up the assets. |
- if (_isLazy) return _declareLazy(); |
- return _applyImmediate(); |
- }).catchError((error, stackTrace) { |
- // If the transform became dirty while processing, ignore any errors from |
- // it. |
- if (!_state.isProcessing || _isRemoved) return false; |
- |
- // Catch all transformer errors and pipe them to the results stream. This |
- // is so a broken transformer doesn't take down the whole graph. |
- phase.cascade.reportError(_wrapException(error, stackTrace)); |
- return true; |
- }).then((hadError) { |
+ _state = _State.APPLYING; |
+ _runApply().then((hadError) { |
if (_isRemoved) return; |
- if (_state.needsIsPrimary) { |
- _process(); |
- } else if (_state.needsApply) { |
+ if (_state == _State.NEEDS_APPLY) { |
_apply(); |
- } else { |
- assert(_state.isProcessing); |
- if (hadError) { |
- _clearOutputs(); |
+ return; |
+ } |
+ |
+ assert(_state == _State.APPLYING); |
+ if (hadError) { |
+ _clearOutputs(); |
+ // If the transformer threw an error, we don't want to emit the |
+ // pass-through asset in case it will be overwritten by the transformer. |
+ // However, if the transformer declared that it wouldn't overwrite or |
+ // consume the pass-through asset, we can safely emit it. |
+ if (_declaredOutputs != null && !_consumePrimary && |
+ !_declaredOutputs.contains(primary.id)) { |
+ _emitPassThrough(); |
+ } else { |
_dontEmitPassThrough(); |
} |
- |
- _state = _TransformNodeState.APPLIED; |
- _onDoneController.add(null); |
} |
+ |
+ _state = _State.APPLIED; |
+ _onDoneController.add(null); |
}); |
} |
@@ -276,123 +315,95 @@ class TransformNode { |
} |
_inputSubscriptions.putIfAbsent(node.id, () { |
- return node.onStateChange.listen((_) => _dirty(primaryChanged: false)); |
+ return node.onStateChange.listen((_) => _dirty()); |
}); |
return node.asset; |
}); |
} |
- /// Applies the transform so that it produces concrete (as opposed to lazy) |
- /// outputs. |
+ /// Run [Transformer.apply] as soon as [primary] is available. |
/// |
- /// Returns whether or not the transformer logged an error. |
- Future<bool> _applyImmediate() { |
+ /// Returns whether or not an error occurred while running the transformer. |
+ Future<bool> _runApply() { |
var transformController = new TransformController(this); |
_onLogPool.add(transformController.onLog); |
- return syncFuture(() { |
- return transformer.apply(transformController.transform); |
+ return primary.whenAvailable((_) { |
+ if (_isRemoved) return null; |
+ _state = _State.APPLYING; |
+ return syncFuture(() => transformer.apply(transformController.transform)); |
}).then((_) { |
- if (!_state.isProcessing || _onAssetController.isClosed) return false; |
+ if (_state == _State.NEEDS_APPLY || _isRemoved) return false; |
if (transformController.loggedError) return true; |
- |
- _consumePrimary = transformController.consumePrimary; |
- |
- var newOutputs = transformController.outputs; |
- // Any ids that are for a different package are invalid. |
- var invalidIds = newOutputs |
- .map((asset) => asset.id) |
- .where((id) => id.package != phase.cascade.package) |
- .toSet(); |
- for (var id in invalidIds) { |
- newOutputs.removeId(id); |
- // TODO(nweiz): report this as a warning rather than a failing error. |
- phase.cascade.reportError(new InvalidOutputException(info, id)); |
- } |
- |
- // Remove outputs that used to exist but don't anymore. |
- for (var id in _outputControllers.keys.toList()) { |
- if (newOutputs.containsId(id)) continue; |
- _outputControllers.remove(id).setRemoved(); |
- } |
- |
- // Emit or stop emitting the pass-through asset between removing and |
- // adding outputs to ensure there are no collisions. |
- if (!newOutputs.containsId(primary.id)) { |
- _emitPassThrough(); |
- } else { |
- _dontEmitPassThrough(); |
- } |
- |
- // Store any new outputs or new contents for existing outputs. |
- for (var asset in newOutputs) { |
- var controller = _outputControllers[asset.id]; |
- if (controller != null) { |
- controller.setAvailable(asset); |
- } else { |
- var controller = new AssetNodeController.available(asset, this); |
- _outputControllers[asset.id] = controller; |
- _onAssetController.add(controller.node); |
- } |
- } |
- |
+ _handleApplyResults(transformController); |
return false; |
+ }).catchError((error, stackTrace) { |
+ // If the transform became dirty while processing, ignore any errors from |
+ // it. |
+ if (_state == _State.NEEDS_APPLY || _isRemoved) return false; |
+ |
+ // Catch all transformer errors and pipe them to the results stream. This |
+ // is so a broken transformer doesn't take down the whole graph. |
+ phase.cascade.reportError(_wrapException(error, stackTrace)); |
+ return true; |
}); |
} |
- /// Applies the transform in declarative mode so that it produces lazy |
- /// outputs. |
+ /// Handle the results of running [Transformer.apply]. |
/// |
- /// Returns whether or not the transformer logged an error. |
- Future<bool> _declareLazy() { |
- var transformController = new DeclaringTransformController(this); |
- |
- return syncFuture(() { |
- return (transformer as LazyTransformer) |
- .declareOutputs(transformController.transform); |
- }).then((_) { |
- if (!_state.isProcessing || _onAssetController.isClosed) return false; |
- if (transformController.loggedError) return true; |
- |
- _consumePrimary = transformController.consumePrimary; |
+ /// [transformController] should be the controller for the [Transform] passed |
+ /// to [Transformer.apply]. |
+ void _handleApplyResults(TransformController transformController) { |
+ _consumePrimary = transformController.consumePrimary; |
+ |
+ var newOutputs = transformController.outputs; |
+ // Any ids that are for a different package are invalid. |
+ var invalidIds = newOutputs |
+ .map((asset) => asset.id) |
+ .where((id) => id.package != phase.cascade.package) |
+ .toSet(); |
+ for (var id in invalidIds) { |
+ newOutputs.removeId(id); |
+ // TODO(nweiz): report this as a warning rather than a failing error. |
+ phase.cascade.reportError(new InvalidOutputException(info, id)); |
+ } |
- var newIds = transformController.outputIds; |
- var invalidIds = |
- newIds.where((id) => id.package != phase.cascade.package).toSet(); |
- for (var id in invalidIds) { |
- newIds.remove(id); |
- // TODO(nweiz): report this as a warning rather than a failing error. |
- phase.cascade.reportError(new InvalidOutputException(info, id)); |
+ if (_declaredOutputs != null) { |
+ var missingOutputs = _declaredOutputs.difference( |
+ newOutputs.map((asset) => asset.id).toSet()); |
+ if (missingOutputs.isNotEmpty) { |
+ _warn("This transformer didn't emit declared " |
+ "${pluralize('output asset', missingOutputs.length)} " |
+ "${toSentence(missingOutputs)}."); |
} |
+ } |
- // Remove outputs that used to exist but don't anymore. |
- for (var id in _outputControllers.keys.toList()) { |
- if (newIds.contains(id)) continue; |
- _outputControllers.remove(id).setRemoved(); |
- } |
+ // Remove outputs that used to exist but don't anymore. |
+ for (var id in _outputControllers.keys.toList()) { |
+ if (newOutputs.containsId(id)) continue; |
+ _outputControllers.remove(id).setRemoved(); |
+ } |
- // Emit or stop emitting the pass-through asset between removing and |
- // adding outputs to ensure there are no collisions. |
- if (!newIds.contains(primary.id)) { |
- _emitPassThrough(); |
- } else { |
- _dontEmitPassThrough(); |
- } |
+ // Emit or stop emitting the pass-through asset between removing and |
+ // adding outputs to ensure there are no collisions. |
+ if (!_consumePrimary && !newOutputs.containsId(primary.id)) { |
+ _emitPassThrough(); |
+ } else { |
+ _dontEmitPassThrough(); |
+ } |
- for (var id in newIds) { |
- var controller = _outputControllers[id]; |
- if (controller != null) { |
- controller.setLazy(force); |
- } else { |
- var controller = new AssetNodeController.lazy(id, force, this); |
- _outputControllers[id] = controller; |
- _onAssetController.add(controller.node); |
- } |
+ // Store any new outputs or new contents for existing outputs. |
+ for (var asset in newOutputs) { |
+ var controller = _outputControllers[asset.id]; |
+ if (controller != null) { |
+ controller.setAvailable(asset); |
+ } else { |
+ var controller = new AssetNodeController.available(asset, this); |
+ _outputControllers[asset.id] = controller; |
+ _onAssetController.add(controller.node); |
} |
- |
- return false; |
- }); |
+ } |
} |
/// Cancels all subscriptions to secondary input nodes. |
@@ -421,7 +432,9 @@ class TransformNode { |
if (_passThroughController == null) { |
_passThroughController = new AssetNodeController.from(primary); |
_onAssetController.add(_passThroughController.node); |
- } else { |
+ } else if (primary.state.isDirty) { |
+ _passThroughController.setDirty(); |
+ } else if (!_passThroughController.node.state.isAvailable) { |
_passThroughController.setAvailable(primary.asset); |
} |
} |
@@ -441,75 +454,58 @@ class TransformNode { |
} |
} |
+ /// Emit a warning about the transformer on [id]. |
+ void _warn(String message) { |
+ _onLogController.add( |
+ new LogEntry(info, primary.id, LogLevel.WARNING, message, null)); |
+ } |
+ |
String toString() => |
"transform node in $_location for $transformer on $primary"; |
} |
/// The enum of states that [TransformNode] can be in. |
-class _TransformNodeState { |
- /// The transform node is running [Transformer.isPrimary] or |
- /// [Transformer.apply] and doesn't need to re-run them. |
+class _State { |
+ /// The transform is running [Transformer.isPrimary]. |
/// |
- /// If there are no external changes by the time the processing finishes, this |
- /// will transition to [APPLIED] or [NOT_PRIMARY] depending on the result of |
- /// [Transformer.isPrimary]. If the primary input changes, this will |
- /// transition to [NEEDS_IS_PRIMARY]. If a secondary input changes, this will |
- /// transition to [NEEDS_APPLY]. |
- static final PROCESSING = const _TransformNodeState._("processing"); |
- |
- /// The transform is running [Transformer.isPrimary] or [Transformer.apply], |
- /// but since it started the primary input changed, so it will need to re-run |
- /// [Transformer.isPrimary]. |
+ /// This is the initial state of the transformer. Once [Transformer.isPrimary] |
+ /// finishes running, this will transition to [APPLYING] if the input is |
+ /// primary, or [NOT_PRIMARY] if it's not. |
+ static final COMPUTING_IS_PRIMARY = const _State._("computing isPrimary"); |
+ |
+ /// The transform is running [Transformer.apply]. |
/// |
- /// This will always transition to [Transformer.PROCESSING]. |
- static final NEEDS_IS_PRIMARY = |
- const _TransformNodeState._("needs isPrimary"); |
+ /// If an input changes while in this state, it will transition to |
+ /// [NEEDS_APPLY]. If the [TransformNode] is still in this state when |
+ /// [Transformer.apply] finishes running, it will transition to [APPLIED]. |
+ static final APPLYING = const _State._("applying"); |
- /// The transform is running [Transformer.apply], but since it started a |
- /// secondary input changed, so it will need to re-run [Transformer.apply]. |
+ /// The transform is running [Transformer.apply], but an input changed after |
+ /// it started, so it will need to re-run [Transformer.apply]. |
/// |
- /// If there are no external changes by the time [Transformer.apply] finishes, |
- /// this will transition to [PROCESSING]. If the primary input changes, this |
- /// will transition to [NEEDS_IS_PRIMARY]. |
- static final NEEDS_APPLY = const _TransformNodeState._("needs apply"); |
+ /// This will transition to [APPLYING] once [Transformer.apply] finishes |
+ /// running. |
+ static final NEEDS_APPLY = const _State._("needs apply"); |
/// The transform has finished running [Transformer.apply], whether or not it |
/// emitted an error. |
/// |
- /// If the primary input or a secondary input changes, this will transition to |
- /// [PROCESSING]. |
- static final APPLIED = const _TransformNodeState._("applied"); |
+ /// If the transformer is lazy, the [TransformNode] can also be in this state |
+ /// when [Transformer.declareOutputs] has been run but [Transformer.apply] has |
+ /// not. |
+ /// |
+ /// If an input changes, this will transition to [APPLYING]. |
+ static final APPLIED = const _State._("applied"); |
/// The transform has finished running [Transformer.isPrimary], which returned |
/// `false`. |
/// |
- /// If the primary input changes, this will transition to [PROCESSING]. |
- static final NOT_PRIMARY = const _TransformNodeState._("not primary"); |
- |
- /// Whether [this] is [PROCESSING]. |
- bool get isProcessing => this == _TransformNodeState.PROCESSING; |
- |
- /// Whether [this] is [NEEDS_IS_PRIMARY]. |
- bool get needsIsPrimary => this == _TransformNodeState.NEEDS_IS_PRIMARY; |
- |
- /// Whether [this] is [NEEDS_APPLY]. |
- bool get needsApply => this == _TransformNodeState.NEEDS_APPLY; |
- |
- /// Whether [this] is [APPLIED]. |
- bool get isApplied => this == _TransformNodeState.APPLIED; |
- |
- /// Whether [this] is [NOT_PRIMARY]. |
- bool get isNotPrimary => this == _TransformNodeState.NOT_PRIMARY; |
- |
- /// Whether the transform has finished running [Transformer.isPrimary] and |
- /// [Transformer.apply]. |
- /// |
- /// Specifically, whether [this] is [APPLIED] or [NOT_PRIMARY]. |
- bool get isDone => isApplied || isNotPrimary; |
+ /// This will never transition to another state. |
+ static final NOT_PRIMARY = const _State._("not primary"); |
final String name; |
- const _TransformNodeState._(this.name); |
+ const _State._(this.name); |
String toString() => name; |
} |