| Index: mojo/public/dart/third_party/barback/lib/src/graph/phase.dart
|
| diff --git a/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart b/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..4bfc5e19fd1a74decd504ee8e1f1b8aa3a9a0a19
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/barback/lib/src/graph/phase.dart
|
| @@ -0,0 +1,396 @@
|
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library barback.graph.phase;
|
| +
|
| +import 'dart:async';
|
| +
|
| +import '../asset/asset_id.dart';
|
| +import '../asset/asset_node.dart';
|
| +import '../asset/asset_node_set.dart';
|
| +import '../errors.dart';
|
| +import '../log.dart';
|
| +import '../transformer/aggregate_transformer.dart';
|
| +import '../transformer/transformer.dart';
|
| +import '../transformer/transformer_group.dart';
|
| +import '../utils.dart';
|
| +import '../utils/multiset.dart';
|
| +import 'asset_cascade.dart';
|
| +import 'group_runner.dart';
|
| +import 'node_status.dart';
|
| +import 'node_streams.dart';
|
| +import 'phase_forwarder.dart';
|
| +import 'phase_output.dart';
|
| +import 'transformer_classifier.dart';
|
| +
|
| +/// One phase in the ordered series of transformations in an [AssetCascade].
|
| +///
|
| +/// Each phase can access outputs from previous phases and can in turn pass
|
| +/// outputs to later phases. Phases are processed strictly serially. All
|
| +/// transforms in a phase will be complete before moving on to the next phase.
|
| +/// Within a single phase, all transforms will be run in parallel.
|
| +///
|
| +/// Building can be interrupted between phases. For example, a source is added
|
| +/// which starts the background process. Sometime during, say, phase 2 (which
|
| +/// is running asynchronously) that source is modified. When the process queue
|
| +/// goes to advance to phase 3, it will see that modification and start the
|
| +/// waterfall from the beginning again.
|
| +class Phase {
|
| + /// The cascade that owns this phase.
|
| + final AssetCascade cascade;
|
| +
|
| + /// A string describing the location of [this] in the transformer graph.
|
| + final String _location;
|
| +
|
| + /// The index of [this] in its parent cascade or group.
|
| + final int _index;
|
| +
|
| + /// The groups for this phase.
|
| + final _groups = new Map<TransformerGroup, GroupRunner>();
|
| +
|
| + /// The inputs for this phase.
|
| + ///
|
| + /// For the first phase, these will be the source assets. For all other
|
| + /// phases, they will be the outputs from the previous phase.
|
| + final _inputs = new AssetNodeSet();
|
| +
|
| + /// The transformer classifiers for this phase.
|
| + ///
|
| + /// The keys can be either [Transformer]s or [AggregateTransformer]s.
|
| + final _classifiers = new Map<dynamic, TransformerClassifier>();
|
| +
|
| + /// The forwarders for this phase.
|
| + final _forwarders = new Map<AssetId, PhaseForwarder>();
|
| +
|
| + /// The outputs for this phase.
|
| + final _outputs = new Map<AssetId, PhaseOutput>();
|
| +
|
| + /// The set of all [AssetNode.origin] properties of the input assets for this
|
| + /// phase.
|
| + ///
|
| + /// This is used to determine which assets have been passed unmodified through
|
| + /// [_classifiers] or [_groups]. It's possible that a given asset was consumed
|
| + /// by a group and not an individual transformer, and so shouldn't be
|
| + /// forwarded through the phase as a whole.
|
| + ///
|
| + /// In order to detect whether an output has been forwarded through a group or
|
| + /// a classifier, we must be able to distinguish it from other outputs with
|
| + /// the same id. To do so, we check if its origin is in [_inputOrigins]. If
|
| + /// so, it's been forwarded unmodified.
|
| + final _inputOrigins = new Multiset<AssetNode>();
|
| +
|
| + /// The streams exposed by this phase.
|
| + final _streams = new NodeStreams();
|
| + Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
|
| + Stream<AssetNode> get onAsset => _streams.onAsset;
|
| + Stream<LogEntry> get onLog => _streams.onLog;
|
| +
|
| + /// How far along [this] is in processing its assets.
|
| + NodeStatus get status {
|
| + // Before any transformers are added, the phase should be dirty if and only
|
| + // if any input is dirty.
|
| + if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
|
| + return _inputs.any((input) => input.state.isDirty) ?
|
| + NodeStatus.RUNNING : NodeStatus.IDLE;
|
| + }
|
| +
|
| + var classifierStatus = NodeStatus.dirtiest(
|
| + _classifiers.values.map((classifier) => classifier.status));
|
| + var groupStatus = NodeStatus.dirtiest(
|
| + _groups.values.map((group) => group.status));
|
| + return (previous == null ? NodeStatus.IDLE : previous.status)
|
| + .dirtier(classifierStatus)
|
| + .dirtier(groupStatus);
|
| + }
|
| +
|
| + /// The previous phase in the cascade, or null if this is the first phase.
|
| + final Phase previous;
|
| +
|
| + /// The subscription to [previous]'s [onStatusChange] stream.
|
| + StreamSubscription _previousStatusSubscription;
|
| +
|
| + /// The subscription to [previous]'s [onAsset] stream.
|
| + StreamSubscription<AssetNode> _previousOnAssetSubscription;
|
| +
|
| + final _inputSubscriptions = new Set<StreamSubscription>();
|
| +
|
| + /// A map of asset ids to completers for [getInput] requests.
|
| + ///
|
| + /// If an asset node is requested before it's available, we put a completer in
|
| + /// this map to wait for the asset to be generated. If it's not generated, the
|
| + /// completer should complete to `null`.
|
| + final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>();
|
| +
|
| + /// Returns all currently-available output assets for this phase.
|
| + Set<AssetNode> get availableOutputs {
|
| + return _outputs.values
|
| + .map((output) => output.output)
|
| + .where((node) => node.state.isAvailable)
|
| + .toSet();
|
| + }
|
| +
|
| + // TODO(nweiz): Rather than passing the cascade and the phase everywhere,
|
| + // create an interface that just exposes [getInput]. Emit errors via
|
| + // [AssetNode]s.
|
| + Phase(AssetCascade cascade, String location)
|
| + : this._(cascade, location, 0);
|
| +
|
| + Phase._(this.cascade, this._location, this._index, [this.previous]) {
|
| + if (previous != null) {
|
| + _previousOnAssetSubscription = previous.onAsset.listen(addInput);
|
| + _previousStatusSubscription = previous.onStatusChange
|
| + .listen((_) => _streams.changeStatus(status));
|
| + }
|
| +
|
| + onStatusChange.listen((status) {
|
| + if (status == NodeStatus.RUNNING) return;
|
| +
|
| + // All the previous phases have finished declaring or producing their
|
| + // outputs. If anyone's still waiting for outputs, cut off the wait; we
|
| + // won't be generating them, at least until a source asset changes.
|
| + for (var completer in _pendingOutputRequests.values) {
|
| + completer.complete(null);
|
| + }
|
| + _pendingOutputRequests.clear();
|
| + });
|
| + }
|
| +
|
| + /// Adds a new asset as an input for this phase.
|
| + ///
|
| + /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
|
| + /// will automatically begin determining which transforms can consume it as a
|
| + /// primary input. The transforms themselves won't be applied until [process]
|
| + /// is called, however.
|
| + ///
|
| + /// This should only be used for brand-new assets or assets that have been
|
| + /// removed and re-created. The phase will automatically handle updated assets
|
| + /// using the [AssetNode.onStateChange] stream.
|
| + void addInput(AssetNode node) {
|
| + // Each group is one channel along which an asset may be forwarded, as is
|
| + // each transformer.
|
| + var forwarder = new PhaseForwarder(
|
| + node, _classifiers.length, _groups.length);
|
| + _forwarders[node.id] = forwarder;
|
| + forwarder.onAsset.listen(_handleOutputWithoutForwarder);
|
| + if (forwarder.output != null) {
|
| + _handleOutputWithoutForwarder(forwarder.output);
|
| + }
|
| +
|
| + _inputOrigins.add(node.origin);
|
| + _inputs.add(node);
|
| + _inputSubscriptions.add(node.onStateChange.listen((state) {
|
| + if (state.isRemoved) {
|
| + _inputOrigins.remove(node.origin);
|
| + _forwarders.remove(node.id).remove();
|
| + }
|
| + _streams.changeStatus(status);
|
| + }));
|
| +
|
| + for (var classifier in _classifiers.values) {
|
| + classifier.addInput(node);
|
| + }
|
| + }
|
| +
|
| + // TODO(nweiz): If the output is available when this is called, it's
|
| + // theoretically possible for it to become unavailable between the call and
|
| + // the return. If it does so, it won't trigger the rebuilding process. To
|
| + // avoid this, we should have this and the methods it calls take explicit
|
| + // callbacks, as in [AssetNode.whenAvailable].
|
| + /// Gets the asset node for an output [id].
|
| + ///
|
| + /// If [id] is for a generated or transformed asset, this will wait until it
|
| + /// has been created and return it. This means that the returned asset will
|
| + /// always be [AssetState.AVAILABLE].
|
| + ///
|
| + /// If the output cannot be found, returns null.
|
| + Future<AssetNode> getOutput(AssetId id) {
|
| + return syncFuture(() {
|
| + if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
|
| + if (_outputs.containsKey(id)) {
|
| + var output = _outputs[id].output;
|
| + // If the requested output is available, we can just return it.
|
| + if (output.state.isAvailable) return output;
|
| +
|
| + // If the requested output exists but isn't yet available, wait to see
|
| + // if it becomes available. If it's removed before becoming available,
|
| + // try again, since it could be generated again.
|
| + output.force();
|
| + return output.whenAvailable((_) {
|
| + return output;
|
| + }).catchError((error) {
|
| + if (error is! AssetNotFoundException) throw error;
|
| + return getOutput(id);
|
| + });
|
| + }
|
| +
|
| + // If this phase and the previous phases are fully declared or done, the
|
| + // requested output won't be generated and we can safely return null.
|
| + if (status != NodeStatus.RUNNING) return null;
|
| +
|
| + // Otherwise, store a completer for the asset node. If it's generated in
|
| + // the future, we'll complete this completer.
|
| + var completer = _pendingOutputRequests.putIfAbsent(id,
|
| + () => new Completer.sync());
|
| + return completer.future;
|
| + });
|
| + }
|
| +
|
| + /// Set this phase's transformers to [transformers].
|
| + void updateTransformers(Iterable transformers) {
|
| + var newTransformers = transformers
|
| + .where((op) => op is Transformer || op is AggregateTransformer)
|
| + .toSet();
|
| + var oldTransformers = _classifiers.keys.toSet();
|
| + for (var removed in oldTransformers.difference(newTransformers)) {
|
| + _classifiers.remove(removed).remove();
|
| + }
|
| +
|
| + for (var transformer in newTransformers.difference(oldTransformers)) {
|
| + var classifier = new TransformerClassifier(
|
| + this, transformer, "$_location.$_index");
|
| + _classifiers[transformer] = classifier;
|
| + classifier.onAsset.listen(_handleOutput);
|
| + _streams.onLogPool.add(classifier.onLog);
|
| + classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
|
| + for (var input in _inputs) {
|
| + classifier.addInput(input);
|
| + }
|
| + }
|
| +
|
| + var newGroups = transformers.where((op) => op is TransformerGroup)
|
| + .toSet();
|
| + var oldGroups = _groups.keys.toSet();
|
| + for (var removed in oldGroups.difference(newGroups)) {
|
| + _groups.remove(removed).remove();
|
| + }
|
| +
|
| + for (var added in newGroups.difference(oldGroups)) {
|
| + var runner = new GroupRunner(previous, added, "$_location.$_index");
|
| + _groups[added] = runner;
|
| + runner.onAsset.listen(_handleOutput);
|
| + _streams.onLogPool.add(runner.onLog);
|
| + runner.onStatusChange.listen((_) => _streams.changeStatus(status));
|
| + }
|
| +
|
| + for (var forwarder in _forwarders.values) {
|
| + forwarder.updateTransformers(_classifiers.length, _groups.length);
|
| + }
|
| +
|
| + _streams.changeStatus(status);
|
| + }
|
| +
|
| + /// Force all [LazyTransformer]s' transforms in this phase to begin producing
|
| + /// concrete assets.
|
| + void forceAllTransforms() {
|
| + for (var classifier in _classifiers.values) {
|
| + classifier.forceAllTransforms();
|
| + }
|
| +
|
| + for (var group in _groups.values) {
|
| + group.forceAllTransforms();
|
| + }
|
| + }
|
| +
|
| + /// Add a new phase after this one.
|
| + ///
|
| + /// The new phase will have a location annotation describing its place in the
|
| + /// package graph. By default, this annotation will describe it as being
|
| + /// directly after [this]. If [location] is passed, though, it's described as
|
| + /// being the first phase in that location.
|
| + Phase addPhase([String location]) {
|
| + var index = 0;
|
| + if (location == null) {
|
| + location = _location;
|
| + index = _index + 1;
|
| + }
|
| +
|
| + var next = new Phase._(cascade, location, index, this);
|
| + for (var output in _outputs.values.toList()) {
|
| + // Remove [output]'s listeners because now they should get the asset from
|
| + // [next], rather than this phase. Any transforms consuming [output] will
|
| + // be re-run and will consume the output from the new final phase.
|
| + output.removeListeners();
|
| + }
|
| + return next;
|
| + }
|
| +
|
| + /// Mark this phase as removed.
|
| + ///
|
| + /// This will remove all the phase's outputs.
|
| + void remove() {
|
| + for (var classifier in _classifiers.values.toList()) {
|
| + classifier.remove();
|
| + }
|
| + for (var group in _groups.values) {
|
| + group.remove();
|
| + }
|
| + _streams.close();
|
| + for (var subscription in _inputSubscriptions) {
|
| + subscription.cancel();
|
| + }
|
| + if (_previousStatusSubscription != null) {
|
| + _previousStatusSubscription.cancel();
|
| + }
|
| + if (_previousOnAssetSubscription != null) {
|
| + _previousOnAssetSubscription.cancel();
|
| + }
|
| + }
|
| +
|
| + /// Add [asset] as an output of this phase.
|
| + void _handleOutput(AssetNode asset) {
|
| + if (_inputOrigins.contains(asset.origin)) {
|
| + _forwarders[asset.id].addIntermediateAsset(asset);
|
| + } else {
|
| + _handleOutputWithoutForwarder(asset);
|
| + }
|
| + }
|
| +
|
| + /// Add [asset] as an output of this phase without checking if it's a
|
| + /// forwarded asset.
|
| + void _handleOutputWithoutForwarder(AssetNode asset) {
|
| + if (_outputs.containsKey(asset.id)) {
|
| + _outputs[asset.id].add(asset);
|
| + } else {
|
| + _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
|
| + _outputs[asset.id].onAsset.listen(_emit,
|
| + onDone: () => _outputs.remove(asset.id));
|
| + _emit(_outputs[asset.id].output);
|
| + }
|
| +
|
| + var exception = _outputs[asset.id].collisionException;
|
| + if (exception != null) cascade.reportError(exception);
|
| + }
|
| +
|
| + /// Emit [asset] as an output of this phase.
|
| + ///
|
| + /// This should be called after [_handleOutput], so that collisions are
|
| + /// resolved.
|
| + void _emit(AssetNode asset) {
|
| + _streams.onAssetController.add(asset);
|
| + _providePendingAsset(asset);
|
| + }
|
| +
|
| + /// Provide an asset to a pending [getOutput] call.
|
| + void _providePendingAsset(AssetNode asset) {
|
| + // If anyone's waiting for this asset, provide it to them.
|
| + var request = _pendingOutputRequests.remove(asset.id);
|
| + if (request == null) return;
|
| +
|
| + if (asset.state.isAvailable) {
|
| + request.complete(asset);
|
| + return;
|
| + }
|
| +
|
| + // A lazy asset may be emitted while still dirty. If so, we wait until it's
|
| + // either available or removed before trying again to access it.
|
| + assert(asset.state.isDirty);
|
| + asset.force();
|
| + asset.whenStateChanges().then((state) {
|
| + if (state.isRemoved) return getOutput(asset.id);
|
| + return asset;
|
| + }).then(request.complete).catchError(request.completeError);
|
| + }
|
| +
|
| + String toString() => "phase $_location.$_index";
|
| +}
|
|
|