Chromium Code Reviews

Unified Diff: third_party/pkg/barback-0.13.0/lib/src/phase.dart

Issue 291843011: Run pub tests against older versions of barback. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Index: third_party/pkg/barback-0.13.0/lib/src/phase.dart
diff --git a/pkg/barback/lib/src/graph/phase.dart b/third_party/pkg/barback-0.13.0/lib/src/phase.dart
similarity index 66%
copy from pkg/barback/lib/src/graph/phase.dart
copy to third_party/pkg/barback-0.13.0/lib/src/phase.dart
index a4724ab20e22f0ef3c0c2f4aff84f96842fc272a..aa7dc78ebd01acad469e83de0af134724c6b74ca 100644
--- a/pkg/barback/lib/src/graph/phase.dart
+++ b/third_party/pkg/barback-0.13.0/lib/src/phase.dart
@@ -2,26 +2,24 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library barback.graph.phase;
+library barback.phase;
import 'dart:async';
-import '../asset/asset_id.dart';
-import '../asset/asset_node.dart';
-import '../asset/asset_node_set.dart';
-import '../errors.dart';
-import '../log.dart';
-import '../transformer/transformer.dart';
-import '../transformer/transformer_group.dart';
-import '../utils.dart';
-import '../utils/multiset.dart';
import 'asset_cascade.dart';
+import 'asset_id.dart';
+import 'asset_node.dart';
+import 'errors.dart';
import 'group_runner.dart';
-import 'node_status.dart';
-import 'node_streams.dart';
+import 'log.dart';
+import 'multiset.dart';
import 'phase_forwarder.dart';
+import 'phase_input.dart';
import 'phase_output.dart';
-import 'transformer_classifier.dart';
+import 'stream_pool.dart';
+import 'transformer.dart';
+import 'transformer_group.dart';
+import 'utils.dart';
/// One phase in the ordered series of transformations in an [AssetCascade].
///
@@ -45,6 +43,11 @@ class Phase {
/// The index of [this] in its parent cascade or group.
final int _index;
+ /// The transformers that can access [inputs].
+ ///
+ /// Their outputs will be available to the next phase.
+ final _transformers = new Set<Transformer>();
+
/// The groups for this phase.
final _groups = new Map<TransformerGroup, GroupRunner>();
@@ -52,10 +55,7 @@ class Phase {
///
/// For the first phase, these will be the source assets. For all other
/// phases, they will be the outputs from the previous phase.
- final _inputs = new AssetNodeSet();
-
- /// The transformer classifiers for this phase.
- final _classifiers = new Map<Transformer, TransformerClassifier>();
+ final _inputs = new Map<AssetId, PhaseInput>();
/// The forwarders for this phase.
final _forwarders = new Map<AssetId, PhaseForwarder>();
@@ -67,45 +67,52 @@ class Phase {
/// phase.
///
/// This is used to determine which assets have been passed unmodified through
- /// [_classifiers] or [_groups]. It's possible that a given asset was consumed
- /// by a group and not an individual transformer, and so shouldn't be
- /// forwarded through the phase as a whole.
+ /// [_inputs] or [_groups]. Each input asset has a PhaseInput in [_inputs]. If
+ /// that input isn't consumed by any transformers, it will be forwarded
+ /// through the PhaseInput. However, it's possible that it was consumed by a
+ /// group, and so shouldn't be forwarded through the phase as a whole.
///
/// In order to detect whether an output has been forwarded through a group or
- /// a classifier, we must be able to distinguish it from other outputs with
+ /// a PhaseInput, we must be able to distinguish it from other outputs with
/// the same id. To do so, we check if its origin is in [_inputOrigins]. If
/// so, it's been forwarded unmodified.
final _inputOrigins = new Multiset<AssetNode>();
- /// The streams exposed by this phase.
- final _streams = new NodeStreams();
- Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
- Stream<AssetNode> get onAsset => _streams.onAsset;
- Stream<LogEntry> get onLog => _streams.onLog;
-
- /// How far along [this] is in processing its assets.
- NodeStatus get status {
- // Before any transformers are added, the phase should be dirty if and only
- // if any input is dirty.
- if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
- return _inputs.any((input) => input.state.isDirty) ?
- NodeStatus.RUNNING : NodeStatus.IDLE;
- }
+ /// A stream that emits an event whenever [this] is no longer dirty.
+ ///
+ /// This is synchronous in order to guarantee that it will emit an event as
+ /// soon as [isDirty] flips from `true` to `false`.
+ Stream get onDone => _onDoneController.stream;
+ final _onDoneController = new StreamController.broadcast(sync: true);
- var classifierStatus = NodeStatus.dirtiest(
- _classifiers.values.map((classifier) => classifier.status));
- var groupStatus = NodeStatus.dirtiest(
- _groups.values.map((group) => group.status));
- return (previous == null ? NodeStatus.IDLE : previous.status)
- .dirtier(classifierStatus)
- .dirtier(groupStatus);
- }
+ /// A stream that emits any new assets emitted by [this].
+ ///
+ /// Assets are emitted synchronously to ensure that any changes are thoroughly
+ /// propagated as soon as they occur. Only a phase with no [next] phase will
+ /// emit assets.
+ Stream<AssetNode> get onAsset => _onAssetController.stream;
+ final _onAssetController =
+ new StreamController<AssetNode>.broadcast(sync: true);
+
+ /// Whether [this] is dirty and still has more processing to do.
+ ///
+ /// A phase is considered dirty if any of the previous phases in the same
+ /// cascade are dirty, since those phases could emit an asset that this phase
+ /// will then need to process.
+ bool get isDirty => (previous != null && previous.isDirty) ||
+ _inputs.values.any((input) => input.isDirty) ||
+ _groups.values.any((group) => group.isDirty);
+
+ /// A stream that emits an event whenever any transforms in this phase logs
+ /// an entry.
+ Stream<LogEntry> get onLog => _onLogPool.stream;
+ final _onLogPool = new StreamPool<LogEntry>.broadcast();
/// The previous phase in the cascade, or null if this is the first phase.
final Phase previous;
- /// The subscription to [previous]'s [onStatusChange] stream.
- StreamSubscription _previousStatusSubscription;
+ /// The subscription to [previous]'s [onDone] stream.
+ StreamSubscription _previousOnDoneSubscription;
/// The subscription to [previous]'s [onAsset] stream.
StreamSubscription<AssetNode> _previousOnAssetSubscription;
@@ -134,16 +141,15 @@ class Phase {
Phase._(this.cascade, this._location, this._index, [this.previous]) {
if (previous != null) {
_previousOnAssetSubscription = previous.onAsset.listen(addInput);
- _previousStatusSubscription = previous.onStatusChange
- .listen((_) => _streams.changeStatus(status));
+ _previousOnDoneSubscription = previous.onDone.listen((_) {
+ if (!isDirty) _onDoneController.add(null);
+ });
}
- onStatusChange.listen((status) {
- if (status == NodeStatus.RUNNING) return;
-
- // All the previous phases have finished declaring or producing their
- // outputs. If anyone's still waiting for outputs, cut off the wait; we
- // won't be generating them, at least until a source asset changes.
+ onDone.listen((_) {
+ // All the previous phases have finished building. If anyone's still
+ // waiting for outputs, cut off the wait; we won't be generating them,
+ // at least until a source asset changes.
for (var completer in _pendingOutputRequests.values) {
completer.complete(null);
}
@@ -162,10 +168,14 @@ class Phase {
/// removed and re-created. The phase will automatically handle updated assets
/// using the [AssetNode.onStateChange] stream.
void addInput(AssetNode node) {
+ if (_inputs.containsKey(node.id)) _inputs[node.id].remove();
+
+ node.force();
+
// Each group is one channel along which an asset may be forwarded, as is
// each transformer.
var forwarder = new PhaseForwarder(
- node, _classifiers.length, _groups.length);
+ node, _transformers.length, _groups.length);
_forwarders[node.id] = forwarder;
forwarder.onAsset.listen(_handleOutputWithoutForwarder);
if (forwarder.output != null) {
@@ -173,17 +183,24 @@ class Phase {
}
_inputOrigins.add(node.origin);
- _inputs.add(node);
- node.onStateChange.listen((state) {
- if (state.isRemoved) {
- _inputOrigins.remove(node.origin);
- _forwarders.remove(node.id).remove();
- }
- _streams.changeStatus(status);
+ var input = new PhaseInput(this, node, "$_location.$_index");
+ _inputs[node.id] = input;
+ input.input.whenRemoved(() {
+ _inputOrigins.remove(node.origin);
+ _inputs.remove(node.id);
+ _forwarders.remove(node.id).remove();
+ if (!isDirty) _onDoneController.add(null);
+ });
+ input.onAsset.listen(_handleOutput);
+ _onLogPool.add(input.onLog);
+ input.onDone.listen((_) {
+ if (!isDirty) _onDoneController.add(null);
});
- for (var classifier in _classifiers.values) {
- classifier.addInput(node);
+ input.updateTransformers(_transformers);
+
+ for (var group in _groups.values) {
+ group.addInput(node);
}
}
@@ -219,9 +236,9 @@ class Phase {
});
}
- // If this phase and the previous phases are fully declared or done, the
- // requested output won't be generated and we can safely return null.
- if (status != NodeStatus.RUNNING) return null;
+ // If neither this phase nor the previous phases are dirty, the requested
+ // output won't be generated and we can safely return null.
+ if (!isDirty) return null;
// Otherwise, store a completer for the asset node. If it's generated in
// the future, we'll complete this completer.
@@ -233,23 +250,11 @@ class Phase {
/// Set this phase's transformers to [transformers].
void updateTransformers(Iterable transformers) {
- var newTransformers = transformers.where((op) => op is Transformer)
- .toSet();
- var oldTransformers = _classifiers.keys.toSet();
- for (var removed in oldTransformers.difference(newTransformers)) {
- _classifiers.remove(removed).remove();
- }
-
- for (var transformer in newTransformers.difference(oldTransformers)) {
- var classifier = new TransformerClassifier(
- this, transformer, "$_location.$_index");
- _classifiers[transformer] = classifier;
- classifier.onAsset.listen(_handleOutput);
- _streams.onLogPool.add(classifier.onLog);
- classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
- for (var input in _inputs) {
- classifier.addInput(input);
- }
+ var actualTransformers = transformers.where((op) => op is Transformer);
+ _transformers.clear();
+ _transformers.addAll(actualTransformers);
+ for (var input in _inputs.values) {
+ input.updateTransformers(actualTransformers);
}
var newGroups = transformers.where((op) => op is TransformerGroup)
@@ -260,46 +265,40 @@ class Phase {
}
for (var added in newGroups.difference(oldGroups)) {
- var runner = new GroupRunner(previous, added, "$_location.$_index");
+ var runner = new GroupRunner(cascade, added, "$_location.$_index");
_groups[added] = runner;
runner.onAsset.listen(_handleOutput);
- _streams.onLogPool.add(runner.onLog);
- runner.onStatusChange.listen((_) => _streams.changeStatus(status));
+ _onLogPool.add(runner.onLog);
+ runner.onDone.listen((_) {
+ if (!isDirty) _onDoneController.add(null);
+ });
+ for (var input in _inputs.values) {
+ runner.addInput(input.input);
+ }
}
for (var forwarder in _forwarders.values) {
- forwarder.updateTransformers(_classifiers.length, _groups.length);
+ forwarder.updateTransformers(_transformers.length, _groups.length);
}
-
- _streams.changeStatus(status);
}
/// Force all [LazyTransformer]s' transforms in this phase to begin producing
/// concrete assets.
void forceAllTransforms() {
- for (var classifier in _classifiers.values) {
- classifier.forceAllTransforms();
- }
-
for (var group in _groups.values) {
group.forceAllTransforms();
}
+
+ for (var input in _inputs.values) {
+ input.forceAllTransforms();
+ }
}
/// Add a new phase after this one.
///
- /// The new phase will have a location annotation describing its place in the
- /// package graph. By default, this annotation will describe it as being
- /// directly after [this]. If [location] is passed, though, it's described as
- /// being the first phase in that location.
- Phase addPhase([String location]) {
- var index = 0;
- if (location == null) {
- location = _location;
- index = _index + 1;
- }
-
- var next = new Phase._(cascade, location, index, this);
+ /// This may only be called on a phase with no phase following it.
+ Phase addPhase() {
+ var next = new Phase._(cascade, _location, _index + 1, this);
for (var output in _outputs.values.toList()) {
// Remove [output]'s listeners because now they should get the asset from
// [next], rather than this phase. Any transforms consuming [output] will
@@ -313,15 +312,16 @@ class Phase {
///
/// This will remove all the phase's outputs.
void remove() {
- for (var classifier in _classifiers.values.toList()) {
- classifier.remove();
+ for (var input in _inputs.values.toList()) {
+ input.remove();
}
for (var group in _groups.values) {
group.remove();
}
- _streams.close();
- if (_previousStatusSubscription != null) {
- _previousStatusSubscription.cancel();
+ _onAssetController.close();
+ _onLogPool.close();
+ if (_previousOnDoneSubscription != null) {
+ _previousOnDoneSubscription.cancel();
}
if (_previousOnAssetSubscription != null) {
_previousOnAssetSubscription.cancel();
@@ -358,7 +358,7 @@ class Phase {
/// This should be called after [_handleOutput], so that collisions are
/// resolved.
void _emit(AssetNode asset) {
- _streams.onAssetController.add(asset);
+ _onAssetController.add(asset);
_providePendingAsset(asset);
}

Powered by Google App Engine