OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library barback.graph.phase; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import '../asset/asset_id.dart'; |
| 10 import '../asset/asset_node.dart'; |
| 11 import '../asset/asset_node_set.dart'; |
| 12 import '../errors.dart'; |
| 13 import '../log.dart'; |
| 14 import '../transformer/aggregate_transformer.dart'; |
| 15 import '../transformer/transformer.dart'; |
| 16 import '../transformer/transformer_group.dart'; |
| 17 import '../utils.dart'; |
| 18 import '../utils/multiset.dart'; |
| 19 import 'asset_cascade.dart'; |
| 20 import 'group_runner.dart'; |
| 21 import 'node_status.dart'; |
| 22 import 'node_streams.dart'; |
| 23 import 'phase_forwarder.dart'; |
| 24 import 'phase_output.dart'; |
| 25 import 'transformer_classifier.dart'; |
| 26 |
| 27 /// One phase in the ordered series of transformations in an [AssetCascade]. |
| 28 /// |
| 29 /// Each phase can access outputs from previous phases and can in turn pass |
| 30 /// outputs to later phases. Phases are processed strictly serially. All |
| 31 /// transforms in a phase will be complete before moving on to the next phase. |
| 32 /// Within a single phase, all transforms will be run in parallel. |
| 33 /// |
| 34 /// Building can be interrupted between phases. For example, a source is added |
| 35 /// which starts the background process. Sometime during, say, phase 2 (which |
| 36 /// is running asynchronously) that source is modified. When the process queue |
| 37 /// goes to advance to phase 3, it will see that modification and start the |
| 38 /// waterfall from the beginning again. |
| 39 class Phase { |
| 40 /// The cascade that owns this phase. |
| 41 final AssetCascade cascade; |
| 42 |
| 43 /// A string describing the location of [this] in the transformer graph. |
| 44 final String _location; |
| 45 |
| 46 /// The index of [this] in its parent cascade or group. |
| 47 final int _index; |
| 48 |
| 49 /// The groups for this phase. |
| 50 final _groups = new Map<TransformerGroup, GroupRunner>(); |
| 51 |
| 52 /// The inputs for this phase. |
| 53 /// |
| 54 /// For the first phase, these will be the source assets. For all other |
| 55 /// phases, they will be the outputs from the previous phase. |
| 56 final _inputs = new AssetNodeSet(); |
| 57 |
| 58 /// The transformer classifiers for this phase. |
| 59 /// |
| 60 /// The keys can be either [Transformer]s or [AggregateTransformer]s. |
| 61 final _classifiers = new Map<dynamic, TransformerClassifier>(); |
| 62 |
| 63 /// The forwarders for this phase. |
| 64 final _forwarders = new Map<AssetId, PhaseForwarder>(); |
| 65 |
| 66 /// The outputs for this phase. |
| 67 final _outputs = new Map<AssetId, PhaseOutput>(); |
| 68 |
| 69 /// The set of all [AssetNode.origin] properties of the input assets for this |
| 70 /// phase. |
| 71 /// |
| 72 /// This is used to determine which assets have been passed unmodified through |
| 73 /// [_classifiers] or [_groups]. It's possible that a given asset was consumed |
| 74 /// by a group and not an individual transformer, and so shouldn't be |
| 75 /// forwarded through the phase as a whole. |
| 76 /// |
| 77 /// In order to detect whether an output has been forwarded through a group or |
| 78 /// a classifier, we must be able to distinguish it from other outputs with |
| 79 /// the same id. To do so, we check if its origin is in [_inputOrigins]. If |
| 80 /// so, it's been forwarded unmodified. |
| 81 final _inputOrigins = new Multiset<AssetNode>(); |
| 82 |
| 83 /// The streams exposed by this phase. |
| 84 final _streams = new NodeStreams(); |
| 85 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; |
| 86 Stream<AssetNode> get onAsset => _streams.onAsset; |
| 87 Stream<LogEntry> get onLog => _streams.onLog; |
| 88 |
| 89 /// How far along [this] is in processing its assets. |
| 90 NodeStatus get status { |
| 91 // Before any transformers are added, the phase should be dirty if and only |
| 92 // if any input is dirty. |
| 93 if (_classifiers.isEmpty && _groups.isEmpty && previous == null) { |
| 94 return _inputs.any((input) => input.state.isDirty) ? |
| 95 NodeStatus.RUNNING : NodeStatus.IDLE; |
| 96 } |
| 97 |
| 98 var classifierStatus = NodeStatus.dirtiest( |
| 99 _classifiers.values.map((classifier) => classifier.status)); |
| 100 var groupStatus = NodeStatus.dirtiest( |
| 101 _groups.values.map((group) => group.status)); |
| 102 return (previous == null ? NodeStatus.IDLE : previous.status) |
| 103 .dirtier(classifierStatus) |
| 104 .dirtier(groupStatus); |
| 105 } |
| 106 |
| 107 /// The previous phase in the cascade, or null if this is the first phase. |
| 108 final Phase previous; |
| 109 |
| 110 /// The subscription to [previous]'s [onStatusChange] stream. |
| 111 StreamSubscription _previousStatusSubscription; |
| 112 |
| 113 /// The subscription to [previous]'s [onAsset] stream. |
| 114 StreamSubscription<AssetNode> _previousOnAssetSubscription; |
| 115 |
| 116 final _inputSubscriptions = new Set<StreamSubscription>(); |
| 117 |
| 118 /// A map of asset ids to completers for [getInput] requests. |
| 119 /// |
| 120 /// If an asset node is requested before it's available, we put a completer in |
| 121 /// this map to wait for the asset to be generated. If it's not generated, the |
| 122 /// completer should complete to `null`. |
| 123 final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>(); |
| 124 |
| 125 /// Returns all currently-available output assets for this phase. |
| 126 Set<AssetNode> get availableOutputs { |
| 127 return _outputs.values |
| 128 .map((output) => output.output) |
| 129 .where((node) => node.state.isAvailable) |
| 130 .toSet(); |
| 131 } |
| 132 |
| 133 // TODO(nweiz): Rather than passing the cascade and the phase everywhere, |
| 134 // create an interface that just exposes [getInput]. Emit errors via |
| 135 // [AssetNode]s. |
| 136 Phase(AssetCascade cascade, String location) |
| 137 : this._(cascade, location, 0); |
| 138 |
| 139 Phase._(this.cascade, this._location, this._index, [this.previous]) { |
| 140 if (previous != null) { |
| 141 _previousOnAssetSubscription = previous.onAsset.listen(addInput); |
| 142 _previousStatusSubscription = previous.onStatusChange |
| 143 .listen((_) => _streams.changeStatus(status)); |
| 144 } |
| 145 |
| 146 onStatusChange.listen((status) { |
| 147 if (status == NodeStatus.RUNNING) return; |
| 148 |
| 149 // All the previous phases have finished declaring or producing their |
| 150 // outputs. If anyone's still waiting for outputs, cut off the wait; we |
| 151 // won't be generating them, at least until a source asset changes. |
| 152 for (var completer in _pendingOutputRequests.values) { |
| 153 completer.complete(null); |
| 154 } |
| 155 _pendingOutputRequests.clear(); |
| 156 }); |
| 157 } |
| 158 |
| 159 /// Adds a new asset as an input for this phase. |
| 160 /// |
| 161 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase |
| 162 /// will automatically begin determining which transforms can consume it as a |
| 163 /// primary input. The transforms themselves won't be applied until [process] |
| 164 /// is called, however. |
| 165 /// |
| 166 /// This should only be used for brand-new assets or assets that have been |
| 167 /// removed and re-created. The phase will automatically handle updated assets |
| 168 /// using the [AssetNode.onStateChange] stream. |
| 169 void addInput(AssetNode node) { |
| 170 // Each group is one channel along which an asset may be forwarded, as is |
| 171 // each transformer. |
| 172 var forwarder = new PhaseForwarder( |
| 173 node, _classifiers.length, _groups.length); |
| 174 _forwarders[node.id] = forwarder; |
| 175 forwarder.onAsset.listen(_handleOutputWithoutForwarder); |
| 176 if (forwarder.output != null) { |
| 177 _handleOutputWithoutForwarder(forwarder.output); |
| 178 } |
| 179 |
| 180 _inputOrigins.add(node.origin); |
| 181 _inputs.add(node); |
| 182 _inputSubscriptions.add(node.onStateChange.listen((state) { |
| 183 if (state.isRemoved) { |
| 184 _inputOrigins.remove(node.origin); |
| 185 _forwarders.remove(node.id).remove(); |
| 186 } |
| 187 _streams.changeStatus(status); |
| 188 })); |
| 189 |
| 190 for (var classifier in _classifiers.values) { |
| 191 classifier.addInput(node); |
| 192 } |
| 193 } |
| 194 |
| 195 // TODO(nweiz): If the output is available when this is called, it's |
| 196 // theoretically possible for it to become unavailable between the call and |
| 197 // the return. If it does so, it won't trigger the rebuilding process. To |
| 198 // avoid this, we should have this and the methods it calls take explicit |
| 199 // callbacks, as in [AssetNode.whenAvailable]. |
| 200 /// Gets the asset node for an output [id]. |
| 201 /// |
| 202 /// If [id] is for a generated or transformed asset, this will wait until it |
| 203 /// has been created and return it. This means that the returned asset will |
| 204 /// always be [AssetState.AVAILABLE]. |
| 205 /// |
| 206 /// If the output cannot be found, returns null. |
| 207 Future<AssetNode> getOutput(AssetId id) { |
| 208 return syncFuture(() { |
| 209 if (id.package != cascade.package) return cascade.graph.getAssetNode(id); |
| 210 if (_outputs.containsKey(id)) { |
| 211 var output = _outputs[id].output; |
| 212 // If the requested output is available, we can just return it. |
| 213 if (output.state.isAvailable) return output; |
| 214 |
| 215 // If the requested output exists but isn't yet available, wait to see |
| 216 // if it becomes available. If it's removed before becoming available, |
| 217 // try again, since it could be generated again. |
| 218 output.force(); |
| 219 return output.whenAvailable((_) { |
| 220 return output; |
| 221 }).catchError((error) { |
| 222 if (error is! AssetNotFoundException) throw error; |
| 223 return getOutput(id); |
| 224 }); |
| 225 } |
| 226 |
| 227 // If this phase and the previous phases are fully declared or done, the |
| 228 // requested output won't be generated and we can safely return null. |
| 229 if (status != NodeStatus.RUNNING) return null; |
| 230 |
| 231 // Otherwise, store a completer for the asset node. If it's generated in |
| 232 // the future, we'll complete this completer. |
| 233 var completer = _pendingOutputRequests.putIfAbsent(id, |
| 234 () => new Completer.sync()); |
| 235 return completer.future; |
| 236 }); |
| 237 } |
| 238 |
| 239 /// Set this phase's transformers to [transformers]. |
| 240 void updateTransformers(Iterable transformers) { |
| 241 var newTransformers = transformers |
| 242 .where((op) => op is Transformer || op is AggregateTransformer) |
| 243 .toSet(); |
| 244 var oldTransformers = _classifiers.keys.toSet(); |
| 245 for (var removed in oldTransformers.difference(newTransformers)) { |
| 246 _classifiers.remove(removed).remove(); |
| 247 } |
| 248 |
| 249 for (var transformer in newTransformers.difference(oldTransformers)) { |
| 250 var classifier = new TransformerClassifier( |
| 251 this, transformer, "$_location.$_index"); |
| 252 _classifiers[transformer] = classifier; |
| 253 classifier.onAsset.listen(_handleOutput); |
| 254 _streams.onLogPool.add(classifier.onLog); |
| 255 classifier.onStatusChange.listen((_) => _streams.changeStatus(status)); |
| 256 for (var input in _inputs) { |
| 257 classifier.addInput(input); |
| 258 } |
| 259 } |
| 260 |
| 261 var newGroups = transformers.where((op) => op is TransformerGroup) |
| 262 .toSet(); |
| 263 var oldGroups = _groups.keys.toSet(); |
| 264 for (var removed in oldGroups.difference(newGroups)) { |
| 265 _groups.remove(removed).remove(); |
| 266 } |
| 267 |
| 268 for (var added in newGroups.difference(oldGroups)) { |
| 269 var runner = new GroupRunner(previous, added, "$_location.$_index"); |
| 270 _groups[added] = runner; |
| 271 runner.onAsset.listen(_handleOutput); |
| 272 _streams.onLogPool.add(runner.onLog); |
| 273 runner.onStatusChange.listen((_) => _streams.changeStatus(status)); |
| 274 } |
| 275 |
| 276 for (var forwarder in _forwarders.values) { |
| 277 forwarder.updateTransformers(_classifiers.length, _groups.length); |
| 278 } |
| 279 |
| 280 _streams.changeStatus(status); |
| 281 } |
| 282 |
| 283 /// Force all [LazyTransformer]s' transforms in this phase to begin producing |
| 284 /// concrete assets. |
| 285 void forceAllTransforms() { |
| 286 for (var classifier in _classifiers.values) { |
| 287 classifier.forceAllTransforms(); |
| 288 } |
| 289 |
| 290 for (var group in _groups.values) { |
| 291 group.forceAllTransforms(); |
| 292 } |
| 293 } |
| 294 |
| 295 /// Add a new phase after this one. |
| 296 /// |
| 297 /// The new phase will have a location annotation describing its place in the |
| 298 /// package graph. By default, this annotation will describe it as being |
| 299 /// directly after [this]. If [location] is passed, though, it's described as |
| 300 /// being the first phase in that location. |
| 301 Phase addPhase([String location]) { |
| 302 var index = 0; |
| 303 if (location == null) { |
| 304 location = _location; |
| 305 index = _index + 1; |
| 306 } |
| 307 |
| 308 var next = new Phase._(cascade, location, index, this); |
| 309 for (var output in _outputs.values.toList()) { |
| 310 // Remove [output]'s listeners because now they should get the asset from |
| 311 // [next], rather than this phase. Any transforms consuming [output] will |
| 312 // be re-run and will consume the output from the new final phase. |
| 313 output.removeListeners(); |
| 314 } |
| 315 return next; |
| 316 } |
| 317 |
| 318 /// Mark this phase as removed. |
| 319 /// |
| 320 /// This will remove all the phase's outputs. |
| 321 void remove() { |
| 322 for (var classifier in _classifiers.values.toList()) { |
| 323 classifier.remove(); |
| 324 } |
| 325 for (var group in _groups.values) { |
| 326 group.remove(); |
| 327 } |
| 328 _streams.close(); |
| 329 for (var subscription in _inputSubscriptions) { |
| 330 subscription.cancel(); |
| 331 } |
| 332 if (_previousStatusSubscription != null) { |
| 333 _previousStatusSubscription.cancel(); |
| 334 } |
| 335 if (_previousOnAssetSubscription != null) { |
| 336 _previousOnAssetSubscription.cancel(); |
| 337 } |
| 338 } |
| 339 |
| 340 /// Add [asset] as an output of this phase. |
| 341 void _handleOutput(AssetNode asset) { |
| 342 if (_inputOrigins.contains(asset.origin)) { |
| 343 _forwarders[asset.id].addIntermediateAsset(asset); |
| 344 } else { |
| 345 _handleOutputWithoutForwarder(asset); |
| 346 } |
| 347 } |
| 348 |
| 349 /// Add [asset] as an output of this phase without checking if it's a |
| 350 /// forwarded asset. |
| 351 void _handleOutputWithoutForwarder(AssetNode asset) { |
| 352 if (_outputs.containsKey(asset.id)) { |
| 353 _outputs[asset.id].add(asset); |
| 354 } else { |
| 355 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index"); |
| 356 _outputs[asset.id].onAsset.listen(_emit, |
| 357 onDone: () => _outputs.remove(asset.id)); |
| 358 _emit(_outputs[asset.id].output); |
| 359 } |
| 360 |
| 361 var exception = _outputs[asset.id].collisionException; |
| 362 if (exception != null) cascade.reportError(exception); |
| 363 } |
| 364 |
| 365 /// Emit [asset] as an output of this phase. |
| 366 /// |
| 367 /// This should be called after [_handleOutput], so that collisions are |
| 368 /// resolved. |
| 369 void _emit(AssetNode asset) { |
| 370 _streams.onAssetController.add(asset); |
| 371 _providePendingAsset(asset); |
| 372 } |
| 373 |
| 374 /// Provide an asset to a pending [getOutput] call. |
| 375 void _providePendingAsset(AssetNode asset) { |
| 376 // If anyone's waiting for this asset, provide it to them. |
| 377 var request = _pendingOutputRequests.remove(asset.id); |
| 378 if (request == null) return; |
| 379 |
| 380 if (asset.state.isAvailable) { |
| 381 request.complete(asset); |
| 382 return; |
| 383 } |
| 384 |
| 385 // A lazy asset may be emitted while still dirty. If so, we wait until it's |
| 386 // either available or removed before trying again to access it. |
| 387 assert(asset.state.isDirty); |
| 388 asset.force(); |
| 389 asset.whenStateChanges().then((state) { |
| 390 if (state.isRemoved) return getOutput(asset.id); |
| 391 return asset; |
| 392 }).then(request.complete).catchError(request.completeError); |
| 393 } |
| 394 |
| 395 String toString() => "phase $_location.$_index"; |
| 396 } |
OLD | NEW |