Index: mojo/public/dart/third_party/barback/lib/src/graph/phase.dart |
diff --git a/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart b/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4bfc5e19fd1a74decd504ee8e1f1b8aa3a9a0a19 |
--- /dev/null |
+++ b/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart |
@@ -0,0 +1,396 @@ |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library barback.graph.phase; |
+ |
+import 'dart:async'; |
+ |
+import '../asset/asset_id.dart'; |
+import '../asset/asset_node.dart'; |
+import '../asset/asset_node_set.dart'; |
+import '../errors.dart'; |
+import '../log.dart'; |
+import '../transformer/aggregate_transformer.dart'; |
+import '../transformer/transformer.dart'; |
+import '../transformer/transformer_group.dart'; |
+import '../utils.dart'; |
+import '../utils/multiset.dart'; |
+import 'asset_cascade.dart'; |
+import 'group_runner.dart'; |
+import 'node_status.dart'; |
+import 'node_streams.dart'; |
+import 'phase_forwarder.dart'; |
+import 'phase_output.dart'; |
+import 'transformer_classifier.dart'; |
+ |
+/// One phase in the ordered series of transformations in an [AssetCascade]. |
+/// |
+/// Each phase can access outputs from previous phases and can in turn pass |
+/// outputs to later phases. Phases are processed strictly serially. All |
+/// transforms in a phase will be complete before moving on to the next phase. |
+/// Within a single phase, all transforms will be run in parallel. |
+/// |
+/// Building can be interrupted between phases. For example, a source is added |
+/// which starts the background process. Sometime during, say, phase 2 (which |
+/// is running asynchronously) that source is modified. When the process queue |
+/// goes to advance to phase 3, it will see that modification and start the |
+/// waterfall from the beginning again. |
+class Phase { |
+ /// The cascade that owns this phase. |
+ final AssetCascade cascade; |
+ |
+ /// A string describing the location of [this] in the transformer graph. |
+ final String _location; |
+ |
+ /// The index of [this] in its parent cascade or group. |
+ final int _index; |
+ |
+ /// The groups for this phase. |
+ final _groups = new Map<TransformerGroup, GroupRunner>(); |
+ |
+ /// The inputs for this phase. |
+ /// |
+ /// For the first phase, these will be the source assets. For all other |
+ /// phases, they will be the outputs from the previous phase. |
+ final _inputs = new AssetNodeSet(); |
+ |
+ /// The transformer classifiers for this phase. |
+ /// |
+ /// The keys can be either [Transformer]s or [AggregateTransformer]s. |
+ final _classifiers = new Map<dynamic, TransformerClassifier>(); |
+ |
+ /// The forwarders for this phase. |
+ final _forwarders = new Map<AssetId, PhaseForwarder>(); |
+ |
+ /// The outputs for this phase. |
+ final _outputs = new Map<AssetId, PhaseOutput>(); |
+ |
+ /// The set of all [AssetNode.origin] properties of the input assets for this |
+ /// phase. |
+ /// |
+ /// This is used to determine which assets have been passed unmodified through |
+ /// [_classifiers] or [_groups]. It's possible that a given asset was consumed |
+ /// by a group and not an individual transformer, and so shouldn't be |
+ /// forwarded through the phase as a whole. |
+ /// |
+ /// In order to detect whether an output has been forwarded through a group or |
+ /// a classifier, we must be able to distinguish it from other outputs with |
+ /// the same id. To do so, we check if its origin is in [_inputOrigins]. If |
+ /// so, it's been forwarded unmodified. |
+ final _inputOrigins = new Multiset<AssetNode>(); |
+ |
+ /// The streams exposed by this phase. |
+ final _streams = new NodeStreams(); |
+ Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; |
+ Stream<AssetNode> get onAsset => _streams.onAsset; |
+ Stream<LogEntry> get onLog => _streams.onLog; |
+ |
+ /// How far along [this] is in processing its assets. |
+ NodeStatus get status { |
+ // Before any transformers are added, the phase should be dirty if and only |
+ // if any input is dirty. |
+ if (_classifiers.isEmpty && _groups.isEmpty && previous == null) { |
+ return _inputs.any((input) => input.state.isDirty) ? |
+ NodeStatus.RUNNING : NodeStatus.IDLE; |
+ } |
+ |
+ var classifierStatus = NodeStatus.dirtiest( |
+ _classifiers.values.map((classifier) => classifier.status)); |
+ var groupStatus = NodeStatus.dirtiest( |
+ _groups.values.map((group) => group.status)); |
+ return (previous == null ? NodeStatus.IDLE : previous.status) |
+ .dirtier(classifierStatus) |
+ .dirtier(groupStatus); |
+ } |
+ |
+ /// The previous phase in the cascade, or null if this is the first phase. |
+ final Phase previous; |
+ |
+ /// The subscription to [previous]'s [onStatusChange] stream. |
+ StreamSubscription _previousStatusSubscription; |
+ |
+ /// The subscription to [previous]'s [onAsset] stream. |
+ StreamSubscription<AssetNode> _previousOnAssetSubscription; |
+ |
+ final _inputSubscriptions = new Set<StreamSubscription>(); |
+ |
+ /// A map of asset ids to completers for [getInput] requests. |
+ /// |
+ /// If an asset node is requested before it's available, we put a completer in |
+ /// this map to wait for the asset to be generated. If it's not generated, the |
+ /// completer should complete to `null`. |
+ final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>(); |
+ |
+ /// Returns all currently-available output assets for this phase. |
+ Set<AssetNode> get availableOutputs { |
+ return _outputs.values |
+ .map((output) => output.output) |
+ .where((node) => node.state.isAvailable) |
+ .toSet(); |
+ } |
+ |
+ // TODO(nweiz): Rather than passing the cascade and the phase everywhere, |
+ // create an interface that just exposes [getInput]. Emit errors via |
+ // [AssetNode]s. |
+ Phase(AssetCascade cascade, String location) |
+ : this._(cascade, location, 0); |
+ |
+ Phase._(this.cascade, this._location, this._index, [this.previous]) { |
+ if (previous != null) { |
+ _previousOnAssetSubscription = previous.onAsset.listen(addInput); |
+ _previousStatusSubscription = previous.onStatusChange |
+ .listen((_) => _streams.changeStatus(status)); |
+ } |
+ |
+ onStatusChange.listen((status) { |
+ if (status == NodeStatus.RUNNING) return; |
+ |
+ // All the previous phases have finished declaring or producing their |
+ // outputs. If anyone's still waiting for outputs, cut off the wait; we |
+ // won't be generating them, at least until a source asset changes. |
+ for (var completer in _pendingOutputRequests.values) { |
+ completer.complete(null); |
+ } |
+ _pendingOutputRequests.clear(); |
+ }); |
+ } |
+ |
+ /// Adds a new asset as an input for this phase. |
+ /// |
+ /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase |
+ /// will automatically begin determining which transforms can consume it as a |
+ /// primary input. The transforms themselves won't be applied until [process] |
+ /// is called, however. |
+ /// |
+ /// This should only be used for brand-new assets or assets that have been |
+ /// removed and re-created. The phase will automatically handle updated assets |
+ /// using the [AssetNode.onStateChange] stream. |
+ void addInput(AssetNode node) { |
+ // Each group is one channel along which an asset may be forwarded, as is |
+ // each transformer. |
+ var forwarder = new PhaseForwarder( |
+ node, _classifiers.length, _groups.length); |
+ _forwarders[node.id] = forwarder; |
+ forwarder.onAsset.listen(_handleOutputWithoutForwarder); |
+ if (forwarder.output != null) { |
+ _handleOutputWithoutForwarder(forwarder.output); |
+ } |
+ |
+ _inputOrigins.add(node.origin); |
+ _inputs.add(node); |
+ _inputSubscriptions.add(node.onStateChange.listen((state) { |
+ if (state.isRemoved) { |
+ _inputOrigins.remove(node.origin); |
+ _forwarders.remove(node.id).remove(); |
+ } |
+ _streams.changeStatus(status); |
+ })); |
+ |
+ for (var classifier in _classifiers.values) { |
+ classifier.addInput(node); |
+ } |
+ } |
+ |
+ // TODO(nweiz): If the output is available when this is called, it's |
+ // theoretically possible for it to become unavailable between the call and |
+ // the return. If it does so, it won't trigger the rebuilding process. To |
+ // avoid this, we should have this and the methods it calls take explicit |
+ // callbacks, as in [AssetNode.whenAvailable]. |
+ /// Gets the asset node for an output [id]. |
+ /// |
+ /// If [id] is for a generated or transformed asset, this will wait until it |
+ /// has been created and return it. This means that the returned asset will |
+ /// always be [AssetState.AVAILABLE]. |
+ /// |
+ /// If the output cannot be found, returns null. |
+ Future<AssetNode> getOutput(AssetId id) { |
+ return syncFuture(() { |
+ if (id.package != cascade.package) return cascade.graph.getAssetNode(id); |
+ if (_outputs.containsKey(id)) { |
+ var output = _outputs[id].output; |
+ // If the requested output is available, we can just return it. |
+ if (output.state.isAvailable) return output; |
+ |
+ // If the requested output exists but isn't yet available, wait to see |
+ // if it becomes available. If it's removed before becoming available, |
+ // try again, since it could be generated again. |
+ output.force(); |
+ return output.whenAvailable((_) { |
+ return output; |
+ }).catchError((error) { |
+ if (error is! AssetNotFoundException) throw error; |
+ return getOutput(id); |
+ }); |
+ } |
+ |
+ // If this phase and the previous phases are fully declared or done, the |
+ // requested output won't be generated and we can safely return null. |
+ if (status != NodeStatus.RUNNING) return null; |
+ |
+ // Otherwise, store a completer for the asset node. If it's generated in |
+ // the future, we'll complete this completer. |
+ var completer = _pendingOutputRequests.putIfAbsent(id, |
+ () => new Completer.sync()); |
+ return completer.future; |
+ }); |
+ } |
+ |
+ /// Set this phase's transformers to [transformers]. |
+ void updateTransformers(Iterable transformers) { |
+ var newTransformers = transformers |
+ .where((op) => op is Transformer || op is AggregateTransformer) |
+ .toSet(); |
+ var oldTransformers = _classifiers.keys.toSet(); |
+ for (var removed in oldTransformers.difference(newTransformers)) { |
+ _classifiers.remove(removed).remove(); |
+ } |
+ |
+ for (var transformer in newTransformers.difference(oldTransformers)) { |
+ var classifier = new TransformerClassifier( |
+ this, transformer, "$_location.$_index"); |
+ _classifiers[transformer] = classifier; |
+ classifier.onAsset.listen(_handleOutput); |
+ _streams.onLogPool.add(classifier.onLog); |
+ classifier.onStatusChange.listen((_) => _streams.changeStatus(status)); |
+ for (var input in _inputs) { |
+ classifier.addInput(input); |
+ } |
+ } |
+ |
+ var newGroups = transformers.where((op) => op is TransformerGroup) |
+ .toSet(); |
+ var oldGroups = _groups.keys.toSet(); |
+ for (var removed in oldGroups.difference(newGroups)) { |
+ _groups.remove(removed).remove(); |
+ } |
+ |
+ for (var added in newGroups.difference(oldGroups)) { |
+ var runner = new GroupRunner(previous, added, "$_location.$_index"); |
+ _groups[added] = runner; |
+ runner.onAsset.listen(_handleOutput); |
+ _streams.onLogPool.add(runner.onLog); |
+ runner.onStatusChange.listen((_) => _streams.changeStatus(status)); |
+ } |
+ |
+ for (var forwarder in _forwarders.values) { |
+ forwarder.updateTransformers(_classifiers.length, _groups.length); |
+ } |
+ |
+ _streams.changeStatus(status); |
+ } |
+ |
+ /// Force all [LazyTransformer]s' transforms in this phase to begin producing |
+ /// concrete assets. |
+ void forceAllTransforms() { |
+ for (var classifier in _classifiers.values) { |
+ classifier.forceAllTransforms(); |
+ } |
+ |
+ for (var group in _groups.values) { |
+ group.forceAllTransforms(); |
+ } |
+ } |
+ |
+ /// Add a new phase after this one. |
+ /// |
+ /// The new phase will have a location annotation describing its place in the |
+ /// package graph. By default, this annotation will describe it as being |
+ /// directly after [this]. If [location] is passed, though, it's described as |
+ /// being the first phase in that location. |
+ Phase addPhase([String location]) { |
+ var index = 0; |
+ if (location == null) { |
+ location = _location; |
+ index = _index + 1; |
+ } |
+ |
+ var next = new Phase._(cascade, location, index, this); |
+ for (var output in _outputs.values.toList()) { |
+ // Remove [output]'s listeners because now they should get the asset from |
+ // [next], rather than this phase. Any transforms consuming [output] will |
+ // be re-run and will consume the output from the new final phase. |
+ output.removeListeners(); |
+ } |
+ return next; |
+ } |
+ |
+ /// Mark this phase as removed. |
+ /// |
+ /// This will remove all the phase's outputs. |
+ void remove() { |
+ for (var classifier in _classifiers.values.toList()) { |
+ classifier.remove(); |
+ } |
+ for (var group in _groups.values) { |
+ group.remove(); |
+ } |
+ _streams.close(); |
+ for (var subscription in _inputSubscriptions) { |
+ subscription.cancel(); |
+ } |
+ if (_previousStatusSubscription != null) { |
+ _previousStatusSubscription.cancel(); |
+ } |
+ if (_previousOnAssetSubscription != null) { |
+ _previousOnAssetSubscription.cancel(); |
+ } |
+ } |
+ |
+ /// Add [asset] as an output of this phase. |
+ void _handleOutput(AssetNode asset) { |
+ if (_inputOrigins.contains(asset.origin)) { |
+ _forwarders[asset.id].addIntermediateAsset(asset); |
+ } else { |
+ _handleOutputWithoutForwarder(asset); |
+ } |
+ } |
+ |
+ /// Add [asset] as an output of this phase without checking if it's a |
+ /// forwarded asset. |
+ void _handleOutputWithoutForwarder(AssetNode asset) { |
+ if (_outputs.containsKey(asset.id)) { |
+ _outputs[asset.id].add(asset); |
+ } else { |
+ _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index"); |
+ _outputs[asset.id].onAsset.listen(_emit, |
+ onDone: () => _outputs.remove(asset.id)); |
+ _emit(_outputs[asset.id].output); |
+ } |
+ |
+ var exception = _outputs[asset.id].collisionException; |
+ if (exception != null) cascade.reportError(exception); |
+ } |
+ |
+ /// Emit [asset] as an output of this phase. |
+ /// |
+ /// This should be called after [_handleOutput], so that collisions are |
+ /// resolved. |
+ void _emit(AssetNode asset) { |
+ _streams.onAssetController.add(asset); |
+ _providePendingAsset(asset); |
+ } |
+ |
+ /// Provide an asset to a pending [getOutput] call. |
+ void _providePendingAsset(AssetNode asset) { |
+ // If anyone's waiting for this asset, provide it to them. |
+ var request = _pendingOutputRequests.remove(asset.id); |
+ if (request == null) return; |
+ |
+ if (asset.state.isAvailable) { |
+ request.complete(asset); |
+ return; |
+ } |
+ |
+ // A lazy asset may be emitted while still dirty. If so, we wait until it's |
+ // either available or removed before trying again to access it. |
+ assert(asset.state.isDirty); |
+ asset.force(); |
+ asset.whenStateChanges().then((state) { |
+ if (state.isRemoved) return getOutput(asset.id); |
+ return asset; |
+ }).then(request.complete).catchError(request.completeError); |
+ } |
+ |
+ String toString() => "phase $_location.$_index"; |
+} |