Index: third_party/pkg/barback-0.13.0/lib/src/phase.dart |
diff --git a/pkg/barback/lib/src/graph/phase.dart b/third_party/pkg/barback-0.13.0/lib/src/phase.dart |
similarity index 66% |
copy from pkg/barback/lib/src/graph/phase.dart |
copy to third_party/pkg/barback-0.13.0/lib/src/phase.dart |
index a4724ab20e22f0ef3c0c2f4aff84f96842fc272a..aa7dc78ebd01acad469e83de0af134724c6b74ca 100644 |
--- a/pkg/barback/lib/src/graph/phase.dart |
+++ b/third_party/pkg/barback-0.13.0/lib/src/phase.dart |
@@ -2,26 +2,24 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library barback.graph.phase; |
+library barback.phase; |
import 'dart:async'; |
-import '../asset/asset_id.dart'; |
-import '../asset/asset_node.dart'; |
-import '../asset/asset_node_set.dart'; |
-import '../errors.dart'; |
-import '../log.dart'; |
-import '../transformer/transformer.dart'; |
-import '../transformer/transformer_group.dart'; |
-import '../utils.dart'; |
-import '../utils/multiset.dart'; |
import 'asset_cascade.dart'; |
+import 'asset_id.dart'; |
+import 'asset_node.dart'; |
+import 'errors.dart'; |
import 'group_runner.dart'; |
-import 'node_status.dart'; |
-import 'node_streams.dart'; |
+import 'log.dart'; |
+import 'multiset.dart'; |
import 'phase_forwarder.dart'; |
+import 'phase_input.dart'; |
import 'phase_output.dart'; |
-import 'transformer_classifier.dart'; |
+import 'stream_pool.dart'; |
+import 'transformer.dart'; |
+import 'transformer_group.dart'; |
+import 'utils.dart'; |
/// One phase in the ordered series of transformations in an [AssetCascade]. |
/// |
@@ -45,6 +43,11 @@ class Phase { |
/// The index of [this] in its parent cascade or group. |
final int _index; |
+ /// The transformers that can access [inputs]. |
+ /// |
+ /// Their outputs will be available to the next phase. |
+ final _transformers = new Set<Transformer>(); |
+ |
/// The groups for this phase. |
final _groups = new Map<TransformerGroup, GroupRunner>(); |
@@ -52,10 +55,7 @@ class Phase { |
/// |
/// For the first phase, these will be the source assets. For all other |
/// phases, they will be the outputs from the previous phase. |
- final _inputs = new AssetNodeSet(); |
- |
- /// The transformer classifiers for this phase. |
- final _classifiers = new Map<Transformer, TransformerClassifier>(); |
+ final _inputs = new Map<AssetId, PhaseInput>(); |
/// The forwarders for this phase. |
final _forwarders = new Map<AssetId, PhaseForwarder>(); |
@@ -67,45 +67,52 @@ class Phase { |
/// phase. |
/// |
/// This is used to determine which assets have been passed unmodified through |
- /// [_classifiers] or [_groups]. It's possible that a given asset was consumed |
- /// by a group and not an individual transformer, and so shouldn't be |
- /// forwarded through the phase as a whole. |
+ /// [_inputs] or [_groups]. Each input asset has a PhaseInput in [_inputs]. If |
+ /// that input isn't consumed by any transformers, it will be forwarded |
+ /// through the PhaseInput. However, it's possible that it was consumed by a |
+ /// group, and so shouldn't be forwarded through the phase as a whole. |
/// |
/// In order to detect whether an output has been forwarded through a group or |
- /// a classifier, we must be able to distinguish it from other outputs with |
+ /// a PhaseInput, we must be able to distinguish it from other outputs with |
/// the same id. To do so, we check if its origin is in [_inputOrigins]. If |
/// so, it's been forwarded unmodified. |
final _inputOrigins = new Multiset<AssetNode>(); |
- /// The streams exposed by this phase. |
- final _streams = new NodeStreams(); |
- Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; |
- Stream<AssetNode> get onAsset => _streams.onAsset; |
- Stream<LogEntry> get onLog => _streams.onLog; |
- |
- /// How far along [this] is in processing its assets. |
- NodeStatus get status { |
- // Before any transformers are added, the phase should be dirty if and only |
- // if any input is dirty. |
- if (_classifiers.isEmpty && _groups.isEmpty && previous == null) { |
- return _inputs.any((input) => input.state.isDirty) ? |
- NodeStatus.RUNNING : NodeStatus.IDLE; |
- } |
+ /// A stream that emits an event whenever [this] is no longer dirty. |
+ /// |
+ /// This is synchronous in order to guarantee that it will emit an event as |
+ /// soon as [isDirty] flips from `true` to `false`. |
+ Stream get onDone => _onDoneController.stream; |
+ final _onDoneController = new StreamController.broadcast(sync: true); |
- var classifierStatus = NodeStatus.dirtiest( |
- _classifiers.values.map((classifier) => classifier.status)); |
- var groupStatus = NodeStatus.dirtiest( |
- _groups.values.map((group) => group.status)); |
- return (previous == null ? NodeStatus.IDLE : previous.status) |
- .dirtier(classifierStatus) |
- .dirtier(groupStatus); |
- } |
+ /// A stream that emits any new assets emitted by [this]. |
+ /// |
+ /// Assets are emitted synchronously to ensure that any changes are thoroughly |
+ /// propagated as soon as they occur. Only a phase with no [next] phase will |
+ /// emit assets. |
+ Stream<AssetNode> get onAsset => _onAssetController.stream; |
+ final _onAssetController = |
+ new StreamController<AssetNode>.broadcast(sync: true); |
+ |
+ /// Whether [this] is dirty and still has more processing to do. |
+ /// |
+ /// A phase is considered dirty if any of the previous phases in the same |
+ /// cascade are dirty, since those phases could emit an asset that this phase |
+ /// will then need to process. |
+ bool get isDirty => (previous != null && previous.isDirty) || |
+ _inputs.values.any((input) => input.isDirty) || |
+ _groups.values.any((group) => group.isDirty); |
+ |
+ /// A stream that emits an event whenever any transforms in this phase logs |
+ /// an entry. |
+ Stream<LogEntry> get onLog => _onLogPool.stream; |
+ final _onLogPool = new StreamPool<LogEntry>.broadcast(); |
/// The previous phase in the cascade, or null if this is the first phase. |
final Phase previous; |
- /// The subscription to [previous]'s [onStatusChange] stream. |
- StreamSubscription _previousStatusSubscription; |
+ /// The subscription to [previous]'s [onDone] stream. |
+ StreamSubscription _previousOnDoneSubscription; |
/// The subscription to [previous]'s [onAsset] stream. |
StreamSubscription<AssetNode> _previousOnAssetSubscription; |
@@ -134,16 +141,15 @@ class Phase { |
Phase._(this.cascade, this._location, this._index, [this.previous]) { |
if (previous != null) { |
_previousOnAssetSubscription = previous.onAsset.listen(addInput); |
- _previousStatusSubscription = previous.onStatusChange |
- .listen((_) => _streams.changeStatus(status)); |
+ _previousOnDoneSubscription = previous.onDone.listen((_) { |
+ if (!isDirty) _onDoneController.add(null); |
+ }); |
} |
- onStatusChange.listen((status) { |
- if (status == NodeStatus.RUNNING) return; |
- |
- // All the previous phases have finished declaring or producing their |
- // outputs. If anyone's still waiting for outputs, cut off the wait; we |
- // won't be generating them, at least until a source asset changes. |
+ onDone.listen((_) { |
+ // All the previous phases have finished building. If anyone's still |
+ // waiting for outputs, cut off the wait; we won't be generating them, |
+ // at least until a source asset changes. |
for (var completer in _pendingOutputRequests.values) { |
completer.complete(null); |
} |
@@ -162,10 +168,14 @@ class Phase { |
/// removed and re-created. The phase will automatically handle updated assets |
/// using the [AssetNode.onStateChange] stream. |
void addInput(AssetNode node) { |
+ if (_inputs.containsKey(node.id)) _inputs[node.id].remove(); |
+ |
+ node.force(); |
+ |
// Each group is one channel along which an asset may be forwarded, as is |
// each transformer. |
var forwarder = new PhaseForwarder( |
- node, _classifiers.length, _groups.length); |
+ node, _transformers.length, _groups.length); |
_forwarders[node.id] = forwarder; |
forwarder.onAsset.listen(_handleOutputWithoutForwarder); |
if (forwarder.output != null) { |
@@ -173,17 +183,24 @@ class Phase { |
} |
_inputOrigins.add(node.origin); |
- _inputs.add(node); |
- node.onStateChange.listen((state) { |
- if (state.isRemoved) { |
- _inputOrigins.remove(node.origin); |
- _forwarders.remove(node.id).remove(); |
- } |
- _streams.changeStatus(status); |
+ var input = new PhaseInput(this, node, "$_location.$_index"); |
+ _inputs[node.id] = input; |
+ input.input.whenRemoved(() { |
+ _inputOrigins.remove(node.origin); |
+ _inputs.remove(node.id); |
+ _forwarders.remove(node.id).remove(); |
+ if (!isDirty) _onDoneController.add(null); |
+ }); |
+ input.onAsset.listen(_handleOutput); |
+ _onLogPool.add(input.onLog); |
+ input.onDone.listen((_) { |
+ if (!isDirty) _onDoneController.add(null); |
}); |
- for (var classifier in _classifiers.values) { |
- classifier.addInput(node); |
+ input.updateTransformers(_transformers); |
+ |
+ for (var group in _groups.values) { |
+ group.addInput(node); |
} |
} |
@@ -219,9 +236,9 @@ class Phase { |
}); |
} |
- // If this phase and the previous phases are fully declared or done, the |
- // requested output won't be generated and we can safely return null. |
- if (status != NodeStatus.RUNNING) return null; |
+ // If neither this phase nor the previous phases are dirty, the requested |
+ // output won't be generated and we can safely return null. |
+ if (!isDirty) return null; |
// Otherwise, store a completer for the asset node. If it's generated in |
// the future, we'll complete this completer. |
@@ -233,23 +250,11 @@ class Phase { |
/// Set this phase's transformers to [transformers]. |
void updateTransformers(Iterable transformers) { |
- var newTransformers = transformers.where((op) => op is Transformer) |
- .toSet(); |
- var oldTransformers = _classifiers.keys.toSet(); |
- for (var removed in oldTransformers.difference(newTransformers)) { |
- _classifiers.remove(removed).remove(); |
- } |
- |
- for (var transformer in newTransformers.difference(oldTransformers)) { |
- var classifier = new TransformerClassifier( |
- this, transformer, "$_location.$_index"); |
- _classifiers[transformer] = classifier; |
- classifier.onAsset.listen(_handleOutput); |
- _streams.onLogPool.add(classifier.onLog); |
- classifier.onStatusChange.listen((_) => _streams.changeStatus(status)); |
- for (var input in _inputs) { |
- classifier.addInput(input); |
- } |
+ var actualTransformers = transformers.where((op) => op is Transformer); |
+ _transformers.clear(); |
+ _transformers.addAll(actualTransformers); |
+ for (var input in _inputs.values) { |
+ input.updateTransformers(actualTransformers); |
} |
var newGroups = transformers.where((op) => op is TransformerGroup) |
@@ -260,46 +265,40 @@ class Phase { |
} |
for (var added in newGroups.difference(oldGroups)) { |
- var runner = new GroupRunner(previous, added, "$_location.$_index"); |
+ var runner = new GroupRunner(cascade, added, "$_location.$_index"); |
_groups[added] = runner; |
runner.onAsset.listen(_handleOutput); |
- _streams.onLogPool.add(runner.onLog); |
- runner.onStatusChange.listen((_) => _streams.changeStatus(status)); |
+ _onLogPool.add(runner.onLog); |
+ runner.onDone.listen((_) { |
+ if (!isDirty) _onDoneController.add(null); |
+ }); |
+ for (var input in _inputs.values) { |
+ runner.addInput(input.input); |
+ } |
} |
for (var forwarder in _forwarders.values) { |
- forwarder.updateTransformers(_classifiers.length, _groups.length); |
+ forwarder.updateTransformers(_transformers.length, _groups.length); |
} |
- |
- _streams.changeStatus(status); |
} |
/// Force all [LazyTransformer]s' transforms in this phase to begin producing |
/// concrete assets. |
void forceAllTransforms() { |
- for (var classifier in _classifiers.values) { |
- classifier.forceAllTransforms(); |
- } |
- |
for (var group in _groups.values) { |
group.forceAllTransforms(); |
} |
+ |
+ for (var input in _inputs.values) { |
+ input.forceAllTransforms(); |
+ } |
} |
/// Add a new phase after this one. |
/// |
- /// The new phase will have a location annotation describing its place in the |
- /// package graph. By default, this annotation will describe it as being |
- /// directly after [this]. If [location] is passed, though, it's described as |
- /// being the first phase in that location. |
- Phase addPhase([String location]) { |
- var index = 0; |
- if (location == null) { |
- location = _location; |
- index = _index + 1; |
- } |
- |
- var next = new Phase._(cascade, location, index, this); |
+ /// This may only be called on a phase with no phase following it. |
+ Phase addPhase() { |
+ var next = new Phase._(cascade, _location, _index + 1, this); |
for (var output in _outputs.values.toList()) { |
// Remove [output]'s listeners because now they should get the asset from |
// [next], rather than this phase. Any transforms consuming [output] will |
@@ -313,15 +312,16 @@ class Phase { |
/// |
/// This will remove all the phase's outputs. |
void remove() { |
- for (var classifier in _classifiers.values.toList()) { |
- classifier.remove(); |
+ for (var input in _inputs.values.toList()) { |
+ input.remove(); |
} |
for (var group in _groups.values) { |
group.remove(); |
} |
- _streams.close(); |
- if (_previousStatusSubscription != null) { |
- _previousStatusSubscription.cancel(); |
+ _onAssetController.close(); |
+ _onLogPool.close(); |
+ if (_previousOnDoneSubscription != null) { |
+ _previousOnDoneSubscription.cancel(); |
} |
if (_previousOnAssetSubscription != null) { |
_previousOnAssetSubscription.cancel(); |
@@ -358,7 +358,7 @@ class Phase { |
/// This should be called after [_handleOutput], so that collisions are |
/// resolved. |
void _emit(AssetNode asset) { |
- _streams.onAssetController.add(asset); |
+ _onAssetController.add(asset); |
_providePendingAsset(asset); |
} |