| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library barback.graph.transform_node; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 import '../asset/asset.dart'; | |
| 10 import '../asset/asset_id.dart'; | |
| 11 import '../asset/asset_node.dart'; | |
| 12 import '../asset/asset_node_set.dart'; | |
| 13 import '../errors.dart'; | |
| 14 import '../log.dart'; | |
| 15 import '../transformer/aggregate_transform.dart'; | |
| 16 import '../transformer/aggregate_transformer.dart'; | |
| 17 import '../transformer/declaring_aggregate_transform.dart'; | |
| 18 import '../transformer/declaring_aggregate_transformer.dart'; | |
| 19 import '../transformer/lazy_aggregate_transformer.dart'; | |
| 20 import '../utils.dart'; | |
| 21 import 'node_status.dart'; | |
| 22 import 'node_streams.dart'; | |
| 23 import 'phase.dart'; | |
| 24 import 'transformer_classifier.dart'; | |
| 25 | |
| 26 /// Every `_applyLogDuration`, we will issue a fine log entry letting the user | |
| 27 /// know that the transform is still executing. | |
| 28 const _applyLogDuration = const Duration(seconds: 10); | |
| 29 | |
| 30 /// Describes a transform on a set of assets and its relationship to the build | |
| 31 /// dependency graph. | |
| 32 /// | |
| 33 /// Keeps track of whether it's dirty and needs to be run and which assets it | |
| 34 /// depends on. | |
| 35 class TransformNode { | |
| 36 /// The aggregate key for this node. | |
| 37 final String key; | |
| 38 | |
| 39 /// The [TransformerClassifier] that [this] belongs to. | |
| 40 final TransformerClassifier classifier; | |
| 41 | |
| 42 /// The [Phase] that this transform runs in. | |
| 43 Phase get phase => classifier.phase; | |
| 44 | |
| 45 /// The [AggregateTransformer] to apply to this node's inputs. | |
| 46 final AggregateTransformer transformer; | |
| 47 | |
| 48 /// The primary asset nodes this transform runs on. | |
| 49 final _primaries = new AssetNodeSet(); | |
| 50 | |
| 51 /// A string describing the location of [this] in the transformer graph. | |
| 52 final String _location; | |
| 53 | |
| 54 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams. | |
| 55 final _primarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
| 56 | |
| 57 /// The subscription to [phase]'s [Phase.onAsset] stream. | |
| 58 StreamSubscription<AssetNode> _phaseAssetSubscription; | |
| 59 | |
| 60 /// The subscription to [phase]'s [Phase.onStatusChange] stream. | |
| 61 StreamSubscription<NodeStatus> _phaseStatusSubscription; | |
| 62 | |
| 63 /// How far along [this] is in processing its assets. | |
| 64 NodeStatus get status { | |
| 65 if (_state == _State.APPLIED || _state == _State.DECLARED) { | |
| 66 return NodeStatus.IDLE; | |
| 67 } | |
| 68 | |
| 69 if (_declaring && _state != _State.DECLARING && | |
| 70 _state != _State.NEEDS_DECLARE) { | |
| 71 return NodeStatus.MATERIALIZING; | |
| 72 } else { | |
| 73 return NodeStatus.RUNNING; | |
| 74 } | |
| 75 } | |
| 76 | |
| 77 /// The [TransformInfo] describing this node. | |
| 78 /// | |
| 79 /// [TransformInfo] is the publicly-visible representation of a transform | |
| 80 /// node. | |
| 81 TransformInfo get info => new TransformInfo(transformer, | |
| 82 new AssetId(phase.cascade.package, key)); | |
| 83 | |
| 84 /// Whether this is a declaring transform. | |
| 85 /// | |
| 86 /// This is usually identical to `transformer is | |
| 87 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy | |
| 88 /// transformer emits an error during `declareOutputs` it's treated as though | |
| 89 /// it wasn't declaring. | |
| 90 bool get _declaring => transformer is DeclaringAggregateTransformer && | |
| 91 (_state == _State.DECLARING || _declaredOutputs != null); | |
| 92 | |
| 93 /// Whether this transform has been forced since it last finished applying. | |
| 94 /// | |
| 95 /// A transform being forced means it should run until it generates outputs | |
| 96 /// and is no longer dirty. This is always true for non-declaring | |
| 97 /// transformers, since they always need to eagerly generate outputs. | |
| 98 bool _forced; | |
| 99 | |
| 100 /// The subscriptions to each secondary input's [AssetNode.onStateChange] | |
| 101 /// stream. | |
| 102 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>(); | |
| 103 | |
| 104 /// The subscriptions to the [AssetCascade.onAsset] stream for cascades that | |
| 105 /// might generate assets in [_missingInputs]. | |
| 106 final _missingExternalInputSubscriptions = | |
| 107 new Map<String, StreamSubscription>(); | |
| 108 | |
| 109 /// The controllers for the asset nodes emitted by this node. | |
| 110 final _outputControllers = new Map<AssetId, AssetNodeController>(); | |
| 111 | |
| 112 /// The ids of inputs the transformer tried and failed to read last time it | |
| 113 /// ran. | |
| 114 final _missingInputs = new Set<AssetId>(); | |
| 115 | |
| 116 /// The controllers that are used to pass each primary input through [this] if | |
| 117 /// it's not consumed or overwritten. | |
| 118 /// | |
| 119 /// This needs an intervening controller to ensure that the output can be | |
| 120 /// marked dirty when determining whether [this] will consume or overwrite it, | |
| 121 /// and be marked removed if it does. No pass-through controller will exist | |
| 122 /// for primary inputs that are not being passed through. | |
| 123 final _passThroughControllers = new Map<AssetId, AssetNodeController>(); | |
| 124 | |
| 125 /// The asset node for this transform. | |
| 126 final _streams = new NodeStreams(); | |
| 127 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; | |
| 128 Stream<AssetNode> get onAsset => _streams.onAsset; | |
| 129 Stream<LogEntry> get onLog => _streams.onLog; | |
| 130 | |
| 131 /// The current state of [this]. | |
| 132 var _state = _State.DECLARED; | |
| 133 | |
| 134 /// Whether [this] has been marked as removed. | |
| 135 bool get _isRemoved => _streams.onAssetController.isClosed; | |
| 136 | |
| 137 // If [transformer] is declaring but not lazy and [primary] is available, we | |
| 138 // can run [apply] even if [force] hasn't been called, since [transformer] | |
| 139 // should run eagerly if possible. | |
| 140 bool get _canRunDeclaringEagerly => | |
| 141 _declaring && transformer is! LazyAggregateTransformer && | |
| 142 _primaries.every((input) => input.state.isAvailable); | |
| 143 | |
| 144 /// Which primary inputs the most recent run of this transform has declared | |
| 145 /// that it consumes. | |
| 146 /// | |
| 147 /// This starts out `null`, indicating that the transform hasn't declared | |
| 148 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED] | |
| 149 /// or [_State.DECLARED]. | |
| 150 Set<AssetId> _consumedPrimaries; | |
| 151 | |
| 152 /// The set of output ids that [transformer] declared it would emit. | |
| 153 /// | |
| 154 /// This is only non-null if [transformer] is a | |
| 155 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run | |
| 156 /// successfully. | |
| 157 Set<AssetId> _declaredOutputs; | |
| 158 | |
| 159 /// The controller for the currently-running | |
| 160 /// [DeclaringAggregateTransformer.declareOutputs] call's | |
| 161 /// [DeclaringAggregateTransform]. | |
| 162 /// | |
| 163 /// This will be non-`null` when | |
| 164 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that | |
| 165 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes | |
| 166 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise. | |
| 167 DeclaringAggregateTransformController _declareController; | |
| 168 | |
| 169 /// The controller for the currently-running [AggregateTransformer.apply] | |
| 170 /// call's [AggregateTransform]. | |
| 171 /// | |
| 172 /// This will be non-`null` when [AggregateTransform.apply] is running, which | |
| 173 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or | |
| 174 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's | |
| 175 /// [_State.NEEDS_DECLARE], and always `null` otherwise. | |
| 176 AggregateTransformController _applyController; | |
| 177 | |
| 178 /// Map to track pending requests for secondary inputs. | |
| 179 /// | |
| 180 /// Keys are the secondary inputs that have been requested but not yet | |
| 181 /// produced. Values are the number of requests for that input. | |
| 182 final _pendingSecondaryInputs = <AssetId, int>{}; | |
| 183 | |
| 184 /// A stopwatch that tracks the total time spent in a transformer's `apply` | |
| 185 /// function. | |
| 186 final _timeInTransformer = new Stopwatch(); | |
| 187 | |
| 188 /// A stopwatch that tracks the time in a transformer's `apply` function spent | |
| 189 /// waiting for [getInput] calls to complete. | |
| 190 final _timeAwaitingInputs = new Stopwatch(); | |
| 191 | |
| 192 TransformNode(this.classifier, this.transformer, this.key, this._location) { | |
| 193 _forced = transformer is! DeclaringAggregateTransformer; | |
| 194 | |
| 195 _phaseAssetSubscription = phase.previous.onAsset.listen((node) { | |
| 196 if (!_missingInputs.contains(node.id)) return; | |
| 197 if (_forced) node.force(); | |
| 198 _dirty(); | |
| 199 }); | |
| 200 | |
| 201 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) { | |
| 202 if (status == NodeStatus.RUNNING) return; | |
| 203 | |
| 204 _maybeFinishDeclareController(); | |
| 205 _maybeFinishApplyController(); | |
| 206 }); | |
| 207 | |
| 208 classifier.onDoneClassifying.listen((_) { | |
| 209 _maybeFinishDeclareController(); | |
| 210 _maybeFinishApplyController(); | |
| 211 }); | |
| 212 | |
| 213 _run(); | |
| 214 } | |
| 215 | |
| 216 /// Adds [input] as a primary input for this node. | |
| 217 void addPrimary(AssetNode input) { | |
| 218 _primaries.add(input); | |
| 219 if (_forced) input.force(); | |
| 220 | |
| 221 _primarySubscriptions[input.id] = input.onStateChange | |
| 222 .listen((_) => _onPrimaryStateChange(input)); | |
| 223 | |
| 224 if (_state == _State.DECLARING && !_declareController.isDone) { | |
| 225 // If we're running `declareOutputs` and its id stream isn't closed yet, | |
| 226 // pass this in as another id. | |
| 227 _declareController.addId(input.id); | |
| 228 _maybeFinishDeclareController(); | |
| 229 } else if (_state == _State.APPLYING) { | |
| 230 // If we're running `apply`, we need to wait until [input] is available | |
| 231 // before we pass it into the stream. If it's available now, great; if | |
| 232 // not, [_onPrimaryStateChange] will handle it. | |
| 233 if (!input.state.isAvailable) { | |
| 234 // If we started running eagerly without being forced, abort that run if | |
| 235 // a new unavailable asset comes in. | |
| 236 if (input.isLazy && !_forced) _restartRun(); | |
| 237 return; | |
| 238 } | |
| 239 | |
| 240 _onPrimaryStateChange(input); | |
| 241 _maybeFinishApplyController(); | |
| 242 } else { | |
| 243 // Otherwise, a new input means we'll need to re-run `declareOutputs`. | |
| 244 _restartRun(); | |
| 245 } | |
| 246 } | |
| 247 | |
| 248 /// Marks this transform as removed. | |
| 249 /// | |
| 250 /// This causes all of the transform's outputs to be marked as removed as | |
| 251 /// well. Normally this will be automatically done internally based on events | |
| 252 /// from the primary input, but it's possible for a transform to no longer be | |
| 253 /// valid even if its primary input still exists. | |
| 254 void remove() { | |
| 255 _streams.close(); | |
| 256 _phaseAssetSubscription.cancel(); | |
| 257 _phaseStatusSubscription.cancel(); | |
| 258 if (_declareController != null) _declareController.cancel(); | |
| 259 if (_applyController != null) _applyController.cancel(); | |
| 260 _clearSecondarySubscriptions(); | |
| 261 _clearOutputs(); | |
| 262 | |
| 263 for (var subscription in _primarySubscriptions.values) { | |
| 264 subscription.cancel(); | |
| 265 } | |
| 266 _primarySubscriptions.clear(); | |
| 267 | |
| 268 for (var controller in _passThroughControllers.values) { | |
| 269 controller.setRemoved(); | |
| 270 } | |
| 271 _passThroughControllers.clear(); | |
| 272 } | |
| 273 | |
| 274 /// If [this] is deferred, ensures that its concrete outputs will be | |
| 275 /// generated. | |
| 276 void force() { | |
| 277 if (_forced || _state == _State.APPLIED) return; | |
| 278 for (var input in _primaries) { | |
| 279 input.force(); | |
| 280 } | |
| 281 | |
| 282 _forced = true; | |
| 283 if (_state == _State.DECLARED) _apply(); | |
| 284 } | |
| 285 | |
| 286 /// Marks this transform as dirty. | |
| 287 /// | |
| 288 /// Specifically, this should be called when one of the transform's inputs' | |
| 289 /// contents change, or when a secondary input is removed. Primary inputs | |
| 290 /// being added or removed are handled by [addInput] and | |
| 291 /// [_onPrimaryStateChange]. | |
| 292 void _dirty() { | |
| 293 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE || | |
| 294 _state == _State.NEEDS_APPLY) { | |
| 295 // If we already know that [_apply] needs to be run, there's nothing to do | |
| 296 // here. | |
| 297 return; | |
| 298 } | |
| 299 | |
| 300 if (!_forced && !_canRunDeclaringEagerly) { | |
| 301 // [forced] should only ever be false for a declaring transformer. | |
| 302 assert(_declaring); | |
| 303 | |
| 304 // If we've finished applying, transition to DECLARED, indicating that we | |
| 305 // know what outputs [apply] will emit but we're waiting to emit them | |
| 306 // concretely until [force] is called. If we're still applying, we'll | |
| 307 // transition to DECLARED once we finish. | |
| 308 if (_state == _State.APPLIED) _state = _State.DECLARED; | |
| 309 for (var controller in _outputControllers.values) { | |
| 310 controller.setLazy(force); | |
| 311 } | |
| 312 _emitDeclaredOutputs(); | |
| 313 return; | |
| 314 } | |
| 315 | |
| 316 if (_state == _State.APPLIED) { | |
| 317 if (_declaredOutputs != null) _emitDeclaredOutputs(); | |
| 318 _apply(); | |
| 319 } else if (_state == _State.DECLARED) { | |
| 320 _apply(); | |
| 321 } else { | |
| 322 _state = _State.NEEDS_APPLY; | |
| 323 } | |
| 324 } | |
| 325 | |
| 326 /// The callback called when [input]'s state changes. | |
| 327 void _onPrimaryStateChange(AssetNode input) { | |
| 328 if (input.state.isRemoved) { | |
| 329 _primarySubscriptions.remove(input.id); | |
| 330 | |
| 331 if (_primaries.isEmpty) { | |
| 332 // If there are no more primary inputs, there's no more use for this | |
| 333 // node in the graph. It will be re-created by its | |
| 334 // [TransformerClassifier] if a new input with [key] is added. | |
| 335 remove(); | |
| 336 return; | |
| 337 } | |
| 338 | |
| 339 // Any change to the number of primary inputs requires that we re-run the | |
| 340 // transformation. | |
| 341 _restartRun(); | |
| 342 } else if (input.state.isAvailable) { | |
| 343 if (_state == _State.DECLARED) { | |
| 344 // If we're passing through this input and its contents don't matter, | |
| 345 // update the pass-through controller. | |
| 346 var controller = _passThroughControllers[input.id]; | |
| 347 if (controller != null) controller.setAvailable(input.asset); | |
| 348 } | |
| 349 | |
| 350 if (_state == _State.DECLARED && _canRunDeclaringEagerly) { | |
| 351 // If [this] is fully declared but hasn't started applying, this input | |
| 352 // becoming available may mean that all inputs are available, in which | |
| 353 // case we can run apply eagerly. | |
| 354 _apply(); | |
| 355 return; | |
| 356 } | |
| 357 | |
| 358 // If we're not actively passing concrete assets to the transformer, the | |
| 359 // distinction between a dirty asset and an available one isn't relevant. | |
| 360 if (_state != _State.APPLYING) return; | |
| 361 | |
| 362 if (_applyController.isDone) { | |
| 363 // If we get a new asset after we've closed the asset stream, we need to | |
| 364 // re-run declare and then apply. | |
| 365 _restartRun(); | |
| 366 } else { | |
| 367 // If the new asset comes before the asset stream is done, we can just | |
| 368 // pass it to the stream. | |
| 369 _applyController.addInput(input.asset); | |
| 370 _maybeFinishApplyController(); | |
| 371 } | |
| 372 } else { | |
| 373 if (_forced) input.force(); | |
| 374 | |
| 375 var controller = _passThroughControllers[input.id]; | |
| 376 if (controller != null) controller.setDirty(); | |
| 377 | |
| 378 if (_state == _State.APPLYING && !_applyController.addedId(input.id) && | |
| 379 (_forced || !input.isLazy)) { | |
| 380 // If the input hasn't yet been added to the transform's input stream, | |
| 381 // there's no need to consider the transformation dirty. However, if the | |
| 382 // input is lazy and we're running eagerly, we need to restart the | |
| 383 // transformation. | |
| 384 return; | |
| 385 } | |
| 386 _dirty(); | |
| 387 } | |
| 388 } | |
| 389 | |
| 390 /// Run the entire transformation, including both `declareOutputs` (if | |
| 391 /// applicable) and `apply`. | |
| 392 void _run() { | |
| 393 assert(_state != _State.DECLARING); | |
| 394 assert(_state != _State.APPLYING); | |
| 395 | |
| 396 _markOutputsDirty(); | |
| 397 _declareOutputs(() { | |
| 398 if (_forced || _canRunDeclaringEagerly) { | |
| 399 _apply(); | |
| 400 } else { | |
| 401 _state = _State.DECLARED; | |
| 402 _streams.changeStatus(NodeStatus.IDLE); | |
| 403 } | |
| 404 }); | |
| 405 } | |
| 406 | |
| 407 /// Restart the entire transformation, including `declareOutputs` if | |
| 408 /// applicable. | |
| 409 void _restartRun() { | |
| 410 if (_state == _State.DECLARED || _state == _State.APPLIED) { | |
| 411 // If we're currently idle, we can restart the transformation immediately. | |
| 412 _run(); | |
| 413 return; | |
| 414 } | |
| 415 | |
| 416 // If we're actively running `declareOutputs` or `apply`, cancel the | |
| 417 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's | |
| 418 // method returns, we'll transition to `DECLARING`. | |
| 419 if (_declareController != null) _declareController.cancel(); | |
| 420 if (_applyController != null) _applyController.cancel(); | |
| 421 _state = _State.NEEDS_DECLARE; | |
| 422 } | |
| 423 | |
| 424 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty | |
| 425 /// assets. | |
| 426 /// | |
| 427 /// Calls [callback] when it's finished. This doesn't return a future so that | |
| 428 /// [callback] is called synchronously if there are no outputs to declare. If | |
| 429 /// [this] is removed while inputs are being declared, [callback] will not be | |
| 430 /// called. | |
| 431 void _declareOutputs(void callback()) { | |
| 432 if (transformer is! DeclaringAggregateTransformer) { | |
| 433 callback(); | |
| 434 return; | |
| 435 } | |
| 436 | |
| 437 _state = _State.DECLARING; | |
| 438 var controller = new DeclaringAggregateTransformController(this); | |
| 439 _declareController = controller; | |
| 440 _streams.onLogPool.add(controller.onLog); | |
| 441 for (var primary in _primaries) { | |
| 442 controller.addId(primary.id); | |
| 443 } | |
| 444 _maybeFinishDeclareController(); | |
| 445 | |
| 446 syncFuture(() { | |
| 447 return (transformer as DeclaringAggregateTransformer) | |
| 448 .declareOutputs(controller.transform); | |
| 449 }).whenComplete(() { | |
| 450 // Cancel the controller here even if `declareOutputs` wasn't interrupted. | |
| 451 // Since the declaration is finished, we want to close out the | |
| 452 // controller's streams. | |
| 453 controller.cancel(); | |
| 454 _declareController = null; | |
| 455 }).then((_) { | |
| 456 if (_isRemoved) return; | |
| 457 if (_state == _State.NEEDS_DECLARE) { | |
| 458 _declareOutputs(callback); | |
| 459 return; | |
| 460 } | |
| 461 | |
| 462 if (controller.loggedError) { | |
| 463 // If `declareOutputs` fails, fall back to treating a declaring | |
| 464 // transformer as though it were eager. | |
| 465 if (transformer is! LazyAggregateTransformer) _forced = true; | |
| 466 callback(); | |
| 467 return; | |
| 468 } | |
| 469 | |
| 470 _consumedPrimaries = controller.consumedPrimaries; | |
| 471 _declaredOutputs = controller.outputIds; | |
| 472 var invalidIds = _declaredOutputs | |
| 473 .where((id) => id.package != phase.cascade.package).toSet(); | |
| 474 for (var id in invalidIds) { | |
| 475 _declaredOutputs.remove(id); | |
| 476 // TODO(nweiz): report this as a warning rather than a failing error. | |
| 477 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
| 478 } | |
| 479 | |
| 480 for (var primary in _primaries) { | |
| 481 if (_declaredOutputs.contains(primary.id)) continue; | |
| 482 _passThrough(primary.id); | |
| 483 } | |
| 484 _emitDeclaredOutputs(); | |
| 485 callback(); | |
| 486 }).catchError((error, stackTrace) { | |
| 487 if (_isRemoved) return; | |
| 488 if (transformer is! LazyAggregateTransformer) _forced = true; | |
| 489 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
| 490 callback(); | |
| 491 }); | |
| 492 } | |
| 493 | |
| 494 /// Emits a dirty asset node for all outputs that were declared by the | |
| 495 /// transformer. | |
| 496 /// | |
| 497 /// This won't emit any outputs for which there already exist output | |
| 498 /// controllers. It should only be called for transforms that have declared | |
| 499 /// their outputs. | |
| 500 void _emitDeclaredOutputs() { | |
| 501 assert(_declaredOutputs != null); | |
| 502 for (var id in _declaredOutputs) { | |
| 503 if (_outputControllers.containsKey(id)) continue; | |
| 504 var controller = _forced | |
| 505 ? new AssetNodeController(id, this) | |
| 506 : new AssetNodeController.lazy(id, force, this); | |
| 507 _outputControllers[id] = controller; | |
| 508 _streams.onAssetController.add(controller.node); | |
| 509 } | |
| 510 } | |
| 511 | |
| 512 //// Mark all emitted and passed-through outputs of this transform as dirty. | |
| 513 void _markOutputsDirty() { | |
| 514 for (var controller in _passThroughControllers.values) { | |
| 515 controller.setDirty(); | |
| 516 } | |
| 517 for (var controller in _outputControllers.values) { | |
| 518 if (_forced) { | |
| 519 controller.setDirty(); | |
| 520 } else { | |
| 521 controller.setLazy(force); | |
| 522 } | |
| 523 } | |
| 524 } | |
| 525 | |
| 526 /// Applies this transform. | |
| 527 void _apply() { | |
| 528 assert(!_isRemoved); | |
| 529 | |
| 530 _markOutputsDirty(); | |
| 531 _clearSecondarySubscriptions(); | |
| 532 _state = _State.APPLYING; | |
| 533 _streams.changeStatus(status); | |
| 534 _runApply().then((hadError) { | |
| 535 if (_isRemoved) return; | |
| 536 | |
| 537 if (_state == _State.DECLARED) return; | |
| 538 | |
| 539 if (_state == _State.NEEDS_DECLARE) { | |
| 540 _run(); | |
| 541 return; | |
| 542 } | |
| 543 | |
| 544 // If an input's contents changed while running `apply`, retry unless the | |
| 545 // transformer is deferred and hasn't been forced. | |
| 546 if (_state == _State.NEEDS_APPLY) { | |
| 547 if (_forced || _canRunDeclaringEagerly) { | |
| 548 _apply(); | |
| 549 } else { | |
| 550 _state = _State.DECLARED; | |
| 551 } | |
| 552 return; | |
| 553 } | |
| 554 | |
| 555 if (_declaring) _forced = false; | |
| 556 | |
| 557 assert(_state == _State.APPLYING); | |
| 558 if (hadError) { | |
| 559 _clearOutputs(); | |
| 560 // If the transformer threw an error, we don't want to emit the | |
| 561 // pass-through assets in case they'll be overwritten by the | |
| 562 // transformer. However, if the transformer declared that it wouldn't | |
| 563 // overwrite or consume a pass-through asset, we can safely emit it. | |
| 564 if (_declaredOutputs != null) { | |
| 565 for (var input in _primaries) { | |
| 566 if (_consumedPrimaries.contains(input.id) || | |
| 567 _declaredOutputs.contains(input.id)) { | |
| 568 _consumePrimary(input.id); | |
| 569 } else { | |
| 570 _passThrough(input.id); | |
| 571 } | |
| 572 } | |
| 573 } | |
| 574 } | |
| 575 | |
| 576 _state = _State.APPLIED; | |
| 577 _streams.changeStatus(NodeStatus.IDLE); | |
| 578 }); | |
| 579 } | |
| 580 | |
| 581 /// Gets the asset for an input [id]. | |
| 582 /// | |
| 583 /// If an input with [id] cannot be found, throws an [AssetNotFoundException]. | |
| 584 Future<Asset> getInput(AssetId id) { | |
| 585 _timeAwaitingInputs.start(); | |
| 586 _pendingSecondaryInputs[id] = _pendingSecondaryInputs.containsKey(id) | |
| 587 ? _pendingSecondaryInputs[id] + 1 | |
| 588 : 1; | |
| 589 return phase.previous.getOutput(id).then((node) { | |
| 590 // Throw if the input isn't found. This ensures the transformer's apply | |
| 591 // is exited. We'll then catch this and report it through the proper | |
| 592 // results stream. | |
| 593 if (node == null) { | |
| 594 _missingInputs.add(id); | |
| 595 | |
| 596 // If this id is for an asset in another package, subscribe to that | |
| 597 // package's asset cascade so when it starts emitting the id we know to | |
| 598 // re-run the transformer. | |
| 599 if (id.package != phase.cascade.package) { | |
| 600 var stream = phase.cascade.graph.onAssetFor(id.package); | |
| 601 if (stream != null) { | |
| 602 _missingExternalInputSubscriptions.putIfAbsent(id.package, () { | |
| 603 return stream.listen((node) { | |
| 604 if (!_missingInputs.contains(node.id)) return; | |
| 605 if (_forced) node.force(); | |
| 606 _dirty(); | |
| 607 }); | |
| 608 }); | |
| 609 } | |
| 610 } | |
| 611 | |
| 612 throw new AssetNotFoundException(id); | |
| 613 } | |
| 614 | |
| 615 _secondarySubscriptions.putIfAbsent(node.id, () { | |
| 616 return node.onStateChange.listen((_) => _dirty()); | |
| 617 }); | |
| 618 | |
| 619 return node.asset; | |
| 620 }).whenComplete(() { | |
| 621 assert(_pendingSecondaryInputs.containsKey(id)); | |
| 622 if (_pendingSecondaryInputs[id] == 1) { | |
| 623 _pendingSecondaryInputs.remove(id); | |
| 624 } else { | |
| 625 _pendingSecondaryInputs[id]--; | |
| 626 } | |
| 627 if (_pendingSecondaryInputs.isEmpty) _timeAwaitingInputs.stop(); | |
| 628 }); | |
| 629 } | |
| 630 | |
| 631 /// Run [AggregateTransformer.apply]. | |
| 632 /// | |
| 633 /// Returns whether or not an error occurred while running the transformer. | |
| 634 Future<bool> _runApply() { | |
| 635 var controller = new AggregateTransformController(this); | |
| 636 _applyController = controller; | |
| 637 _streams.onLogPool.add(controller.onLog); | |
| 638 for (var primary in _primaries) { | |
| 639 if (!primary.state.isAvailable) continue; | |
| 640 controller.addInput(primary.asset); | |
| 641 } | |
| 642 _maybeFinishApplyController(); | |
| 643 | |
| 644 var transformCounterTimer; | |
| 645 | |
| 646 return syncFuture(() { | |
| 647 _timeInTransformer.reset(); | |
| 648 _timeAwaitingInputs.reset(); | |
| 649 _timeInTransformer.start(); | |
| 650 | |
| 651 transformCounterTimer = new Timer.periodic(_applyLogDuration, (_) { | |
| 652 if (_streams.onLogController.isClosed || | |
| 653 !_timeInTransformer.isRunning) { | |
| 654 return; | |
| 655 } | |
| 656 | |
| 657 var message = new StringBuffer("Not yet complete after " | |
| 658 "${niceDuration(_timeInTransformer.elapsed)}"); | |
| 659 if (_pendingSecondaryInputs.isNotEmpty) { | |
| 660 message.write(", waiting on input(s) " | |
| 661 "${_pendingSecondaryInputs.keys.join(", ")}"); | |
| 662 } | |
| 663 _streams.onLogController.add(new LogEntry( | |
| 664 info, | |
| 665 info.primaryId, | |
| 666 LogLevel.FINE, | |
| 667 message.toString(), | |
| 668 null)); | |
| 669 }); | |
| 670 | |
| 671 return transformer.apply(controller.transform); | |
| 672 }).whenComplete(() { | |
| 673 transformCounterTimer.cancel(); | |
| 674 _timeInTransformer.stop(); | |
| 675 _timeAwaitingInputs.stop(); | |
| 676 | |
| 677 // Cancel the controller here even if `apply` wasn't interrupted. Since | |
| 678 // the apply is finished, we want to close out the controller's streams. | |
| 679 controller.cancel(); | |
| 680 _applyController = null; | |
| 681 }).then((_) { | |
| 682 assert(_state != _State.DECLARED); | |
| 683 assert(_state != _State.DECLARING); | |
| 684 assert(_state != _State.APPLIED); | |
| 685 | |
| 686 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) { | |
| 687 _state = _State.DECLARED; | |
| 688 _streams.changeStatus(NodeStatus.IDLE); | |
| 689 return false; | |
| 690 } | |
| 691 | |
| 692 if (_isRemoved) return false; | |
| 693 if (_state == _State.NEEDS_APPLY) return false; | |
| 694 if (_state == _State.NEEDS_DECLARE) return false; | |
| 695 if (controller.loggedError) return true; | |
| 696 | |
| 697 // If the transformer took long enough, log its duration in fine output. | |
| 698 // That way it's not always visible, but users running with "pub serve | |
| 699 // --verbose" can see it. | |
| 700 var ranLong = _timeInTransformer.elapsed > new Duration(seconds: 1); | |
| 701 var ranLongLocally = | |
| 702 _timeInTransformer.elapsed - _timeAwaitingInputs.elapsed > | |
| 703 new Duration(milliseconds: 200); | |
| 704 | |
| 705 // Report the transformer's timing information if it spent more than 0.2s | |
| 706 // doing things other than waiting for its secondary inputs or if it spent | |
| 707 // more than 1s in total. | |
| 708 if (ranLongLocally || ranLong) { | |
| 709 _streams.onLogController.add(new LogEntry( | |
| 710 info, info.primaryId, LogLevel.FINE, | |
| 711 "Took ${niceDuration(_timeInTransformer.elapsed)} " | |
| 712 "(${niceDuration(_timeAwaitingInputs.elapsed)} awaiting " | |
| 713 "secondary inputs).", | |
| 714 null)); | |
| 715 } | |
| 716 | |
| 717 _handleApplyResults(controller); | |
| 718 return false; | |
| 719 }).catchError((error, stackTrace) { | |
| 720 // If the transform became dirty while processing, ignore any errors from | |
| 721 // it. | |
| 722 if (_state == _State.NEEDS_APPLY || _isRemoved) return false; | |
| 723 | |
| 724 // Catch all transformer errors and pipe them to the results stream. This | |
| 725 // is so a broken transformer doesn't take down the whole graph. | |
| 726 phase.cascade.reportError(_wrapException(error, stackTrace)); | |
| 727 return true; | |
| 728 }); | |
| 729 } | |
| 730 | |
| 731 /// Handle the results of running [Transformer.apply]. | |
| 732 /// | |
| 733 /// [controller] should be the controller for the [AggegateTransform] passed | |
| 734 /// to [AggregateTransformer.apply]. | |
| 735 void _handleApplyResults(AggregateTransformController controller) { | |
| 736 _consumedPrimaries = controller.consumedPrimaries; | |
| 737 | |
| 738 var newOutputs = controller.outputs; | |
| 739 // Any ids that are for a different package are invalid. | |
| 740 var invalidIds = newOutputs | |
| 741 .map((asset) => asset.id) | |
| 742 .where((id) => id.package != phase.cascade.package) | |
| 743 .toSet(); | |
| 744 for (var id in invalidIds) { | |
| 745 newOutputs.removeId(id); | |
| 746 // TODO(nweiz): report this as a warning rather than a failing error. | |
| 747 phase.cascade.reportError(new InvalidOutputException(info, id)); | |
| 748 } | |
| 749 | |
| 750 // Remove outputs that used to exist but don't anymore. | |
| 751 for (var id in _outputControllers.keys.toList()) { | |
| 752 if (newOutputs.containsId(id)) continue; | |
| 753 _outputControllers.remove(id).setRemoved(); | |
| 754 } | |
| 755 | |
| 756 // Emit or stop emitting pass-through assets between removing and adding | |
| 757 // outputs to ensure there are no collisions. | |
| 758 for (var id in _primaries.map((node) => node.id)) { | |
| 759 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) { | |
| 760 _consumePrimary(id); | |
| 761 } else { | |
| 762 _passThrough(id); | |
| 763 } | |
| 764 } | |
| 765 | |
| 766 // Store any new outputs or new contents for existing outputs. | |
| 767 for (var asset in newOutputs) { | |
| 768 var controller = _outputControllers[asset.id]; | |
| 769 if (controller != null) { | |
| 770 controller.setAvailable(asset); | |
| 771 } else { | |
| 772 var controller = new AssetNodeController.available(asset, this); | |
| 773 _outputControllers[asset.id] = controller; | |
| 774 _streams.onAssetController.add(controller.node); | |
| 775 } | |
| 776 } | |
| 777 } | |
| 778 | |
| 779 /// Cancels all subscriptions to secondary input nodes and other cascades. | |
| 780 void _clearSecondarySubscriptions() { | |
| 781 _missingInputs.clear(); | |
| 782 for (var subscription in _secondarySubscriptions.values) { | |
| 783 subscription.cancel(); | |
| 784 } | |
| 785 for (var subscription in _missingExternalInputSubscriptions.values) { | |
| 786 subscription.cancel(); | |
| 787 } | |
| 788 _secondarySubscriptions.clear(); | |
| 789 _missingExternalInputSubscriptions.clear(); | |
| 790 } | |
| 791 | |
| 792 /// Removes all output assets. | |
| 793 void _clearOutputs() { | |
| 794 // Remove all the previously-emitted assets. | |
| 795 for (var controller in _outputControllers.values) { | |
| 796 controller.setRemoved(); | |
| 797 } | |
| 798 _outputControllers.clear(); | |
| 799 } | |
| 800 | |
| 801 /// Emit the pass-through node for the primary input [id] if it's not being | |
| 802 /// emitted already. | |
| 803 void _passThrough(AssetId id) { | |
| 804 assert(!_outputControllers.containsKey(id)); | |
| 805 | |
| 806 if (_consumedPrimaries.contains(id)) return; | |
| 807 var controller = _passThroughControllers[id]; | |
| 808 var primary = _primaries[id]; | |
| 809 if (controller == null) { | |
| 810 controller = new AssetNodeController.from(primary); | |
| 811 _passThroughControllers[id] = controller; | |
| 812 _streams.onAssetController.add(controller.node); | |
| 813 } else if (primary.state.isDirty) { | |
| 814 controller.setDirty(); | |
| 815 } else if (!controller.node.state.isAvailable) { | |
| 816 controller.setAvailable(primary.asset); | |
| 817 } | |
| 818 } | |
| 819 | |
| 820 /// Stops emitting the pass-through node for the primary input [id] if it's | |
| 821 /// being emitted. | |
| 822 void _consumePrimary(AssetId id) { | |
| 823 var controller = _passThroughControllers.remove(id); | |
| 824 if (controller == null) return; | |
| 825 controller.setRemoved(); | |
| 826 } | |
| 827 | |
| 828 /// If `declareOutputs` is running and all previous phases have declared their | |
| 829 /// outputs, mark [_declareController] as done. | |
| 830 void _maybeFinishDeclareController() { | |
| 831 if (_declareController == null) return; | |
| 832 if (classifier.isClassifying) return; | |
| 833 if (phase.previous.status == NodeStatus.RUNNING) return; | |
| 834 _declareController.done(); | |
| 835 } | |
| 836 | |
| 837 /// If `apply` is running, all previous phases have declared their outputs, | |
| 838 /// and all primary inputs are available and thus have been passed to the | |
| 839 /// transformer, mark [_applyController] as done. | |
| 840 void _maybeFinishApplyController() { | |
| 841 if (_applyController == null) return; | |
| 842 if (classifier.isClassifying) return; | |
| 843 if (_primaries.any((input) => !input.state.isAvailable)) return; | |
| 844 if (phase.previous.status == NodeStatus.RUNNING) return; | |
| 845 _applyController.done(); | |
| 846 } | |
| 847 | |
| 848 BarbackException _wrapException(error, StackTrace stackTrace) { | |
| 849 if (error is! AssetNotFoundException) { | |
| 850 return new TransformerException(info, error, stackTrace); | |
| 851 } else { | |
| 852 return new MissingInputException(info, error.id); | |
| 853 } | |
| 854 } | |
| 855 | |
| 856 String toString() => | |
| 857 "transform node in $_location for $transformer on ${info.primaryId} " | |
| 858 "($_state, $status, ${_forced ? '' : 'un'}forced)"; | |
| 859 } | |
| 860 | |
| 861 /// The enum of states that [TransformNode] can be in. | |
| 862 class _State { | |
| 863 /// The transform is running [DeclaringAggregateTransformer.declareOutputs]. | |
| 864 /// | |
| 865 /// If the set of primary inputs changes while in this state, it will | |
| 866 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this | |
| 867 /// state when `declareOutputs` finishes running, it will transition to | |
| 868 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are | |
| 869 /// available, and [DECLARED] otherwise. | |
| 870 /// | |
| 871 /// Non-declaring transformers will transition out of this state and into | |
| 872 /// [APPLYING] immediately. | |
| 873 static const DECLARING = const _State._("declaring outputs"); | |
| 874 | |
| 875 /// The transform is running [AggregateTransformer.declareOutputs] or | |
| 876 /// [AggregateTransform.apply], but a primary input was added or removed after | |
| 877 /// it started, so it will need to re-run `declareOutputs`. | |
| 878 /// | |
| 879 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs` | |
| 880 /// or `apply` finishes running. | |
| 881 static const NEEDS_DECLARE = const _State._("needs declare"); | |
| 882 | |
| 883 /// The transform is deferred and has run | |
| 884 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced. | |
| 885 /// | |
| 886 /// The [TransformNode] will transition to [APPLYING] when one of the outputs | |
| 887 /// has been forced or if the transformer is non-lazy and all of its primary | |
| 888 /// inputs become available. | |
| 889 static const DECLARED = const _State._("declared"); | |
| 890 | |
| 891 /// The transform is running [AggregateTransformer.apply]. | |
| 892 /// | |
| 893 /// If an input's contents change or a secondary input is added or removed | |
| 894 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY]. | |
| 895 /// If a primary input is added or removed, it will transition to | |
| 896 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes | |
| 897 /// running, it will transition to [APPLIED]. | |
| 898 static const APPLYING = const _State._("applying"); | |
| 899 | |
| 900 /// The transform is running [AggregateTransformer.apply], but an input's | |
| 901 /// contents changed or a secondary input was added or removed after it | |
| 902 /// started, so it will need to re-run `apply`. | |
| 903 /// | |
| 904 /// If a primary input is added or removed while in this state, the | |
| 905 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this | |
| 906 /// state when `apply` finishes running, it will transition to [APPLYING]. | |
| 907 static const NEEDS_APPLY = const _State._("needs apply"); | |
| 908 | |
| 909 /// The transform has finished running [AggregateTransformer.apply], whether | |
| 910 /// or not it emitted an error. | |
| 911 /// | |
| 912 /// If an input's contents change or a secondary input is added or removed, | |
| 913 /// the [TransformNode] will transition to [DECLARED] if the transform is | |
| 914 /// declaring and [APPLYING] otherwise. If a primary input is added or | |
| 915 /// removed, this will transition to [DECLARING]. | |
| 916 static const APPLIED = const _State._("applied"); | |
| 917 | |
| 918 final String name; | |
| 919 | |
| 920 const _State._(this.name); | |
| 921 | |
| 922 String toString() => name; | |
| 923 } | |
| OLD | NEW |