Chromium Code Reviews| Index: pkg/barback/lib/src/phase.dart |
| diff --git a/pkg/barback/lib/src/phase.dart b/pkg/barback/lib/src/phase.dart |
| index e16f94b7a8e9e3aa7a4a175a7ae0be3ff81484d6..c5af5093508ca18761839534e0bcea31c06cf22b 100644 |
| --- a/pkg/barback/lib/src/phase.dart |
| +++ b/pkg/barback/lib/src/phase.dart |
| @@ -14,6 +14,7 @@ import 'asset_set.dart'; |
| import 'errors.dart'; |
| import 'transform_node.dart'; |
| import 'transformer.dart'; |
| +import 'utils.dart'; |
| /// One phase in the ordered series of transformations in an [AssetCascade]. |
| /// |
| @@ -45,19 +46,32 @@ class Phase { |
| /// phases, they will be the outputs from the previous phase. |
| final inputs = new Map<AssetId, AssetNode>(); |
| - /// The transforms currently applicable to assets in [inputs]. |
| + /// The transforms currently applicable to assets in [inputs], indexed by |
| + /// the ids of their primary inputs. |
| /// |
| /// These are the transforms that have been "wired up": they represent a |
| /// repeatable transformation of a single concrete set of inputs. "dart2js" |
| /// is a transformer. "dart2js on web/main.dart" is a transform. |
| - final _transforms = new Set<TransformNode>(); |
| + final _transforms = new Map<AssetId, Set<TransformNode>>(); |
| - /// The nodes that are new in this phase since the last time [process] was |
| - /// called. |
| + /// Futures that will complete once the transformers that can consume a given |
| + /// asset are determined. |
| /// |
| - /// When we process, we'll check these to see if we can hang new transforms |
| - /// off them. |
| - final _newInputs = new Set<AssetNode>(); |
| + /// Whenever an asset is added or modified, we need to asynchronously |
| + /// determine which transformers can use it as their primary input. We can't |
| + /// start processing until we know which transformers to run, and this allows |
| + /// us to wait until we do. |
| + var _adjustTransformersFutures = new Map<AssetId, Future>(); |
|
Bob Nystrom
2013/07/31 20:05:41
"adjust" is a bit confusing to me. How about "poss
nweiz
2013/07/31 22:47:53
The name comes from the fact that these are future
|
| + |
| + /// New asset nodes that were added while [_adjustTransformers] was still |
| + /// being run on an old version of that asset. |
| + var _pendingNewInputs = new Map<AssetId, AssetNode>(); |
| + |
| + /// The ids of assets that are emitted by transforms in this phase. |
| + /// |
| + /// This is used to detect collisions where multiple transforms emit the same |
| + /// output. |
| + final _outputs = new Set<AssetId>(); |
| /// The phase after this one. |
| /// |
| @@ -66,133 +80,208 @@ class Phase { |
| Phase(this.cascade, this._index, this._transformers, this._next); |
| - /// Updates the phase's inputs with [updated] and removes [removed]. |
| + /// Adds a new asset as an input for this phase. |
| /// |
| - /// This marks any affected [transforms] as dirty or discards them if their |
| - /// inputs are removed. |
| - void updateInputs(AssetSet updated, Set<AssetId> removed) { |
| - // Remove any nodes that are no longer being output. Handle removals first |
| - // in case there are assets that were removed by one transform but updated |
| - // by another. In that case, the update should win. |
| - for (var id in removed) { |
| - var node = inputs.remove(id); |
| - |
| - // Every transform that was using it is dirty now. |
| - if (node != null) { |
| - node.consumers.forEach((consumer) => consumer.dirty()); |
| - } |
| - } |
| + /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase |
| + /// will automatically begin determining which transforms can consume it as a |
| + /// primary input. The transforms themselves won't be applied until [process] |
| + /// is called, however. |
| + /// |
| + /// This should only be used for brand-new assets or assets that have been |
| + /// removed and re-created. The phase will automatically handle updated assets |
| + /// using the [AssetNode.onStateChange] stream. |
| + void addInput(AssetNode node) { |
| + // We remove [node.id] from [inputs] as soon as the node is removed rather |
| + // than at the same time [node.id] is removed from [_transforms] so we don't |
| + // have to wait on [_adjustTransformers]. It's important that [inputs] is |
| + // always up-to-date so that the [AssetCascade] can look there for available |
| + // assets. |
| + inputs[node.id] = node; |
| + node.whenRemoved.then((_) => inputs.remove(node.id)); |
| - // Update and new or modified assets. |
| - for (var asset in updated) { |
| - var node = inputs[asset.id]; |
| - if (node == null) { |
| - // It's a new node. Add it and remember it so we can see if any new |
| - // transforms will consume it. |
| - node = new AssetNode(asset); |
| - inputs[asset.id] = node; |
| - _newInputs.add(node); |
| - } else { |
| - node.updateAsset(asset); |
| - } |
| + if (_adjustTransformersFutures.containsKey(node.id)) { |
|
Bob Nystrom
2013/07/31 20:05:41
Switch the order of cases here and do if (!_adjust
nweiz
2013/07/31 22:47:53
Done.
|
| + // If an input is added while the same input is still being processed, |
| + // that means that the asset was removed and recreated while |
| + // [_adjustTransformers] was being run on the old value. We have to wait |
| + // until that finishes, then run it again on whatever the newest version |
| + // of that asset is. |
|
Bob Nystrom
2013/07/31 20:05:41
This comment is confused compared to the code. Is
nweiz
2013/07/31 22:47:53
The comment is notionally attached to the _pending
|
| + var containedKey = _pendingNewInputs.containsKey(node.id); |
| + _pendingNewInputs[node.id] = node; |
| + if (containedKey) return; |
| + |
| + _adjustTransformersFutures[node.id].then((_) { |
| + assert(!_adjustTransformersFutures.containsKey(node.id)); |
| + assert(_pendingNewInputs.containsKey(node.id)); |
| + _transforms[node.id] = new Set<TransformNode>(); |
| + _adjustTransformers(_pendingNewInputs.remove(node.id)); |
| + }, onError: (_) { |
| + // If there was a programmatic error while processing the old input, |
| + // we don't want to just ignore it; it may have left the system in an |
| + // inconsistent state. We also don't want to top-level it, so we |
| + // ignore it here but don't start processing the new input. That way |
| + // when [process] is called, the error will be piped through its |
| + // return value. |
| + }).catchError((e) { |
| + // If our code above has a programmatic error, ensure it will be piped |
| + // through [process] by putting it into [_adjustTransformersFutures]. |
| + _adjustTransformersFutures[node.id] = new Future.error(e); |
| + }); |
| + } else { |
| + _transforms[node.id] = new Set<TransformNode>(); |
| + _adjustTransformers(node); |
| } |
| } |
| - /// Processes this phase. |
| + /// Returns the input for this phase with the given [id], but only if that |
| + /// input is known not to be consumed as a transformer's primary input. |
| /// |
| - /// For all new inputs, it tries to see if there are transformers that can |
| - /// consume them. Then all applicable transforms are applied. |
| + /// If the input is unavailable, or if the phase hasn't determined whether or |
| + /// not any transformers will consume it as a primary input, null will be |
| + /// returned instead. This means that the return value is guaranteed to always |
| + /// be [AssetState.AVAILABLE]. |
| + AssetNode getUnconsumedInput(AssetId id) { |
| + if (!inputs.containsKey(id)) return null; |
|
Bob Nystrom
2013/07/31 20:05:41
How about some blank lines above the comments in h
nweiz
2013/07/31 22:47:53
Done.
|
| + // If the asset has inputs but no _transforms set, that means that |
|
Bob Nystrom
2013/07/31 20:05:41
means what?
nweiz
2013/07/31 22:47:53
This can't actually happen any more; removed the c
|
| + if (!_transforms.containsKey(id)) return null; |
|
Bob Nystrom
2013/07/31 20:05:41
I don't understand this. If there's no transformer
nweiz
2013/07/31 22:47:53
There's a difference between no transformer being
|
| + // If the asset has transforms, it's not unconsumed. |
| + if (!_transforms[id].isEmpty) return null; |
| + // If we're working on figuring out if the asset has transforms, we can't |
| + // prove that it's unconsumed. |
| + if (_adjustTransformersFutures.containsKey(id)) return null; |
| + // The asset should be available. If it were removed, it wouldn't be in |
| + // _inputs, and if it were dirty, it'd be in _adjustTransformersFutures. |
| + assert(inputs[id].state.isAvailable); |
| + return inputs[id]; |
| + } |
| + |
| + /// Asynchronously determines which transformers can consume [node] as a |
| + /// primary input and creates transforms for them. |
| /// |
| - /// Returns a future that completes when processing is done. If there is |
| - /// nothing to process, returns `null`. |
| - Future process() { |
| - var future = _processNewInputs(); |
| - if (future == null) { |
| - return _processTransforms(); |
| - } |
| + /// This ensures that if [node] is modified or removed during or after the |
| + /// time it takes to adjust its transformers, they're appropriately |
| + /// re-adjusted. Its progress can be tracked in [_adjustTransformersFutures]. |
| + void _adjustTransformers(AssetNode node) { |
|
Bob Nystrom
2013/07/31 20:05:41
This method name isn't very clear. How about _find
nweiz
2013/07/31 22:47:53
It doesn't just find new transforms. It also check
|
| + // Once the input is available, hook up transformers for it. If it changes |
| + // while that's happening, try again. |
| + _adjustTransformersFutures[node.id] = node.tryUntilStable((asset) { |
| + var oldTransformers = _transforms[node.id] |
| + .map((transform) => transform.transformer).toSet(); |
| + |
| + return _removeStaleTransforms(asset) |
| + .then((_) => _addNewTransforms(node, oldTransformers)); |
| + }).then((_) { |
| + // Now all the transforms are set up correctly and the asset is available |
| + // for the time being. Set up handlers for when the asset changes in the |
| + // future. |
| + node.onStateChange.first.then((state) { |
|
Bob Nystrom
2013/07/31 20:05:41
Do we need to handle the .first future having an e
nweiz
2013/07/31 22:47:53
Done. It now pipes the error so that it will be em
|
| + if (state.isRemoved) { |
| + _transforms.remove(node.id); |
| + } else { |
| + _adjustTransformers(node); |
| + } |
| + }); |
| + }).catchError((error) { |
| + if (error is! AssetNotFoundException || error.id != node.id) throw error; |
| + |
| + // If the asset is removed, [tryUntilStable] will throw an |
| + // [AssetNotFoundException]. In that case, just remove all transforms for |
| + // the node. |
| + _transforms.remove(node.id); |
| + }).whenComplete(() { |
| + _adjustTransformersFutures.remove(node.id); |
| + }); |
| - return future.then((_) => _processTransforms()); |
| + // Don't top-level errors coming from the input processing. Any errors will |
| + // eventually be piped through [process]'s returned Future. |
| + _adjustTransformersFutures[node.id].catchError((_) {}); |
| } |
| - /// Creates new transforms for any new inputs that are applicable. |
| - Future _processNewInputs() { |
| - if (_newInputs.isEmpty) return null; |
| - |
| - var futures = []; |
| - for (var node in _newInputs) { |
| - for (var transformer in _transformers) { |
| - // TODO(rnystrom): Catch all errors from isPrimary() and redirect |
| - // to results. |
| - futures.add(transformer.isPrimary(node.asset).then((isPrimary) { |
| - if (!isPrimary) return; |
| - var transform = new TransformNode(this, transformer, node); |
| - node.consumers.add(transform); |
| - _transforms.add(transform); |
| - })); |
| - } |
| - } |
| + // Remove any old transforms that used to have [asset] as a primary asset but |
| + // no longer apply to its new contents. |
| + Future _removeStaleTransforms(Asset asset) { |
| + return Future.wait(_transforms[asset.id].map((transform) { |
| + // TODO(rnystrom): Catch all errors from isPrimary() and redirect to |
| + // results. |
| + return transform.transformer.isPrimary(asset).then((isPrimary) { |
| + if (isPrimary) return; |
| + _transforms[asset.id].remove(transform); |
| + transform.remove(); |
| + }); |
| + })); |
| + } |
| - _newInputs.clear(); |
| + // Add new transforms for transformers that consider [asset] to be a primary |
|
Bob Nystrom
2013/07/31 20:05:41
"[asset]" -> "[node]'s asset"
nweiz
2013/07/31 22:47:53
Done.
nweiz
2013/07/31 22:47:53
Done.
|
| + // input. |
| + // |
| + // [oldTransformers] is the set of transformers that had [node] as a primary |
| + // input prior to this. They don't need to be checked, since they were removed |
| + // or preserved in [_removeStaleTransforms]. |
| + Future _addNewTransforms(AssetNode node, Set<Transformer> oldTransformers) { |
|
Bob Nystrom
2013/07/31 20:05:41
"new" -> "fresh" to correspond with "stale" above?
nweiz
2013/07/31 22:47:53
Done.
|
| + return Future.wait(_transformers.map((transformer) { |
| + if (oldTransformers.contains(transformer)) return new Future.value(); |
| - return Future.wait(futures); |
| + // If the asset is unavailable, the results of this [_adjustTransformers] |
| + // run will be discarded, so we can just short-circuit. |
| + if (node.asset == null) return new Future.value(); |
| + |
| + // We can safely access [node.asset] here even though it might have |
| + // changed since (as above) if it has, [_adjustTransformers] will just be |
| + // re-run. |
| + // TODO(rnystrom): Catch all errors from isPrimary() and redirect to |
| + // results. |
| + return transformer.isPrimary(node.asset).then((isPrimary) { |
| + if (!isPrimary) return; |
| + _transforms[node.id].add(new TransformNode(this, transformer, node)); |
| + }); |
| + })); |
| } |
| - /// Applies all currently wired up and dirty transforms. |
| + /// Processes this phase. |
| /// |
| - /// Passes their outputs to the next phase. |
| + /// Returns a future that completes when processing is done. If there is |
| + /// nothing to process, returns `null`. |
| + Future process() { |
| + if (_adjustTransformersFutures.isEmpty) return _processTransforms(); |
| + return _waitForInputs().then((_) => _processTransforms()); |
| + } |
| + |
| + Future _waitForInputs() { |
| + if (_adjustTransformersFutures.isEmpty) return new Future.value(); |
| + return Future.wait(_adjustTransformersFutures.values) |
| + .then((_) => _waitForInputs()); |
| + } |
| + |
| + /// Applies all currently wired up and dirty transforms. |
| Future _processTransforms() { |
| // Convert this to a list so we can safely modify _transforms while |
| // iterating over it. |
| - var dirtyTransforms = _transforms.where((transform) => transform.isDirty) |
| - .toList(); |
| + var dirtyTransforms = |
| + flatten(_transforms.values.map((transforms) => transforms.toList())) |
| + .where((transform) => transform.isDirty).toList(); |
| if (dirtyTransforms.isEmpty) return null; |
| - return Future.wait(dirtyTransforms.map((transform) { |
| - if (inputs.containsKey(transform.primary.id)) return transform.apply(); |
| - |
| - // If the primary input for the transform has been removed, get rid of it |
| - // and all its outputs. |
| - _transforms.remove(transform); |
| - return new Future.value( |
| - new TransformOutputs(new AssetSet(), transform.outputs)); |
| - })).then((transformOutputs) { |
| - // Collect all of the outputs. Since the transforms are run in parallel, |
| - // we have to be careful here to ensure that the result is deterministic |
| - // and not influenced by the order that transforms complete. |
| - var updated = new AssetSet(); |
| - var removed = new Set<AssetId>(); |
| - var collisions = new Set<AssetId>(); |
| + return Future.wait(dirtyTransforms.map((transform) => transform.apply())) |
| + .then((allNewOutputs) { |
| + var newOutputs = allNewOutputs.reduce((set1, set2) => set1.union(set2)); |
| - // Handle the generated outputs of all transforms first. |
| - for (var outputs in transformOutputs) { |
| - // Collect the outputs of all transformers together. |
| - for (var asset in outputs.updated) { |
| - if (updated.containsId(asset.id)) { |
| - // Report a collision. |
| - collisions.add(asset.id); |
| - } else { |
| - // TODO(rnystrom): In the case of a collision, the asset that |
| - // "wins" is chosen non-deterministically. Do something better. |
| - updated.add(asset); |
| - } |
| + var collisions = new Set<AssetId>(); |
| + for (var newOutput in newOutputs) { |
| + if (_outputs.contains(newOutput.id)) { |
| + collisions.add(newOutput.id); |
| + } else { |
| + _next.addInput(newOutput); |
| + _outputs.add(newOutput.id); |
| + newOutput.whenRemoved.then((_) => _outputs.remove(newOutput.id)); |
| } |
| - |
| - // Track any assets no longer output by this transform. We don't |
| - // handle the case where *another* transform generates the asset |
| - // no longer generated by this one. updateInputs() handles that. |
| - removed.addAll(outputs.removed); |
| } |
| - // Report any collisions in deterministic order. |
| + // Report collisions in a deterministic order. |
| collisions = collisions.toList(); |
| collisions.sort((a, b) => a.toString().compareTo(b.toString())); |
| for (var collision in collisions) { |
| cascade.reportError(new AssetCollisionException(collision)); |
| // TODO(rnystrom): Define what happens after a collision occurs. |
| } |
| - |
| - // Pass the outputs to the next phase. |
| - _next.updateInputs(updated, removed); |
| }); |
| } |
| } |