OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library barback.graph.transform_node; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import '../asset/asset.dart'; |
| 10 import '../asset/asset_id.dart'; |
| 11 import '../asset/asset_node.dart'; |
| 12 import '../asset/asset_node_set.dart'; |
| 13 import '../errors.dart'; |
| 14 import '../log.dart'; |
| 15 import '../transformer/aggregate_transform.dart'; |
| 16 import '../transformer/aggregate_transformer.dart'; |
| 17 import '../transformer/declaring_aggregate_transform.dart'; |
| 18 import '../transformer/declaring_aggregate_transformer.dart'; |
| 19 import '../transformer/lazy_aggregate_transformer.dart'; |
| 20 import '../utils.dart'; |
| 21 import 'node_status.dart'; |
| 22 import 'node_streams.dart'; |
| 23 import 'phase.dart'; |
| 24 import 'transformer_classifier.dart'; |
| 25 |
| 26 /// Describes a transform on a set of assets and its relationship to the build |
| 27 /// dependency graph. |
| 28 /// |
| 29 /// Keeps track of whether it's dirty and needs to be run and which assets it |
| 30 /// depends on. |
| 31 class TransformNode { |
| 32 /// The aggregate key for this node. |
| 33 final String key; |
| 34 |
| 35 /// The [TransformerClassifier] that [this] belongs to. |
| 36 final TransformerClassifier classifier; |
| 37 |
| 38 /// The [Phase] that this transform runs in. |
| 39 Phase get phase => classifier.phase; |
| 40 |
| 41 /// The [AggregateTransformer] to apply to this node's inputs. |
| 42 final AggregateTransformer transformer; |
| 43 |
| 44 /// The primary asset nodes this transform runs on. |
| 45 final _primaries = new AssetNodeSet(); |
| 46 |
| 47 /// A string describing the location of [this] in the transformer graph. |
| 48 final String _location; |
| 49 |
| 50 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams. |
| 51 final _primarySubscriptions = new Map<AssetId, StreamSubscription>(); |
| 52 |
| 53 /// The subscription to [phase]'s [Phase.onAsset] stream. |
| 54 StreamSubscription<AssetNode> _phaseAssetSubscription; |
| 55 |
| 56 /// The subscription to [phase]'s [Phase.onStatusChange] stream. |
| 57 StreamSubscription<NodeStatus> _phaseStatusSubscription; |
| 58 |
| 59 /// How far along [this] is in processing its assets. |
| 60 NodeStatus get status { |
| 61 if (_state == _State.APPLIED || _state == _State.DECLARED) { |
| 62 return NodeStatus.IDLE; |
| 63 } |
| 64 |
| 65 if (_declaring && _state != _State.DECLARING && |
| 66 _state != _State.NEEDS_DECLARE) { |
| 67 return NodeStatus.MATERIALIZING; |
| 68 } else { |
| 69 return NodeStatus.RUNNING; |
| 70 } |
| 71 } |
| 72 |
| 73 /// The [TransformInfo] describing this node. |
| 74 /// |
| 75 /// [TransformInfo] is the publicly-visible representation of a transform |
| 76 /// node. |
| 77 TransformInfo get info => new TransformInfo(transformer, |
| 78 new AssetId(phase.cascade.package, key)); |
| 79 |
| 80 /// Whether this is a declaring transform. |
| 81 /// |
| 82 /// This is usually identical to `transformer is |
| 83 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy |
| 84 /// transformer emits an error during `declareOutputs` it's treated as though |
| 85 /// it wasn't declaring. |
| 86 bool get _declaring => transformer is DeclaringAggregateTransformer && |
| 87 (_state == _State.DECLARING || _declaredOutputs != null); |
| 88 |
| 89 /// Whether this transform has been forced since it last finished applying. |
| 90 /// |
| 91 /// A transform being forced means it should run until it generates outputs |
| 92 /// and is no longer dirty. This is always true for non-declaring |
| 93 /// transformers, since they always need to eagerly generate outputs. |
| 94 bool _forced; |
| 95 |
| 96 /// The subscriptions to each secondary input's [AssetNode.onStateChange] |
| 97 /// stream. |
| 98 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>(); |
| 99 |
| 100 /// The controllers for the asset nodes emitted by this node. |
| 101 final _outputControllers = new Map<AssetId, AssetNodeController>(); |
| 102 |
| 103 /// The ids of inputs the transformer tried and failed to read last time it |
| 104 /// ran. |
| 105 final _missingInputs = new Set<AssetId>(); |
| 106 |
| 107 /// The controllers that are used to pass each primary input through [this] if |
| 108 /// it's not consumed or overwritten. |
| 109 /// |
| 110 /// This needs an intervening controller to ensure that the output can be |
| 111 /// marked dirty when determining whether [this] will consume or overwrite it, |
| 112 /// and be marked removed if it does. No pass-through controller will exist |
| 113 /// for primary inputs that are not being passed through. |
| 114 final _passThroughControllers = new Map<AssetId, AssetNodeController>(); |
| 115 |
| 116 /// The asset node for this transform. |
| 117 final _streams = new NodeStreams(); |
| 118 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; |
| 119 Stream<AssetNode> get onAsset => _streams.onAsset; |
| 120 Stream<LogEntry> get onLog => _streams.onLog; |
| 121 |
| 122 /// The current state of [this]. |
| 123 var _state = _State.DECLARED; |
| 124 |
| 125 /// Whether [this] has been marked as removed. |
| 126 bool get _isRemoved => _streams.onAssetController.isClosed; |
| 127 |
| 128 // If [transformer] is declaring but not lazy and [primary] is available, we |
| 129 // can run [apply] even if [force] hasn't been called, since [transformer] |
| 130 // should run eagerly if possible. |
| 131 bool get _canRunDeclaringEagerly => |
| 132 _declaring && transformer is! LazyAggregateTransformer && |
| 133 _primaries.every((input) => input.state.isAvailable); |
| 134 |
| 135 /// Which primary inputs the most recent run of this transform has declared |
| 136 /// that it consumes. |
| 137 /// |
| 138 /// This starts out `null`, indicating that the transform hasn't declared |
| 139 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED] |
| 140 /// or [_State.DECLARED]. |
| 141 Set<AssetId> _consumedPrimaries; |
| 142 |
| 143 /// The set of output ids that [transformer] declared it would emit. |
| 144 /// |
| 145 /// This is only non-null if [transformer] is a |
| 146 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run |
| 147 /// successfully. |
| 148 Set<AssetId> _declaredOutputs; |
| 149 |
| 150 /// The controller for the currently-running |
| 151 /// [DeclaringAggregateTransformer.declareOutputs] call's |
| 152 /// [DeclaringAggregateTransform]. |
| 153 /// |
| 154 /// This will be non-`null` when |
| 155 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that |
| 156 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes |
| 157 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise. |
| 158 DeclaringAggregateTransformController _declareController; |
| 159 |
| 160 /// The controller for the currently-running [AggregateTransformer.apply] |
| 161 /// call's [AggregateTransform]. |
| 162 /// |
| 163 /// This will be non-`null` when [AggregateTransform.apply] is running, which |
| 164 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or |
| 165 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's |
| 166 /// [_State.NEEDS_DECLARE], and always `null` otherwise. |
| 167 AggregateTransformController _applyController; |
| 168 |
| 169 /// The number of secondary inputs that have been requested but not yet |
| 170 /// produced. |
| 171 int _pendingSecondaryInputs = 0; |
| 172 |
| 173 /// A stopwatch that tracks the total time spent in a transformer's `apply` |
| 174 /// function. |
| 175 final _timeInTransformer = new Stopwatch(); |
| 176 |
| 177 /// A stopwatch that tracks the time in a transformer's `apply` function spent |
| 178 /// waiting for [getInput] calls to complete. |
| 179 final _timeAwaitingInputs = new Stopwatch(); |
| 180 |
| 181 TransformNode(this.classifier, this.transformer, this.key, this._location) { |
| 182 _forced = transformer is! DeclaringAggregateTransformer; |
| 183 |
| 184 _phaseAssetSubscription = phase.previous.onAsset.listen((node) { |
| 185 if (!_missingInputs.contains(node.id)) return; |
| 186 if (_forced) node.force(); |
| 187 _dirty(); |
| 188 }); |
| 189 |
| 190 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) { |
| 191 if (status == NodeStatus.RUNNING) return; |
| 192 |
| 193 _maybeFinishDeclareController(); |
| 194 _maybeFinishApplyController(); |
| 195 }); |
| 196 |
| 197 classifier.onDoneClassifying.listen((_) { |
| 198 _maybeFinishDeclareController(); |
| 199 _maybeFinishApplyController(); |
| 200 }); |
| 201 |
| 202 _run(); |
| 203 } |
| 204 |
| 205 /// Adds [input] as a primary input for this node. |
| 206 void addPrimary(AssetNode input) { |
| 207 _primaries.add(input); |
| 208 if (_forced) input.force(); |
| 209 |
| 210 _primarySubscriptions[input.id] = input.onStateChange |
| 211 .listen((_) => _onPrimaryStateChange(input)); |
| 212 |
| 213 if (_state == _State.DECLARING && !_declareController.isDone) { |
| 214 // If we're running `declareOutputs` and its id stream isn't closed yet, |
| 215 // pass this in as another id. |
| 216 _declareController.addId(input.id); |
| 217 _maybeFinishDeclareController(); |
| 218 } else if (_state == _State.APPLYING) { |
| 219 // If we're running `apply`, we need to wait until [input] is available |
| 220 // before we pass it into the stream. If it's available now, great; if |
| 221 // not, [_onPrimaryStateChange] will handle it. |
| 222 if (!input.state.isAvailable) { |
| 223 // If we started running eagerly without being forced, abort that run if |
| 224 // a new unavailable asset comes in. |
| 225 if (input.isLazy && !_forced) _restartRun(); |
| 226 return; |
| 227 } |
| 228 |
| 229 _onPrimaryStateChange(input); |
| 230 _maybeFinishApplyController(); |
| 231 } else { |
| 232 // Otherwise, a new input means we'll need to re-run `declareOutputs`. |
| 233 _restartRun(); |
| 234 } |
| 235 } |
| 236 |
| 237 /// Marks this transform as removed. |
| 238 /// |
| 239 /// This causes all of the transform's outputs to be marked as removed as |
| 240 /// well. Normally this will be automatically done internally based on events |
| 241 /// from the primary input, but it's possible for a transform to no longer be |
| 242 /// valid even if its primary input still exists. |
| 243 void remove() { |
| 244 _streams.close(); |
| 245 _phaseAssetSubscription.cancel(); |
| 246 _phaseStatusSubscription.cancel(); |
| 247 if (_declareController != null) _declareController.cancel(); |
| 248 if (_applyController != null) _applyController.cancel(); |
| 249 _clearSecondarySubscriptions(); |
| 250 _clearOutputs(); |
| 251 |
| 252 for (var subscription in _primarySubscriptions.values) { |
| 253 subscription.cancel(); |
| 254 } |
| 255 _primarySubscriptions.clear(); |
| 256 |
| 257 for (var controller in _passThroughControllers.values) { |
| 258 controller.setRemoved(); |
| 259 } |
| 260 _passThroughControllers.clear(); |
| 261 } |
| 262 |
| 263 /// If [this] is deferred, ensures that its concrete outputs will be |
| 264 /// generated. |
| 265 void force() { |
| 266 if (_forced || _state == _State.APPLIED) return; |
| 267 for (var input in _primaries) { |
| 268 input.force(); |
| 269 } |
| 270 |
| 271 _forced = true; |
| 272 if (_state == _State.DECLARED) _apply(); |
| 273 } |
| 274 |
| 275 /// Marks this transform as dirty. |
| 276 /// |
| 277 /// Specifically, this should be called when one of the transform's inputs' |
| 278 /// contents change, or when a secondary input is removed. Primary inputs |
| 279 /// being added or removed are handled by [addInput] and |
| 280 /// [_onPrimaryStateChange]. |
| 281 void _dirty() { |
| 282 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE || |
| 283 _state == _State.NEEDS_APPLY) { |
| 284 // If we already know that [_apply] needs to be run, there's nothing to do |
| 285 // here. |
| 286 return; |
| 287 } |
| 288 |
| 289 if (!_forced && !_canRunDeclaringEagerly) { |
| 290 // [forced] should only ever be false for a declaring transformer. |
| 291 assert(_declaring); |
| 292 |
| 293 // If we've finished applying, transition to DECLARED, indicating that we |
| 294 // know what outputs [apply] will emit but we're waiting to emit them |
| 295 // concretely until [force] is called. If we're still applying, we'll |
| 296 // transition to DECLARED once we finish. |
| 297 if (_state == _State.APPLIED) _state = _State.DECLARED; |
| 298 for (var controller in _outputControllers.values) { |
| 299 controller.setLazy(force); |
| 300 } |
| 301 _emitDeclaredOutputs(); |
| 302 return; |
| 303 } |
| 304 |
| 305 if (_state == _State.APPLIED) { |
| 306 if (_declaredOutputs != null) _emitDeclaredOutputs(); |
| 307 _apply(); |
| 308 } else if (_state == _State.DECLARED) { |
| 309 _apply(); |
| 310 } else { |
| 311 _state = _State.NEEDS_APPLY; |
| 312 } |
| 313 } |
| 314 |
| 315 /// The callback called when [input]'s state changes. |
| 316 void _onPrimaryStateChange(AssetNode input) { |
| 317 if (input.state.isRemoved) { |
| 318 _primarySubscriptions.remove(input.id); |
| 319 |
| 320 if (_primaries.isEmpty) { |
| 321 // If there are no more primary inputs, there's no more use for this |
| 322 // node in the graph. It will be re-created by its |
| 323 // [TransformerClassifier] if a new input with [key] is added. |
| 324 remove(); |
| 325 return; |
| 326 } |
| 327 |
| 328 // Any change to the number of primary inputs requires that we re-run the |
| 329 // transformation. |
| 330 _restartRun(); |
| 331 } else if (input.state.isAvailable) { |
| 332 if (_state == _State.DECLARED && _canRunDeclaringEagerly) { |
| 333 // If [this] is fully declared but hasn't started applying, this input |
| 334 // becoming available may mean that all inputs are available, in which |
| 335 // case we can run apply eagerly. |
| 336 _apply(); |
| 337 return; |
| 338 } |
| 339 |
| 340 // If we're not actively passing concrete assets to the transformer, the |
| 341 // distinction between a dirty asset and an available one isn't relevant. |
| 342 if (_state != _State.APPLYING) return; |
| 343 |
| 344 if (_applyController.isDone) { |
| 345 // If we get a new asset after we've closed the asset stream, we need to |
| 346 // re-run declare and then apply. |
| 347 _restartRun(); |
| 348 } else { |
| 349 // If the new asset comes before the asset stream is done, we can just |
| 350 // pass it to the stream. |
| 351 _applyController.addInput(input.asset); |
| 352 _maybeFinishApplyController(); |
| 353 } |
| 354 } else { |
| 355 if (_forced) input.force(); |
| 356 if (_state == _State.APPLYING && !_applyController.addedId(input.id) && |
| 357 (_forced || !input.isLazy)) { |
| 358 // If the input hasn't yet been added to the transform's input stream, |
| 359 // there's no need to consider the transformation dirty. However, if the |
| 360 // input is lazy and we're running eagerly, we need to restart the |
| 361 // transformation. |
| 362 return; |
| 363 } |
| 364 _dirty(); |
| 365 } |
| 366 } |
| 367 |
| 368 /// Run the entire transformation, including both `declareOutputs` (if |
| 369 /// applicable) and `apply`. |
| 370 void _run() { |
| 371 assert(_state != _State.DECLARING); |
| 372 assert(_state != _State.APPLYING); |
| 373 |
| 374 _markOutputsDirty(); |
| 375 _declareOutputs(() { |
| 376 if (_forced || _canRunDeclaringEagerly) { |
| 377 _apply(); |
| 378 } else { |
| 379 _state = _State.DECLARED; |
| 380 _streams.changeStatus(NodeStatus.IDLE); |
| 381 } |
| 382 }); |
| 383 } |
| 384 |
| 385 /// Restart the entire transformation, including `declareOutputs` if |
| 386 /// applicable. |
| 387 void _restartRun() { |
| 388 if (_state == _State.DECLARED || _state == _State.APPLIED) { |
| 389 // If we're currently idle, we can restart the transformation immediately. |
| 390 _run(); |
| 391 return; |
| 392 } |
| 393 |
| 394 // If we're actively running `declareOutputs` or `apply`, cancel the |
| 395 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's |
| 396 // method returns, we'll transition to `DECLARING`. |
| 397 if (_declareController != null) _declareController.cancel(); |
| 398 if (_applyController != null) _applyController.cancel(); |
| 399 _state = _State.NEEDS_DECLARE; |
| 400 } |
| 401 |
| 402 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty |
| 403 /// assets. |
| 404 /// |
| 405 /// Calls [callback] when it's finished. This doesn't return a future so that |
| 406 /// [callback] is called synchronously if there are no outputs to declare. If |
| 407 /// [this] is removed while inputs are being declared, [callback] will not be |
| 408 /// called. |
| 409 void _declareOutputs(void callback()) { |
| 410 if (transformer is! DeclaringAggregateTransformer) { |
| 411 callback(); |
| 412 return; |
| 413 } |
| 414 |
| 415 _state = _State.DECLARING; |
| 416 var controller = new DeclaringAggregateTransformController(this); |
| 417 _declareController = controller; |
| 418 _streams.onLogPool.add(controller.onLog); |
| 419 for (var primary in _primaries) { |
| 420 controller.addId(primary.id); |
| 421 } |
| 422 _maybeFinishDeclareController(); |
| 423 |
| 424 syncFuture(() { |
| 425 return (transformer as DeclaringAggregateTransformer) |
| 426 .declareOutputs(controller.transform); |
| 427 }).whenComplete(() { |
| 428 // Cancel the controller here even if `declareOutputs` wasn't interrupted. |
| 429 // Since the declaration is finished, we want to close out the |
| 430 // controller's streams. |
| 431 controller.cancel(); |
| 432 _declareController = null; |
| 433 }).then((_) { |
| 434 if (_isRemoved) return; |
| 435 if (_state == _State.NEEDS_DECLARE) { |
| 436 _declareOutputs(callback); |
| 437 return; |
| 438 } |
| 439 |
| 440 if (controller.loggedError) { |
| 441 // If `declareOutputs` fails, fall back to treating a declaring |
| 442 // transformer as though it were eager. |
| 443 if (transformer is! LazyAggregateTransformer) _forced = true; |
| 444 callback(); |
| 445 return; |
| 446 } |
| 447 |
| 448 _consumedPrimaries = controller.consumedPrimaries; |
| 449 _declaredOutputs = controller.outputIds; |
| 450 var invalidIds = _declaredOutputs |
| 451 .where((id) => id.package != phase.cascade.package).toSet(); |
| 452 for (var id in invalidIds) { |
| 453 _declaredOutputs.remove(id); |
| 454 // TODO(nweiz): report this as a warning rather than a failing error. |
| 455 phase.cascade.reportError(new InvalidOutputException(info, id)); |
| 456 } |
| 457 |
| 458 for (var primary in _primaries) { |
| 459 if (_declaredOutputs.contains(primary.id)) continue; |
| 460 _passThrough(primary.id); |
| 461 } |
| 462 _emitDeclaredOutputs(); |
| 463 callback(); |
| 464 }).catchError((error, stackTrace) { |
| 465 if (_isRemoved) return; |
| 466 if (transformer is! LazyAggregateTransformer) _forced = true; |
| 467 phase.cascade.reportError(_wrapException(error, stackTrace)); |
| 468 callback(); |
| 469 }); |
| 470 } |
| 471 |
| 472 /// Emits a dirty asset node for all outputs that were declared by the |
| 473 /// transformer. |
| 474 /// |
| 475 /// This won't emit any outputs for which there already exist output |
| 476 /// controllers. It should only be called for transforms that have declared |
| 477 /// their outputs. |
| 478 void _emitDeclaredOutputs() { |
| 479 assert(_declaredOutputs != null); |
| 480 for (var id in _declaredOutputs) { |
| 481 if (_outputControllers.containsKey(id)) continue; |
| 482 var controller = _forced |
| 483 ? new AssetNodeController(id, this) |
| 484 : new AssetNodeController.lazy(id, force, this); |
| 485 _outputControllers[id] = controller; |
| 486 _streams.onAssetController.add(controller.node); |
| 487 } |
| 488 } |
| 489 |
| 490 //// Mark all emitted and passed-through outputs of this transform as dirty. |
| 491 void _markOutputsDirty() { |
| 492 for (var controller in _passThroughControllers.values) { |
| 493 controller.setDirty(); |
| 494 } |
| 495 for (var controller in _outputControllers.values) { |
| 496 if (_forced) { |
| 497 controller.setDirty(); |
| 498 } else { |
| 499 controller.setLazy(force); |
| 500 } |
| 501 } |
| 502 } |
| 503 |
| 504 /// Applies this transform. |
| 505 void _apply() { |
| 506 assert(!_isRemoved); |
| 507 |
| 508 _markOutputsDirty(); |
| 509 _clearSecondarySubscriptions(); |
| 510 _state = _State.APPLYING; |
| 511 _streams.changeStatus(status); |
| 512 _runApply().then((hadError) { |
| 513 if (_isRemoved) return; |
| 514 |
| 515 if (_state == _State.DECLARED) return; |
| 516 |
| 517 if (_state == _State.NEEDS_DECLARE) { |
| 518 _run(); |
| 519 return; |
| 520 } |
| 521 |
| 522 // If an input's contents changed while running `apply`, retry unless the |
| 523 // transformer is deferred and hasn't been forced. |
| 524 if (_state == _State.NEEDS_APPLY) { |
| 525 if (_forced || _canRunDeclaringEagerly) { |
| 526 _apply(); |
| 527 } else { |
| 528 _state = _State.DECLARED; |
| 529 } |
| 530 return; |
| 531 } |
| 532 |
| 533 if (_declaring) _forced = false; |
| 534 |
| 535 assert(_state == _State.APPLYING); |
| 536 if (hadError) { |
| 537 _clearOutputs(); |
| 538 // If the transformer threw an error, we don't want to emit the |
| 539 // pass-through assets in case they'll be overwritten by the |
| 540 // transformer. However, if the transformer declared that it wouldn't |
| 541 // overwrite or consume a pass-through asset, we can safely emit it. |
| 542 if (_declaredOutputs != null) { |
| 543 for (var input in _primaries) { |
| 544 if (_consumedPrimaries.contains(input.id) || |
| 545 _declaredOutputs.contains(input.id)) { |
| 546 _consumePrimary(input.id); |
| 547 } else { |
| 548 _passThrough(input.id); |
| 549 } |
| 550 } |
| 551 } |
| 552 } |
| 553 |
| 554 _state = _State.APPLIED; |
| 555 _streams.changeStatus(NodeStatus.IDLE); |
| 556 }); |
| 557 } |
| 558 |
| 559 /// Gets the asset for an input [id]. |
| 560 /// |
| 561 /// If an input with [id] cannot be found, throws an [AssetNotFoundException]. |
| 562 Future<Asset> getInput(AssetId id) { |
| 563 _timeAwaitingInputs.start(); |
| 564 _pendingSecondaryInputs++; |
| 565 return phase.previous.getOutput(id).then((node) { |
| 566 // Throw if the input isn't found. This ensures the transformer's apply |
| 567 // is exited. We'll then catch this and report it through the proper |
| 568 // results stream. |
| 569 if (node == null) { |
| 570 _missingInputs.add(id); |
| 571 throw new AssetNotFoundException(id); |
| 572 } |
| 573 |
| 574 _secondarySubscriptions.putIfAbsent(node.id, () { |
| 575 return node.onStateChange.listen((_) => _dirty()); |
| 576 }); |
| 577 |
| 578 return node.asset; |
| 579 }).whenComplete(() { |
| 580 _pendingSecondaryInputs--; |
| 581 if (_pendingSecondaryInputs != 0) _timeAwaitingInputs.stop(); |
| 582 }); |
| 583 } |
| 584 |
| 585 /// Run [AggregateTransformer.apply]. |
| 586 /// |
| 587 /// Returns whether or not an error occurred while running the transformer. |
| 588 Future<bool> _runApply() { |
| 589 var controller = new AggregateTransformController(this); |
| 590 _applyController = controller; |
| 591 _streams.onLogPool.add(controller.onLog); |
| 592 for (var primary in _primaries) { |
| 593 if (!primary.state.isAvailable) continue; |
| 594 controller.addInput(primary.asset); |
| 595 } |
| 596 _maybeFinishApplyController(); |
| 597 |
| 598 return syncFuture(() { |
| 599 _timeInTransformer.reset(); |
| 600 _timeAwaitingInputs.reset(); |
| 601 _timeInTransformer.start(); |
| 602 return transformer.apply(controller.transform); |
| 603 }).whenComplete(() { |
| 604 _timeInTransformer.stop(); |
| 605 _timeAwaitingInputs.stop(); |
| 606 |
| 607 // Cancel the controller here even if `apply` wasn't interrupted. Since |
| 608 // the apply is finished, we want to close out the controller's streams. |
| 609 controller.cancel(); |
| 610 _applyController = null; |
| 611 }).then((_) { |
| 612 assert(_state != _State.DECLARED); |
| 613 assert(_state != _State.DECLARING); |
| 614 assert(_state != _State.APPLIED); |
| 615 |
| 616 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) { |
| 617 _state = _State.DECLARED; |
| 618 _streams.changeStatus(NodeStatus.IDLE); |
| 619 return false; |
| 620 } |
| 621 |
| 622 if (_isRemoved) return false; |
| 623 if (_state == _State.NEEDS_APPLY) return false; |
| 624 if (_state == _State.NEEDS_DECLARE) return false; |
| 625 if (controller.loggedError) return true; |
| 626 |
| 627 // If the transformer took long enough, log its duration in fine output. |
| 628 // That way it's not always visible, but users running with "pub serve |
| 629 // --verbose" can see it. |
| 630 var ranLong = _timeInTransformer.elapsed > new Duration(seconds: 1); |
| 631 var ranLongLocally = |
| 632 _timeInTransformer.elapsed - _timeAwaitingInputs.elapsed > |
| 633 new Duration(milliseconds: 200); |
| 634 |
| 635 // Report the transformer's timing information if it spent more than 0.2s |
| 636 // doing things other than waiting for its secondary inputs or if it spent |
| 637 // more than 1s in total. |
| 638 if (ranLongLocally || ranLong) { |
| 639 _streams.onLogController.add(new LogEntry( |
| 640 info, info.primaryId, LogLevel.FINE, |
| 641 "Took ${niceDuration(_timeInTransformer.elapsed)} " |
| 642 "(${niceDuration(_timeAwaitingInputs.elapsed)} awaiting " |
| 643 "secondary inputs).", |
| 644 null)); |
| 645 } |
| 646 |
| 647 _handleApplyResults(controller); |
| 648 return false; |
| 649 }).catchError((error, stackTrace) { |
| 650 // If the transform became dirty while processing, ignore any errors from |
| 651 // it. |
| 652 if (_state == _State.NEEDS_APPLY || _isRemoved) return false; |
| 653 |
| 654 // Catch all transformer errors and pipe them to the results stream. This |
| 655 // is so a broken transformer doesn't take down the whole graph. |
| 656 phase.cascade.reportError(_wrapException(error, stackTrace)); |
| 657 return true; |
| 658 }); |
| 659 } |
| 660 |
| 661 /// Handle the results of running [Transformer.apply]. |
| 662 /// |
| 663 /// [controller] should be the controller for the [AggegateTransform] passed |
| 664 /// to [AggregateTransformer.apply]. |
| 665 void _handleApplyResults(AggregateTransformController controller) { |
| 666 _consumedPrimaries = controller.consumedPrimaries; |
| 667 |
| 668 var newOutputs = controller.outputs; |
| 669 // Any ids that are for a different package are invalid. |
| 670 var invalidIds = newOutputs |
| 671 .map((asset) => asset.id) |
| 672 .where((id) => id.package != phase.cascade.package) |
| 673 .toSet(); |
| 674 for (var id in invalidIds) { |
| 675 newOutputs.removeId(id); |
| 676 // TODO(nweiz): report this as a warning rather than a failing error. |
| 677 phase.cascade.reportError(new InvalidOutputException(info, id)); |
| 678 } |
| 679 |
| 680 // Remove outputs that used to exist but don't anymore. |
| 681 for (var id in _outputControllers.keys.toList()) { |
| 682 if (newOutputs.containsId(id)) continue; |
| 683 _outputControllers.remove(id).setRemoved(); |
| 684 } |
| 685 |
| 686 // Emit or stop emitting pass-through assets between removing and adding |
| 687 // outputs to ensure there are no collisions. |
| 688 for (var id in _primaries.map((node) => node.id)) { |
| 689 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) { |
| 690 _consumePrimary(id); |
| 691 } else { |
| 692 _passThrough(id); |
| 693 } |
| 694 } |
| 695 |
| 696 // Store any new outputs or new contents for existing outputs. |
| 697 for (var asset in newOutputs) { |
| 698 var controller = _outputControllers[asset.id]; |
| 699 if (controller != null) { |
| 700 controller.setAvailable(asset); |
| 701 } else { |
| 702 var controller = new AssetNodeController.available(asset, this); |
| 703 _outputControllers[asset.id] = controller; |
| 704 _streams.onAssetController.add(controller.node); |
| 705 } |
| 706 } |
| 707 } |
| 708 |
| 709 /// Cancels all subscriptions to secondary input nodes. |
| 710 void _clearSecondarySubscriptions() { |
| 711 _missingInputs.clear(); |
| 712 for (var subscription in _secondarySubscriptions.values) { |
| 713 subscription.cancel(); |
| 714 } |
| 715 _secondarySubscriptions.clear(); |
| 716 } |
| 717 |
| 718 /// Removes all output assets. |
| 719 void _clearOutputs() { |
| 720 // Remove all the previously-emitted assets. |
| 721 for (var controller in _outputControllers.values) { |
| 722 controller.setRemoved(); |
| 723 } |
| 724 _outputControllers.clear(); |
| 725 } |
| 726 |
| 727 /// Emit the pass-through node for the primary input [id] if it's not being |
| 728 /// emitted already. |
| 729 void _passThrough(AssetId id) { |
| 730 assert(!_outputControllers.containsKey(id)); |
| 731 |
| 732 if (_consumedPrimaries.contains(id)) return; |
| 733 var controller = _passThroughControllers[id]; |
| 734 var primary = _primaries[id]; |
| 735 if (controller == null) { |
| 736 controller = new AssetNodeController.from(primary); |
| 737 _passThroughControllers[id] = controller; |
| 738 _streams.onAssetController.add(controller.node); |
| 739 } else if (primary.state.isDirty) { |
| 740 controller.setDirty(); |
| 741 } else if (!controller.node.state.isAvailable) { |
| 742 controller.setAvailable(primary.asset); |
| 743 } |
| 744 } |
| 745 |
| 746 /// Stops emitting the pass-through node for the primary input [id] if it's |
| 747 /// being emitted. |
| 748 void _consumePrimary(AssetId id) { |
| 749 var controller = _passThroughControllers.remove(id); |
| 750 if (controller == null) return; |
| 751 controller.setRemoved(); |
| 752 } |
| 753 |
| 754 /// If `declareOutputs` is running and all previous phases have declared their |
| 755 /// outputs, mark [_declareController] as done. |
| 756 void _maybeFinishDeclareController() { |
| 757 if (_declareController == null) return; |
| 758 if (classifier.isClassifying) return; |
| 759 if (phase.previous.status == NodeStatus.RUNNING) return; |
| 760 _declareController.done(); |
| 761 } |
| 762 |
| 763 /// If `apply` is running, all previous phases have declared their outputs, |
| 764 /// and all primary inputs are available and thus have been passed to the |
| 765 /// transformer, mark [_applyController] as done. |
| 766 void _maybeFinishApplyController() { |
| 767 if (_applyController == null) return; |
| 768 if (classifier.isClassifying) return; |
| 769 if (_primaries.any((input) => !input.state.isAvailable)) return; |
| 770 if (phase.previous.status == NodeStatus.RUNNING) return; |
| 771 _applyController.done(); |
| 772 } |
| 773 |
| 774 BarbackException _wrapException(error, StackTrace stackTrace) { |
| 775 if (error is! AssetNotFoundException) { |
| 776 return new TransformerException(info, error, stackTrace); |
| 777 } else { |
| 778 return new MissingInputException(info, error.id); |
| 779 } |
| 780 } |
| 781 |
| 782 String toString() => |
| 783 "transform node in $_location for $transformer on ${info.primaryId} " |
| 784 "($_state, $status, ${_forced ? '' : 'un'}forced)"; |
| 785 } |
| 786 |
| 787 /// The enum of states that [TransformNode] can be in. |
| 788 class _State { |
| 789 /// The transform is running [DeclaringAggregateTransformer.declareOutputs]. |
| 790 /// |
| 791 /// If the set of primary inputs changes while in this state, it will |
| 792 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this |
| 793 /// state when `declareOutputs` finishes running, it will transition to |
| 794 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are |
| 795 /// available, and [DECLARED] otherwise. |
| 796 /// |
| 797 /// Non-declaring transformers will transition out of this state and into |
| 798 /// [APPLYING] immediately. |
| 799 static const DECLARING = const _State._("declaring outputs"); |
| 800 |
| 801 /// The transform is running [AggregateTransformer.declareOutputs] or |
| 802 /// [AggregateTransform.apply], but a primary input was added or removed after |
| 803 /// it started, so it will need to re-run `declareOutputs`. |
| 804 /// |
| 805 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs` |
| 806 /// or `apply` finishes running. |
| 807 static const NEEDS_DECLARE = const _State._("needs declare"); |
| 808 |
| 809 /// The transform is deferred and has run |
| 810 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced. |
| 811 /// |
| 812 /// The [TransformNode] will transition to [APPLYING] when one of the outputs |
| 813 /// has been forced or if the transformer is non-lazy and all of its primary |
| 814 /// inputs become available. |
| 815 static const DECLARED = const _State._("declared"); |
| 816 |
| 817 /// The transform is running [AggregateTransformer.apply]. |
| 818 /// |
| 819 /// If an input's contents change or a secondary input is added or removed |
| 820 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY]. |
| 821 /// If a primary input is added or removed, it will transition to |
| 822 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes |
| 823 /// running, it will transition to [APPLIED]. |
| 824 static const APPLYING = const _State._("applying"); |
| 825 |
| 826 /// The transform is running [AggregateTransformer.apply], but an input's |
| 827 /// contents changed or a secondary input was added or removed after it |
| 828 /// started, so it will need to re-run `apply`. |
| 829 /// |
| 830 /// If a primary input is added or removed while in this state, the |
| 831 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this |
| 832 /// state when `apply` finishes running, it will transition to [APPLYING]. |
| 833 static const NEEDS_APPLY = const _State._("needs apply"); |
| 834 |
| 835 /// The transform has finished running [AggregateTransformer.apply], whether |
| 836 /// or not it emitted an error. |
| 837 /// |
| 838 /// If an input's contents change or a secondary input is added or removed, |
| 839 /// the [TransformNode] will transition to [DECLARED] if the transform is |
| 840 /// declaring and [APPLYING] otherwise. If a primary input is added or |
| 841 /// removed, this will transition to [DECLARING]. |
| 842 static const APPLIED = const _State._("applied"); |
| 843 |
| 844 final String name; |
| 845 |
| 846 const _State._(this.name); |
| 847 |
| 848 String toString() => name; |
| 849 } |
OLD | NEW |