| Index: barback/lib/src/graph/phase.dart
|
| diff --git a/barback/lib/src/graph/phase.dart b/barback/lib/src/graph/phase.dart
|
| deleted file mode 100644
|
| index 4bfc5e19fd1a74decd504ee8e1f1b8aa3a9a0a19..0000000000000000000000000000000000000000
|
| --- a/barback/lib/src/graph/phase.dart
|
| +++ /dev/null
|
| @@ -1,396 +0,0 @@
|
| -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE file.
|
| -
|
| -library barback.graph.phase;
|
| -
|
| -import 'dart:async';
|
| -
|
| -import '../asset/asset_id.dart';
|
| -import '../asset/asset_node.dart';
|
| -import '../asset/asset_node_set.dart';
|
| -import '../errors.dart';
|
| -import '../log.dart';
|
| -import '../transformer/aggregate_transformer.dart';
|
| -import '../transformer/transformer.dart';
|
| -import '../transformer/transformer_group.dart';
|
| -import '../utils.dart';
|
| -import '../utils/multiset.dart';
|
| -import 'asset_cascade.dart';
|
| -import 'group_runner.dart';
|
| -import 'node_status.dart';
|
| -import 'node_streams.dart';
|
| -import 'phase_forwarder.dart';
|
| -import 'phase_output.dart';
|
| -import 'transformer_classifier.dart';
|
| -
|
| -/// One phase in the ordered series of transformations in an [AssetCascade].
|
| -///
|
| -/// Each phase can access outputs from previous phases and can in turn pass
|
| -/// outputs to later phases. Phases are processed strictly serially. All
|
| -/// transforms in a phase will be complete before moving on to the next phase.
|
| -/// Within a single phase, all transforms will be run in parallel.
|
| -///
|
| -/// Building can be interrupted between phases. For example, a source is added
|
| -/// which starts the background process. Sometime during, say, phase 2 (which
|
| -/// is running asynchronously) that source is modified. When the process queue
|
| -/// goes to advance to phase 3, it will see that modification and start the
|
| -/// waterfall from the beginning again.
|
| -class Phase {
|
| - /// The cascade that owns this phase.
|
| - final AssetCascade cascade;
|
| -
|
| - /// A string describing the location of [this] in the transformer graph.
|
| - final String _location;
|
| -
|
| - /// The index of [this] in its parent cascade or group.
|
| - final int _index;
|
| -
|
| - /// The groups for this phase.
|
| - final _groups = new Map<TransformerGroup, GroupRunner>();
|
| -
|
| - /// The inputs for this phase.
|
| - ///
|
| - /// For the first phase, these will be the source assets. For all other
|
| - /// phases, they will be the outputs from the previous phase.
|
| - final _inputs = new AssetNodeSet();
|
| -
|
| - /// The transformer classifiers for this phase.
|
| - ///
|
| - /// The keys can be either [Transformer]s or [AggregateTransformer]s.
|
| - final _classifiers = new Map<dynamic, TransformerClassifier>();
|
| -
|
| - /// The forwarders for this phase.
|
| - final _forwarders = new Map<AssetId, PhaseForwarder>();
|
| -
|
| - /// The outputs for this phase.
|
| - final _outputs = new Map<AssetId, PhaseOutput>();
|
| -
|
| - /// The set of all [AssetNode.origin] properties of the input assets for this
|
| - /// phase.
|
| - ///
|
| - /// This is used to determine which assets have been passed unmodified through
|
| - /// [_classifiers] or [_groups]. It's possible that a given asset was consumed
|
| - /// by a group and not an individual transformer, and so shouldn't be
|
| - /// forwarded through the phase as a whole.
|
| - ///
|
| - /// In order to detect whether an output has been forwarded through a group or
|
| - /// a classifier, we must be able to distinguish it from other outputs with
|
| - /// the same id. To do so, we check if its origin is in [_inputOrigins]. If
|
| - /// so, it's been forwarded unmodified.
|
| - final _inputOrigins = new Multiset<AssetNode>();
|
| -
|
| - /// The streams exposed by this phase.
|
| - final _streams = new NodeStreams();
|
| - Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
|
| - Stream<AssetNode> get onAsset => _streams.onAsset;
|
| - Stream<LogEntry> get onLog => _streams.onLog;
|
| -
|
| - /// How far along [this] is in processing its assets.
|
| - NodeStatus get status {
|
| - // Before any transformers are added, the phase should be dirty if and only
|
| - // if any input is dirty.
|
| - if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
|
| - return _inputs.any((input) => input.state.isDirty) ?
|
| - NodeStatus.RUNNING : NodeStatus.IDLE;
|
| - }
|
| -
|
| - var classifierStatus = NodeStatus.dirtiest(
|
| - _classifiers.values.map((classifier) => classifier.status));
|
| - var groupStatus = NodeStatus.dirtiest(
|
| - _groups.values.map((group) => group.status));
|
| - return (previous == null ? NodeStatus.IDLE : previous.status)
|
| - .dirtier(classifierStatus)
|
| - .dirtier(groupStatus);
|
| - }
|
| -
|
| - /// The previous phase in the cascade, or null if this is the first phase.
|
| - final Phase previous;
|
| -
|
| - /// The subscription to [previous]'s [onStatusChange] stream.
|
| - StreamSubscription _previousStatusSubscription;
|
| -
|
| - /// The subscription to [previous]'s [onAsset] stream.
|
| - StreamSubscription<AssetNode> _previousOnAssetSubscription;
|
| -
|
| - final _inputSubscriptions = new Set<StreamSubscription>();
|
| -
|
| - /// A map of asset ids to completers for [getInput] requests.
|
| - ///
|
| - /// If an asset node is requested before it's available, we put a completer in
|
| - /// this map to wait for the asset to be generated. If it's not generated, the
|
| - /// completer should complete to `null`.
|
| - final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>();
|
| -
|
| - /// Returns all currently-available output assets for this phase.
|
| - Set<AssetNode> get availableOutputs {
|
| - return _outputs.values
|
| - .map((output) => output.output)
|
| - .where((node) => node.state.isAvailable)
|
| - .toSet();
|
| - }
|
| -
|
| - // TODO(nweiz): Rather than passing the cascade and the phase everywhere,
|
| - // create an interface that just exposes [getInput]. Emit errors via
|
| - // [AssetNode]s.
|
| - Phase(AssetCascade cascade, String location)
|
| - : this._(cascade, location, 0);
|
| -
|
| - Phase._(this.cascade, this._location, this._index, [this.previous]) {
|
| - if (previous != null) {
|
| - _previousOnAssetSubscription = previous.onAsset.listen(addInput);
|
| - _previousStatusSubscription = previous.onStatusChange
|
| - .listen((_) => _streams.changeStatus(status));
|
| - }
|
| -
|
| - onStatusChange.listen((status) {
|
| - if (status == NodeStatus.RUNNING) return;
|
| -
|
| - // All the previous phases have finished declaring or producing their
|
| - // outputs. If anyone's still waiting for outputs, cut off the wait; we
|
| - // won't be generating them, at least until a source asset changes.
|
| - for (var completer in _pendingOutputRequests.values) {
|
| - completer.complete(null);
|
| - }
|
| - _pendingOutputRequests.clear();
|
| - });
|
| - }
|
| -
|
| - /// Adds a new asset as an input for this phase.
|
| - ///
|
| - /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
|
| - /// will automatically begin determining which transforms can consume it as a
|
| - /// primary input. The transforms themselves won't be applied until [process]
|
| - /// is called, however.
|
| - ///
|
| - /// This should only be used for brand-new assets or assets that have been
|
| - /// removed and re-created. The phase will automatically handle updated assets
|
| - /// using the [AssetNode.onStateChange] stream.
|
| - void addInput(AssetNode node) {
|
| - // Each group is one channel along which an asset may be forwarded, as is
|
| - // each transformer.
|
| - var forwarder = new PhaseForwarder(
|
| - node, _classifiers.length, _groups.length);
|
| - _forwarders[node.id] = forwarder;
|
| - forwarder.onAsset.listen(_handleOutputWithoutForwarder);
|
| - if (forwarder.output != null) {
|
| - _handleOutputWithoutForwarder(forwarder.output);
|
| - }
|
| -
|
| - _inputOrigins.add(node.origin);
|
| - _inputs.add(node);
|
| - _inputSubscriptions.add(node.onStateChange.listen((state) {
|
| - if (state.isRemoved) {
|
| - _inputOrigins.remove(node.origin);
|
| - _forwarders.remove(node.id).remove();
|
| - }
|
| - _streams.changeStatus(status);
|
| - }));
|
| -
|
| - for (var classifier in _classifiers.values) {
|
| - classifier.addInput(node);
|
| - }
|
| - }
|
| -
|
| - // TODO(nweiz): If the output is available when this is called, it's
|
| - // theoretically possible for it to become unavailable between the call and
|
| - // the return. If it does so, it won't trigger the rebuilding process. To
|
| - // avoid this, we should have this and the methods it calls take explicit
|
| - // callbacks, as in [AssetNode.whenAvailable].
|
| - /// Gets the asset node for an output [id].
|
| - ///
|
| - /// If [id] is for a generated or transformed asset, this will wait until it
|
| - /// has been created and return it. This means that the returned asset will
|
| - /// always be [AssetState.AVAILABLE].
|
| - ///
|
| - /// If the output cannot be found, returns null.
|
| - Future<AssetNode> getOutput(AssetId id) {
|
| - return syncFuture(() {
|
| - if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
|
| - if (_outputs.containsKey(id)) {
|
| - var output = _outputs[id].output;
|
| - // If the requested output is available, we can just return it.
|
| - if (output.state.isAvailable) return output;
|
| -
|
| - // If the requested output exists but isn't yet available, wait to see
|
| - // if it becomes available. If it's removed before becoming available,
|
| - // try again, since it could be generated again.
|
| - output.force();
|
| - return output.whenAvailable((_) {
|
| - return output;
|
| - }).catchError((error) {
|
| - if (error is! AssetNotFoundException) throw error;
|
| - return getOutput(id);
|
| - });
|
| - }
|
| -
|
| - // If this phase and the previous phases are fully declared or done, the
|
| - // requested output won't be generated and we can safely return null.
|
| - if (status != NodeStatus.RUNNING) return null;
|
| -
|
| - // Otherwise, store a completer for the asset node. If it's generated in
|
| - // the future, we'll complete this completer.
|
| - var completer = _pendingOutputRequests.putIfAbsent(id,
|
| - () => new Completer.sync());
|
| - return completer.future;
|
| - });
|
| - }
|
| -
|
| - /// Set this phase's transformers to [transformers].
|
| - void updateTransformers(Iterable transformers) {
|
| - var newTransformers = transformers
|
| - .where((op) => op is Transformer || op is AggregateTransformer)
|
| - .toSet();
|
| - var oldTransformers = _classifiers.keys.toSet();
|
| - for (var removed in oldTransformers.difference(newTransformers)) {
|
| - _classifiers.remove(removed).remove();
|
| - }
|
| -
|
| - for (var transformer in newTransformers.difference(oldTransformers)) {
|
| - var classifier = new TransformerClassifier(
|
| - this, transformer, "$_location.$_index");
|
| - _classifiers[transformer] = classifier;
|
| - classifier.onAsset.listen(_handleOutput);
|
| - _streams.onLogPool.add(classifier.onLog);
|
| - classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
|
| - for (var input in _inputs) {
|
| - classifier.addInput(input);
|
| - }
|
| - }
|
| -
|
| - var newGroups = transformers.where((op) => op is TransformerGroup)
|
| - .toSet();
|
| - var oldGroups = _groups.keys.toSet();
|
| - for (var removed in oldGroups.difference(newGroups)) {
|
| - _groups.remove(removed).remove();
|
| - }
|
| -
|
| - for (var added in newGroups.difference(oldGroups)) {
|
| - var runner = new GroupRunner(previous, added, "$_location.$_index");
|
| - _groups[added] = runner;
|
| - runner.onAsset.listen(_handleOutput);
|
| - _streams.onLogPool.add(runner.onLog);
|
| - runner.onStatusChange.listen((_) => _streams.changeStatus(status));
|
| - }
|
| -
|
| - for (var forwarder in _forwarders.values) {
|
| - forwarder.updateTransformers(_classifiers.length, _groups.length);
|
| - }
|
| -
|
| - _streams.changeStatus(status);
|
| - }
|
| -
|
| - /// Force all [LazyTransformer]s' transforms in this phase to begin producing
|
| - /// concrete assets.
|
| - void forceAllTransforms() {
|
| - for (var classifier in _classifiers.values) {
|
| - classifier.forceAllTransforms();
|
| - }
|
| -
|
| - for (var group in _groups.values) {
|
| - group.forceAllTransforms();
|
| - }
|
| - }
|
| -
|
| - /// Add a new phase after this one.
|
| - ///
|
| - /// The new phase will have a location annotation describing its place in the
|
| - /// package graph. By default, this annotation will describe it as being
|
| - /// directly after [this]. If [location] is passed, though, it's described as
|
| - /// being the first phase in that location.
|
| - Phase addPhase([String location]) {
|
| - var index = 0;
|
| - if (location == null) {
|
| - location = _location;
|
| - index = _index + 1;
|
| - }
|
| -
|
| - var next = new Phase._(cascade, location, index, this);
|
| - for (var output in _outputs.values.toList()) {
|
| - // Remove [output]'s listeners because now they should get the asset from
|
| - // [next], rather than this phase. Any transforms consuming [output] will
|
| - // be re-run and will consume the output from the new final phase.
|
| - output.removeListeners();
|
| - }
|
| - return next;
|
| - }
|
| -
|
| - /// Mark this phase as removed.
|
| - ///
|
| - /// This will remove all the phase's outputs.
|
| - void remove() {
|
| - for (var classifier in _classifiers.values.toList()) {
|
| - classifier.remove();
|
| - }
|
| - for (var group in _groups.values) {
|
| - group.remove();
|
| - }
|
| - _streams.close();
|
| - for (var subscription in _inputSubscriptions) {
|
| - subscription.cancel();
|
| - }
|
| - if (_previousStatusSubscription != null) {
|
| - _previousStatusSubscription.cancel();
|
| - }
|
| - if (_previousOnAssetSubscription != null) {
|
| - _previousOnAssetSubscription.cancel();
|
| - }
|
| - }
|
| -
|
| - /// Add [asset] as an output of this phase.
|
| - void _handleOutput(AssetNode asset) {
|
| - if (_inputOrigins.contains(asset.origin)) {
|
| - _forwarders[asset.id].addIntermediateAsset(asset);
|
| - } else {
|
| - _handleOutputWithoutForwarder(asset);
|
| - }
|
| - }
|
| -
|
| - /// Add [asset] as an output of this phase without checking if it's a
|
| - /// forwarded asset.
|
| - void _handleOutputWithoutForwarder(AssetNode asset) {
|
| - if (_outputs.containsKey(asset.id)) {
|
| - _outputs[asset.id].add(asset);
|
| - } else {
|
| - _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
|
| - _outputs[asset.id].onAsset.listen(_emit,
|
| - onDone: () => _outputs.remove(asset.id));
|
| - _emit(_outputs[asset.id].output);
|
| - }
|
| -
|
| - var exception = _outputs[asset.id].collisionException;
|
| - if (exception != null) cascade.reportError(exception);
|
| - }
|
| -
|
| - /// Emit [asset] as an output of this phase.
|
| - ///
|
| - /// This should be called after [_handleOutput], so that collisions are
|
| - /// resolved.
|
| - void _emit(AssetNode asset) {
|
| - _streams.onAssetController.add(asset);
|
| - _providePendingAsset(asset);
|
| - }
|
| -
|
| - /// Provide an asset to a pending [getOutput] call.
|
| - void _providePendingAsset(AssetNode asset) {
|
| - // If anyone's waiting for this asset, provide it to them.
|
| - var request = _pendingOutputRequests.remove(asset.id);
|
| - if (request == null) return;
|
| -
|
| - if (asset.state.isAvailable) {
|
| - request.complete(asset);
|
| - return;
|
| - }
|
| -
|
| - // A lazy asset may be emitted while still dirty. If so, we wait until it's
|
| - // either available or removed before trying again to access it.
|
| - assert(asset.state.isDirty);
|
| - asset.force();
|
| - asset.whenStateChanges().then((state) {
|
| - if (state.isRemoved) return getOutput(asset.id);
|
| - return asset;
|
| - }).then(request.complete).catchError(request.completeError);
|
| - }
|
| -
|
| - String toString() => "phase $_location.$_index";
|
| -}
|
|
|