Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(122)

Side by Side Diff: pkg/barback/lib/src/graph/transform_node.dart

Issue 267393009: Transition barback's infrastructure to an aggregate-based model. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/barback/lib/src/errors.dart ('k') | pkg/barback/lib/src/graph/transformer_classifier.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library barback.graph.transform_node; 5 library barback.graph.transform_node;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 import '../asset/asset.dart'; 9 import '../asset/asset.dart';
10 import '../asset/asset_id.dart'; 10 import '../asset/asset_id.dart';
11 import '../asset/asset_node.dart'; 11 import '../asset/asset_node.dart';
12 import '../asset/asset_node_set.dart';
12 import '../errors.dart'; 13 import '../errors.dart';
13 import '../log.dart'; 14 import '../log.dart';
14 import '../transformer/aggregate_transform.dart'; 15 import '../transformer/aggregate_transform.dart';
16 import '../transformer/aggregate_transformer.dart';
15 import '../transformer/declaring_aggregate_transform.dart'; 17 import '../transformer/declaring_aggregate_transform.dart';
16 import '../transformer/declaring_transform.dart'; 18 import '../transformer/declaring_aggregate_transformer.dart';
17 import '../transformer/declaring_transformer.dart'; 19 import '../transformer/lazy_aggregate_transformer.dart';
18 import '../transformer/lazy_transformer.dart'; 20 import '../utils.dart';
19 import '../transformer/transform.dart';
20 import '../transformer/transformer.dart';
21 import 'node_status.dart'; 21 import 'node_status.dart';
22 import 'node_streams.dart'; 22 import 'node_streams.dart';
23 import 'phase.dart'; 23 import 'phase.dart';
24 24
25 /// Describes a transform on a set of assets and its relationship to the build 25 /// Describes a transform on a set of assets and its relationship to the build
26 /// dependency graph. 26 /// dependency graph.
27 /// 27 ///
28 /// Keeps track of whether it's dirty and needs to be run and which assets it 28 /// Keeps track of whether it's dirty and needs to be run and which assets it
29 /// depends on. 29 /// depends on.
30 class TransformNode { 30 class TransformNode {
31 /// The aggregate key for this node. 31 /// The aggregate key for this node.
32 final String key; 32 final String key;
33 33
34 /// The [Phase] that this transform runs in. 34 /// The [Phase] that this transform runs in.
35 final Phase phase; 35 final Phase phase;
36 36
37 /// The [Transformer] to apply to this node's inputs. 37 /// The [AggregateTransformer] to apply to this node's inputs.
38 final Transformer transformer; 38 final AggregateTransformer transformer;
39 39
40 /// The node for the primary asset this transform depends on. 40 /// The primary asset nodes this transform runs on.
41 final AssetNode primary; 41 final _primaries = new AssetNodeSet();
42 42
43 /// A string describing the location of [this] in the transformer graph. 43 /// A string describing the location of [this] in the transformer graph.
44 final String _location; 44 final String _location;
45 45
46 /// The subscription to [primary]'s [AssetNode.onStateChange] stream. 46 /// The subscription to the [_primaries]' [AssetNode.onStateChange] streams.
47 StreamSubscription _primarySubscription; 47 final _primarySubscriptions = new Map<AssetId, StreamSubscription>();
48 48
49 /// The subscription to [phase]'s [Phase.onAsset] stream. 49 /// The subscription to [phase]'s [Phase.onAsset] stream.
50 StreamSubscription<AssetNode> _phaseSubscription; 50 StreamSubscription<AssetNode> _phaseAssetSubscription;
51
52 /// The subscription to [phase]'s [Phase.onStatusChange] stream.
53 StreamSubscription<NodeStatus> _phaseStatusSubscription;
51 54
52 /// How far along [this] is in processing its assets. 55 /// How far along [this] is in processing its assets.
53 NodeStatus get status { 56 NodeStatus get status {
54 if (_state == _State.APPLIED || _state == _State.DECLARED) { 57 if (_state == _State.APPLIED || _state == _State.DECLARED) {
55 return NodeStatus.IDLE; 58 return NodeStatus.IDLE;
56 } 59 }
57 60
58 if (_declaring && _state != _State.DECLARING) { 61 if (_declaring && _state != _State.DECLARING &&
62 _state != _State.NEEDS_DECLARE) {
59 return NodeStatus.MATERIALIZING; 63 return NodeStatus.MATERIALIZING;
60 } else { 64 } else {
61 return NodeStatus.RUNNING; 65 return NodeStatus.RUNNING;
62 } 66 }
63 } 67 }
64 68
69 /// The [TransformInfo] describing this node.
70 ///
71 /// [TransformInfo] is the publicly-visible representation of a transform
72 /// node.
73 TransformInfo get info => new TransformInfo(transformer,
74 new AssetId(phase.cascade.package, key));
75
65 /// Whether this is a declaring transform. 76 /// Whether this is a declaring transform.
66 /// 77 ///
67 /// This is usually identical to `transformer is DeclaringTransformer`, but if 78 /// This is usually identical to `transformer is
68 /// a declaring and non-lazy transformer emits an error during 79 /// DeclaringAggregateTransformer`, but if a declaring and non-lazy
69 /// `declareOutputs` it's treated as though it wasn't declaring. 80 /// transformer emits an error during `declareOutputs` it's treated as though
70 bool get _declaring => transformer is DeclaringTransformer && 81 /// it wasn't declaring.
82 bool get _declaring => transformer is DeclaringAggregateTransformer &&
71 (_state == _State.DECLARING || _declaredOutputs != null); 83 (_state == _State.DECLARING || _declaredOutputs != null);
72 84
73 /// Whether this transform has been forced since it last finished applying. 85 /// Whether this transform has been forced since it last finished applying.
74 /// 86 ///
75 /// A transform being forced means it should run until it generates outputs 87 /// A transform being forced means it should run until it generates outputs
76 /// and is no longer dirty. This is always true for non-declaring 88 /// and is no longer dirty. This is always true for non-declaring
77 /// transformers, since they always need to eagerly generate outputs. 89 /// transformers, since they always need to eagerly generate outputs.
78 bool _forced; 90 bool _forced;
79 91
80 /// The subscriptions to each input's [AssetNode.onStateChange] stream. 92 /// The subscriptions to each secondary input's [AssetNode.onStateChange]
81 final _inputSubscriptions = new Map<AssetId, StreamSubscription>(); 93 /// stream.
94 final _secondarySubscriptions = new Map<AssetId, StreamSubscription>();
82 95
83 /// The controllers for the asset nodes emitted by this node. 96 /// The controllers for the asset nodes emitted by this node.
84 final _outputControllers = new Map<AssetId, AssetNodeController>(); 97 final _outputControllers = new Map<AssetId, AssetNodeController>();
85 98
86 /// The ids of inputs the transformer tried and failed to read last time it 99 /// The ids of inputs the transformer tried and failed to read last time it
87 /// ran. 100 /// ran.
88 final _missingInputs = new Set<AssetId>(); 101 final _missingInputs = new Set<AssetId>();
89 102
90 /// The controller that's used to pass [primary] through [this] if it's not 103 /// The controllers that are used to pass each primary input through [this] if
91 /// consumed or overwritten. 104 /// it's not consumed or overwritten.
92 /// 105 ///
93 /// This needs an intervening controller to ensure that the output can be 106 /// This needs an intervening controller to ensure that the output can be
94 /// marked dirty when determining whether [this] will consume or overwrite it, 107 /// marked dirty when determining whether [this] will consume or overwrite it,
95 /// and be marked removed if it does. [_passThroughController] will be null 108 /// and be marked removed if it does. No pass-through controller will exist
96 /// if the asset is not being passed through. 109 /// for primary inputs that are not being passed through.
97 AssetNodeController _passThroughController; 110 final _passThroughControllers = new Map<AssetId, AssetNodeController>();
98 111
99 /// The asset node for this transform. 112 /// The asset node for this transform.
100 final _streams = new NodeStreams(); 113 final _streams = new NodeStreams();
101 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; 114 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
102 Stream<AssetNode> get onAsset => _streams.onAsset; 115 Stream<AssetNode> get onAsset => _streams.onAsset;
103 Stream<LogEntry> get onLog => _streams.onLog; 116 Stream<LogEntry> get onLog => _streams.onLog;
104 117
105 /// The current state of [this]. 118 /// The current state of [this].
106 var _state = _State.DECLARING; 119 var _state = _State.DECLARED;
107 120
108 /// Whether [this] has been marked as removed. 121 /// Whether [this] has been marked as removed.
109 bool get _isRemoved => _streams.onAssetController.isClosed; 122 bool get _isRemoved => _streams.onAssetController.isClosed;
110 123
111 // If [transformer] is declaring but not lazy and [primary] is available, we 124 // If [transformer] is declaring but not lazy and [primary] is available, we
112 // can run [apply] even if [force] hasn't been called, since [transformer] 125 // can run [apply] even if [force] hasn't been called, since [transformer]
113 // should run eagerly if possible. 126 // should run eagerly if possible.
114 bool get _canRunDeclaringEagerly => 127 bool get _canRunDeclaringEagerly =>
115 _declaring && transformer is! LazyTransformer && 128 _declaring && transformer is! LazyAggregateTransformer &&
116 primary.state.isAvailable; 129 _primaries.every((input) => input.state.isAvailable);
117 130
118 /// Whether the most recent run of this transform has declared that it 131 /// Which primary inputs the most recent run of this transform has declared
119 /// consumes the primary input. 132 /// that it consumes.
120 /// 133 ///
121 /// Defaults to `false`. This is not meaningful unless [_state] is 134 /// This starts out `null`, indicating that the transform hasn't declared
122 /// [_State.APPLIED] or [_State.DECLARED]. 135 /// anything yet. This is not meaningful unless [_state] is [_State.APPLIED]
123 bool _consumePrimary = false; 136 /// or [_State.DECLARED].
137 Set<AssetId> _consumedPrimaries;
124 138
125 /// The set of output ids that [transformer] declared it would emit. 139 /// The set of output ids that [transformer] declared it would emit.
126 /// 140 ///
127 /// This is only non-null if [transformer] is a [DeclaringTransformer] and its 141 /// This is only non-null if [transformer] is a
128 /// [declareOutputs] has been run successfully. 142 /// [DeclaringAggregateTransformer] and its [declareOutputs] has been run
143 /// successfully.
129 Set<AssetId> _declaredOutputs; 144 Set<AssetId> _declaredOutputs;
130 145
131 TransformNode(this.phase, Transformer transformer, AssetNode primary, 146 /// The controller for the currently-running
132 this._location) 147 /// [DeclaringAggregateTransformer.declareOutputs] call's
133 : key = primary.id.path, 148 /// [DeclaringAggregateTransform].
134 transformer = transformer, 149 ///
135 primary = primary { 150 /// This will be non-`null` when
136 _forced = transformer is! DeclaringTransformer; 151 /// [DeclaringAggregateTransformer.declareOutputs] is running. This means that
137 if (_forced) primary.force(); 152 /// it's always non-`null` when [_state] is [_State.DECLARING], sometimes
153 /// non-`null` when it's [_State.NEEDS_DECLARE], and always `null` otherwise.
154 DeclaringAggregateTransformController _declareController;
138 155
139 _primarySubscription = primary.onStateChange.listen((state) { 156 /// The controller for the currently-running [AggregateTransformer.apply]
140 if (state.isRemoved) { 157 /// call's [AggregateTransform].
141 remove(); 158 ///
142 } else { 159 /// This will be non-`null` when [AggregateTransform.apply] is running, which
143 if (_forced) primary.force(); 160 /// means that it's always non-`null` when [_state] is [_State.APPLYING] or
144 _dirty(); 161 /// [_State.NEEDS_APPLY], sometimes non-`null` when it's
145 } 162 /// [_State.NEEDS_DECLARE], and always `null` otherwise.
146 }); 163 AggregateTransformController _applyController;
147 164
148 _phaseSubscription = phase.previous.onAsset.listen((node) { 165 TransformNode(this.phase, this.transformer, this.key, this._location) {
166 _forced = transformer is! DeclaringAggregateTransformer;
167
168 _phaseAssetSubscription = phase.previous.onAsset.listen((node) {
149 if (!_missingInputs.contains(node.id)) return; 169 if (!_missingInputs.contains(node.id)) return;
150 if (_forced) node.force(); 170 if (_forced) node.force();
151 _dirty(); 171 _dirty();
152 }); 172 });
153 173
154 _declareOutputs().then((_) { 174 _phaseStatusSubscription = phase.previous.onStatusChange.listen((status) {
155 if (_forced || _canRunDeclaringEagerly) { 175 if (status == NodeStatus.RUNNING) return;
156 _apply(); 176
157 } else { 177 _maybeFinishDeclareController();
158 _state = _State.DECLARED; 178 _maybeFinishApplyController();
159 _streams.changeStatus(NodeStatus.IDLE);
160 }
161 }); 179 });
180
181 _run();
162 } 182 }
163 183
164 /// The [TransformInfo] describing this node. 184 /// Adds [input] as a primary input for this node.
165 /// 185 void addPrimary(AssetNode input) {
166 /// [TransformInfo] is the publicly-visible representation of a transform 186 _primaries.add(input);
167 /// node. 187 if (_forced) input.force();
168 TransformInfo get info => new TransformInfo(transformer, primary.id); 188
189 _primarySubscriptions[input.id] = input.onStateChange
190 .listen((_) => _onPrimaryStateChange(input));
191
192 if (_state == _State.DECLARING && !_declareController.isDone) {
193 // If we're running `declareOutputs` and its id stream isn't closed yet,
194 // pass this in as another id.
195 _declareController.addId(input.id);
196 _maybeFinishDeclareController();
197 } else if (_state == _State.APPLYING) {
198 // If we're running `apply`, we need to wait until [input] is available
199 // before we pass it into the stream. If it's available now, great; if
200 // not, [_onPrimaryStateChange] will handle it.
201 if (!input.state.isAvailable) return;
202 _onPrimaryStateChange(input);
203 _maybeFinishApplyController();
204 } else {
205 // Otherwise, a new input means we'll need to re-run `declareOutputs`.
206 _restartRun();
207 }
208 }
169 209
170 /// Marks this transform as removed. 210 /// Marks this transform as removed.
171 /// 211 ///
172 /// This causes all of the transform's outputs to be marked as removed as 212 /// This causes all of the transform's outputs to be marked as removed as
173 /// well. Normally this will be automatically done internally based on events 213 /// well. Normally this will be automatically done internally based on events
174 /// from the primary input, but it's possible for a transform to no longer be 214 /// from the primary input, but it's possible for a transform to no longer be
175 /// valid even if its primary input still exists. 215 /// valid even if its primary input still exists.
176 void remove() { 216 void remove() {
177 _streams.close(); 217 _streams.close();
178 _primarySubscription.cancel(); 218 _phaseAssetSubscription.cancel();
179 _phaseSubscription.cancel(); 219 _phaseStatusSubscription.cancel();
180 _clearInputSubscriptions(); 220 if (_declareController != null) _declareController.cancel();
221 if (_applyController != null) _applyController.cancel();
222 _clearSecondarySubscriptions();
181 _clearOutputs(); 223 _clearOutputs();
182 if (_passThroughController != null) { 224
183 _passThroughController.setRemoved(); 225 for (var subscription in _primarySubscriptions.values) {
184 _passThroughController = null; 226 subscription.cancel();
185 } 227 }
228 _primarySubscriptions.clear();
229
230 for (var controller in _passThroughControllers.values) {
231 controller.setRemoved();
232 }
233 _passThroughControllers.clear();
186 } 234 }
187 235
188 /// If [this] is deferred, ensures that its concrete outputs will be 236 /// If [this] is deferred, ensures that its concrete outputs will be
189 /// generated. 237 /// generated.
190 void force() { 238 void force() {
191 if (_forced || _state == _State.APPLIED) return; 239 if (_forced || _state == _State.APPLIED) return;
192 primary.force(); 240 for (var input in _primaries) {
241 input.force();
242 }
243
193 _forced = true; 244 _forced = true;
194 if (_state == _State.DECLARED) _dirty(); 245 if (_state == _State.DECLARED) _apply();
195 } 246 }
196 247
197 /// Marks this transform as dirty. 248 /// Marks this transform as dirty.
198 /// 249 ///
199 /// This causes all of the transform's outputs to be marked as dirty as well. 250 /// Specifically, this should be called when one of the transform's inputs'
251 /// contents change, or when a secondary input is removed. Primary inputs
252 /// being added or removed are handled by [addInput] and
253 /// [_onPrimaryStateChange].
200 void _dirty() { 254 void _dirty() {
201 // If we're in the process of running [declareOutputs], we already know that 255 if (_state == _State.DECLARING || _state == _State.NEEDS_DECLARE ||
202 // [apply] needs to be run so there's nothing we need to mark as dirty. 256 _state == _State.NEEDS_APPLY) {
203 if (_state == _State.DECLARING) return; 257 // If we already know that [_apply] needs to be run, there's nothing to do
258 // here.
259 return;
260 }
204 261
205 if (!_forced && !_canRunDeclaringEagerly) { 262 if (!_forced && !_canRunDeclaringEagerly) {
206 // [forced] should only ever be false for a declaring transformer. 263 // [forced] should only ever be false for a declaring transformer.
207 assert(_declaring); 264 assert(_declaring);
208 265
209 // If we've finished applying, transition to MATERIALIZING, indicating 266 // If we've finished applying, transition to DECLARED, indicating that we
210 // that we know what outputs [apply] will emit but we're waiting to emit 267 // know what outputs [apply] will emit but we're waiting to emit them
211 // them concretely until [force] is called. If we're still applying, we'll 268 // concretely until [force] is called. If we're still applying, we'll
212 // transition to MATERIALIZING once we finish. 269 // transition to DECLARED once we finish.
213 if (_state == _State.APPLIED) _state = _State.DECLARED; 270 if (_state == _State.APPLIED) _state = _State.DECLARED;
214 for (var controller in _outputControllers.values) { 271 for (var controller in _outputControllers.values) {
215 controller.setLazy(force); 272 controller.setLazy(force);
216 } 273 }
217 _emitDeclaredOutputs(); 274 _emitDeclaredOutputs();
218 return; 275 return;
219 } 276 }
220 277
221 if (_passThroughController != null) _passThroughController.setDirty();
222 for (var controller in _outputControllers.values) {
223 controller.setDirty();
224 }
225
226 if (_state == _State.APPLIED) { 278 if (_state == _State.APPLIED) {
227 if (_declaredOutputs != null) _emitDeclaredOutputs(); 279 if (_declaredOutputs != null) _emitDeclaredOutputs();
228 _apply(); 280 _apply();
229 } else if (_state == _State.DECLARED) { 281 } else if (_state == _State.DECLARED) {
230 _apply(); 282 _apply();
231 } else { 283 } else {
232 _state = _State.NEEDS_APPLY; 284 _state = _State.NEEDS_APPLY;
233 } 285 }
234 } 286 }
235 287
288 /// The callback called when [input]'s state changes.
289 void _onPrimaryStateChange(AssetNode input) {
290 if (input.state.isRemoved) {
291 _primarySubscriptions.remove(input.id);
292
293 if (_primaries.isEmpty) {
294 // If there are no more primary inputs, there's no more use for this
295 // node in the graph. It will be re-created by its
296 // [TransformerClassifier] if a new input with [key] is added.
297 remove();
298 return;
299 }
300
301 // Any change to the number of primary inputs requires that we re-run the
302 // transformation.
303 _restartRun();
304 } else if (input.state.isAvailable) {
305 if (_state == _State.DECLARED && _canRunDeclaringEagerly) {
306 // If [this] is fully declared but hasn't started applying, this input
307 // becoming available may mean that all inputs are available, in which
308 // case we can run apply eagerly.
309 _apply();
310 return;
311 }
312
313 // If we're not actively passing concrete assets to the transformer, the
314 // distinction between a dirty asset and an available one isn't relevant.
315 if (_state != _State.APPLYING) return;
316
317 if (_applyController.isDone) {
318 // If we get a new asset after we've closed the asset stream, we need to
319 // re-run declare and then apply.
320 _restartRun();
321 } else {
322 // If the new asset comes before the asset stream is done, we can just
323 // pass it to the stream.
324 _applyController.addInput(input.asset);
325 _maybeFinishApplyController();
326 }
327 } else {
328 if (_forced) input.force();
329 if (_state == _State.APPLYING && !_applyController.addedId(input.id)) {
330 // If the input hasn't yet been added to the transform's input stream,
331 // there's no need to consider the transformation dirty.
332 return;
333 }
334 _dirty();
335 }
336 }
337
338 /// Run the entire transformation, including both `declareOutputs` (if
339 /// applicable) and `apply`.
340 void _run() {
341 assert(_state != _State.DECLARING);
342 assert(_state != _State.APPLYING);
343
344 _markOutputsDirty();
345 _declareOutputs(() {
346 if (_forced || _canRunDeclaringEagerly) {
347 _apply();
348 } else {
349 _state = _State.DECLARED;
350 _streams.changeStatus(NodeStatus.IDLE);
351 }
352 });
353 }
354
355 /// Restart the entire transformation, including `declareOutputs` if
356 /// applicable.
357 void _restartRun() {
358 if (_state == _State.DECLARED || _state == _State.APPLIED) {
359 // If we're currently idle, we can restart the transformation immediately.
360 _run();
361 return;
362 }
363
364 // If we're actively running `declareOutputs` or `apply`, cancel the
365 // transforms and transition to `NEEDS_DECLARE`. Once the transformer's
366 // method returns, we'll transition to `DECLARING`.
367 if (_declareController != null) _declareController.cancel();
368 if (_applyController != null) _applyController.cancel();
369 _state = _State.NEEDS_DECLARE;
370 }
371
236 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty 372 /// Runs [transform.declareOutputs] and emits the resulting assets as dirty
237 /// assets. 373 /// assets.
238 Future _declareOutputs() { 374 ///
239 if (transformer is! DeclaringTransformer) return new Future.value(); 375 /// Calls [callback] when it's finished. This doesn't return a future so that
376 /// [callback] is called synchronously if there are no outputs to declare. If
377 /// [this] is removed while inputs are being declared, [callback] will not be
378 /// called.
379 void _declareOutputs(void callback()) {
380 if (transformer is! DeclaringAggregateTransformer) {
381 callback();
382 return;
383 }
240 384
385 _state = _State.DECLARING;
241 var controller = new DeclaringAggregateTransformController(this); 386 var controller = new DeclaringAggregateTransformController(this);
387 _declareController = controller;
242 _streams.onLogPool.add(controller.onLog); 388 _streams.onLogPool.add(controller.onLog);
243 controller.idController.add(primary.id); 389 for (var primary in _primaries) {
244 return newDeclaringTransform(controller.transform).then((transform) { 390 controller.addId(primary.id);
245 return (transformer as DeclaringTransformer).declareOutputs(transform); 391 }
392
393 syncFuture(() {
394 return (transformer as DeclaringAggregateTransformer)
395 .declareOutputs(controller.transform);
396 }).whenComplete(() {
397 // Cancel the controller here even if `declareOutputs` wasn't interrupted.
398 // Since the declaration is finished, we want to close out the
399 // controller's streams.
400 controller.cancel();
401 _declareController = null;
246 }).then((_) { 402 }).then((_) {
247 if (_isRemoved) return; 403 if (_isRemoved) return;
248 if (controller.loggedError) return; 404 if (_state == _State.NEEDS_DECLARE) {
405 _declareOutputs(callback);
406 return;
407 }
249 408
250 _consumePrimary = controller.consumedPrimaries.contains(primary.id); 409 if (controller.loggedError) {
410 // If `declareOutputs` fails, fall back to treating a declaring
411 // transformer as though it were eager.
412 if (transformer is! LazyAggregateTransformer) _forced = true;
413 callback();
414 return;
415 }
416
417 _consumedPrimaries = controller.consumedPrimaries;
251 _declaredOutputs = controller.outputIds; 418 _declaredOutputs = controller.outputIds;
252 var invalidIds = _declaredOutputs 419 var invalidIds = _declaredOutputs
253 .where((id) => id.package != phase.cascade.package).toSet(); 420 .where((id) => id.package != phase.cascade.package).toSet();
254 for (var id in invalidIds) { 421 for (var id in invalidIds) {
255 _declaredOutputs.remove(id); 422 _declaredOutputs.remove(id);
256 // TODO(nweiz): report this as a warning rather than a failing error. 423 // TODO(nweiz): report this as a warning rather than a failing error.
257 phase.cascade.reportError(new InvalidOutputException(info, id)); 424 phase.cascade.reportError(new InvalidOutputException(info, id));
258 } 425 }
259 426
260 if (!_declaredOutputs.contains(primary.id)) _emitPassThrough(); 427 for (var primary in _primaries) {
428 if (_declaredOutputs.contains(primary.id)) continue;
429 _passThrough(primary.id);
430 }
261 _emitDeclaredOutputs(); 431 _emitDeclaredOutputs();
432 callback();
262 }).catchError((error, stackTrace) { 433 }).catchError((error, stackTrace) {
263 if (_isRemoved) return; 434 if (_isRemoved) return;
264 if (transformer is! LazyTransformer) _forced = true; 435 if (transformer is! LazyAggregateTransformer) _forced = true;
265 phase.cascade.reportError(_wrapException(error, stackTrace)); 436 phase.cascade.reportError(_wrapException(error, stackTrace));
437 callback();
266 }); 438 });
267 } 439 }
268 440
269 /// Emits a dirty asset node for all outputs that were declared by the 441 /// Emits a dirty asset node for all outputs that were declared by the
270 /// transformer. 442 /// transformer.
271 /// 443 ///
272 /// This won't emit any outputs for which there already exist output 444 /// This won't emit any outputs for which there already exist output
273 /// controllers. It should only be called for transforms that have declared 445 /// controllers. It should only be called for transforms that have declared
274 /// their outputs. 446 /// their outputs.
275 void _emitDeclaredOutputs() { 447 void _emitDeclaredOutputs() {
276 assert(_declaredOutputs != null); 448 assert(_declaredOutputs != null);
277 for (var id in _declaredOutputs) { 449 for (var id in _declaredOutputs) {
278 if (_outputControllers.containsKey(id)) continue; 450 if (_outputControllers.containsKey(id)) continue;
279 var controller = _forced 451 var controller = _forced
280 ? new AssetNodeController(id, this) 452 ? new AssetNodeController(id, this)
281 : new AssetNodeController.lazy(id, force, this); 453 : new AssetNodeController.lazy(id, force, this);
282 _outputControllers[id] = controller; 454 _outputControllers[id] = controller;
283 _streams.onAssetController.add(controller.node); 455 _streams.onAssetController.add(controller.node);
284 } 456 }
285 } 457 }
286 458
459 //// Mark all emitted and passed-through outputs of this transform as dirty.
460 void _markOutputsDirty() {
461 for (var controller in _passThroughControllers.values) {
462 controller.setDirty();
463 }
464 for (var controller in _outputControllers.values) {
465 if (_forced) {
466 controller.setDirty();
467 } else {
468 controller.setLazy(force);
469 }
470 }
471 }
472
287 /// Applies this transform. 473 /// Applies this transform.
288 void _apply() { 474 void _apply() {
289 assert(!_isRemoved); 475 assert(!_isRemoved);
290 476
291 // Clear input subscriptions here as well as in [_process] because [_apply] 477 _markOutputsDirty();
292 // may be restarted independently if only a secondary input changes. 478 _clearSecondarySubscriptions();
293 _clearInputSubscriptions();
294 _state = _State.APPLYING; 479 _state = _State.APPLYING;
295 _streams.changeStatus(status); 480 _streams.changeStatus(status);
296 _runApply().then((hadError) { 481 _runApply().then((hadError) {
297 if (_isRemoved) return; 482 if (_isRemoved) return;
298 483
299 if (_state == _State.DECLARED) return; 484 if (_state == _State.DECLARED) return;
300 485
301 if (_state == _State.NEEDS_APPLY) { 486 if (_state == _State.NEEDS_DECLARE) {
302 _apply(); 487 _run();
303 return; 488 return;
304 } 489 }
305 490
491 // If an input's contents changed while running `apply`, retry unless the
492 // transformer is deferred and hasn't been forced.
493 if (_state == _State.NEEDS_APPLY) {
494 if (_forced || _canRunDeclaringEagerly) {
495 _apply();
496 } else {
497 _state = _State.DECLARED;
498 }
499 return;
500 }
501
306 if (_declaring) _forced = false; 502 if (_declaring) _forced = false;
307 503
308 assert(_state == _State.APPLYING); 504 assert(_state == _State.APPLYING);
309 if (hadError) { 505 if (hadError) {
310 _clearOutputs(); 506 _clearOutputs();
311 // If the transformer threw an error, we don't want to emit the 507 // If the transformer threw an error, we don't want to emit the
312 // pass-through asset in case it will be overwritten by the transformer. 508 // pass-through assets in case they'll be overwritten by the
313 // However, if the transformer declared that it wouldn't overwrite or 509 // transformer. However, if the transformer declared that it wouldn't
314 // consume the pass-through asset, we can safely emit it. 510 // overwrite or consume a pass-through asset, we can safely emit it.
315 if (_declaredOutputs != null && !_consumePrimary && 511 if (_declaredOutputs != null) {
316 !_declaredOutputs.contains(primary.id)) { 512 for (var input in _primaries) {
317 _emitPassThrough(); 513 if (_consumedPrimaries.contains(input.id) ||
318 } else { 514 _declaredOutputs.contains(input.id)) {
319 _dontEmitPassThrough(); 515 _consumePrimary(input.id);
516 } else {
517 _passThrough(input.id);
518 }
519 }
320 } 520 }
321 } 521 }
322 522
323 _state = _State.APPLIED; 523 _state = _State.APPLIED;
324 _streams.changeStatus(NodeStatus.IDLE); 524 _streams.changeStatus(NodeStatus.IDLE);
325 }); 525 });
326 } 526 }
327 527
328 /// Gets the asset for an input [id]. 528 /// Gets the asset for an input [id].
329 /// 529 ///
330 /// If an input with [id] cannot be found, throws an [AssetNotFoundException]. 530 /// If an input with [id] cannot be found, throws an [AssetNotFoundException].
331 Future<Asset> getInput(AssetId id) { 531 Future<Asset> getInput(AssetId id) {
332 return phase.previous.getOutput(id).then((node) { 532 return phase.previous.getOutput(id).then((node) {
333 // Throw if the input isn't found. This ensures the transformer's apply 533 // Throw if the input isn't found. This ensures the transformer's apply
334 // is exited. We'll then catch this and report it through the proper 534 // is exited. We'll then catch this and report it through the proper
335 // results stream. 535 // results stream.
336 if (node == null) { 536 if (node == null) {
337 _missingInputs.add(id); 537 _missingInputs.add(id);
338 throw new AssetNotFoundException(id); 538 throw new AssetNotFoundException(id);
339 } 539 }
340 540
341 _inputSubscriptions.putIfAbsent(node.id, () { 541 _secondarySubscriptions.putIfAbsent(node.id, () {
342 return node.onStateChange.listen((state) => _dirty()); 542 return node.onStateChange.listen((_) => _dirty());
343 }); 543 });
344 544
345 return node.asset; 545 return node.asset;
346 }); 546 });
347 } 547 }
348 548
349 /// Run [Transformer.apply] as soon as [primary] is available. 549 /// Run [AggregateTransformer.apply].
350 /// 550 ///
351 /// Returns whether or not an error occurred while running the transformer. 551 /// Returns whether or not an error occurred while running the transformer.
352 Future<bool> _runApply() { 552 Future<bool> _runApply() {
353 var controller = new AggregateTransformController(this); 553 var controller = new AggregateTransformController(this);
554 _applyController = controller;
354 _streams.onLogPool.add(controller.onLog); 555 _streams.onLogPool.add(controller.onLog);
556 for (var primary in _primaries) {
557 if (!primary.state.isAvailable) continue;
558 controller.addInput(primary.asset);
559 }
355 560
356 return primary.whenAvailable((_) { 561 return syncFuture(() {
357 if (_isRemoved) return null; 562 return transformer.apply(controller.transform);
358 _state = _State.APPLYING; 563 }).whenComplete(() {
359 controller.inputController.add(primary.asset); 564 // Cancel the controller here even if `apply` wasn't interrupted. Since
360 return newTransform(controller.transform).then((transform) { 565 // the apply is finished, we want to close out the controller's streams.
361 return transformer.apply(transform); 566 controller.cancel();
362 }); 567 _applyController = null;
363 }).then((_) { 568 }).then((_) {
364 if (!_forced && !primary.state.isAvailable) { 569 assert(_state != _State.DECLARED);
570 assert(_state != _State.DECLARING);
571 assert(_state != _State.APPLIED);
572
573 if (!_forced && _primaries.any((node) => !node.state.isAvailable)) {
365 _state = _State.DECLARED; 574 _state = _State.DECLARED;
366 _streams.changeStatus(NodeStatus.IDLE); 575 _streams.changeStatus(NodeStatus.IDLE);
367 return false; 576 return false;
368 } 577 }
369 578
370 if (_isRemoved) return false; 579 if (_isRemoved) return false;
371 if (_state == _State.NEEDS_APPLY) return false; 580 if (_state == _State.NEEDS_APPLY) return false;
372 if (_state == _State.DECLARING) return false; 581 if (_state == _State.NEEDS_DECLARE) return false;
373 if (controller.loggedError) return true; 582 if (controller.loggedError) return true;
374 _handleApplyResults(controller); 583 _handleApplyResults(controller);
375 return false; 584 return false;
376 }).catchError((error, stackTrace) { 585 }).catchError((error, stackTrace) {
377 // If the transform became dirty while processing, ignore any errors from 586 // If the transform became dirty while processing, ignore any errors from
378 // it. 587 // it.
379 if (_state == _State.NEEDS_APPLY || _isRemoved) return false; 588 if (_state == _State.NEEDS_APPLY || _isRemoved) return false;
380 589
381 // Catch all transformer errors and pipe them to the results stream. This 590 // Catch all transformer errors and pipe them to the results stream. This
382 // is so a broken transformer doesn't take down the whole graph. 591 // is so a broken transformer doesn't take down the whole graph.
383 phase.cascade.reportError(_wrapException(error, stackTrace)); 592 phase.cascade.reportError(_wrapException(error, stackTrace));
384 return true; 593 return true;
385 }); 594 });
386 } 595 }
387 596
388 /// Handle the results of running [Transformer.apply]. 597 /// Handle the results of running [Transformer.apply].
389 /// 598 ///
390 /// [controller] should be the controller for the [AggegateTransform] passed 599 /// [controller] should be the controller for the [AggegateTransform] passed
391 /// to [AggregateTransformer.apply]. 600 /// to [AggregateTransformer.apply].
392 void _handleApplyResults(AggregateTransformController controller) { 601 void _handleApplyResults(AggregateTransformController controller) {
393 _consumePrimary = controller.consumedPrimaries.contains(primary.id); 602 _consumedPrimaries = controller.consumedPrimaries;
394 603
395 var newOutputs = controller.outputs; 604 var newOutputs = controller.outputs;
396 // Any ids that are for a different package are invalid. 605 // Any ids that are for a different package are invalid.
397 var invalidIds = newOutputs 606 var invalidIds = newOutputs
398 .map((asset) => asset.id) 607 .map((asset) => asset.id)
399 .where((id) => id.package != phase.cascade.package) 608 .where((id) => id.package != phase.cascade.package)
400 .toSet(); 609 .toSet();
401 for (var id in invalidIds) { 610 for (var id in invalidIds) {
402 newOutputs.removeId(id); 611 newOutputs.removeId(id);
403 // TODO(nweiz): report this as a warning rather than a failing error. 612 // TODO(nweiz): report this as a warning rather than a failing error.
404 phase.cascade.reportError(new InvalidOutputException(info, id)); 613 phase.cascade.reportError(new InvalidOutputException(info, id));
405 } 614 }
406 615
407 // Remove outputs that used to exist but don't anymore. 616 // Remove outputs that used to exist but don't anymore.
408 for (var id in _outputControllers.keys.toList()) { 617 for (var id in _outputControllers.keys.toList()) {
409 if (newOutputs.containsId(id)) continue; 618 if (newOutputs.containsId(id)) continue;
410 _outputControllers.remove(id).setRemoved(); 619 _outputControllers.remove(id).setRemoved();
411 } 620 }
412 621
413 // Emit or stop emitting the pass-through asset between removing and 622 // Emit or stop emitting pass-through assets between removing and adding
414 // adding outputs to ensure there are no collisions. 623 // outputs to ensure there are no collisions.
415 if (!_consumePrimary && !newOutputs.containsId(primary.id)) { 624 for (var id in _primaries.map((node) => node.id)) {
416 _emitPassThrough(); 625 if (_consumedPrimaries.contains(id) || newOutputs.containsId(id)) {
417 } else { 626 _consumePrimary(id);
418 _dontEmitPassThrough(); 627 } else {
628 _passThrough(id);
629 }
419 } 630 }
420 631
421 // Store any new outputs or new contents for existing outputs. 632 // Store any new outputs or new contents for existing outputs.
422 for (var asset in newOutputs) { 633 for (var asset in newOutputs) {
423 var controller = _outputControllers[asset.id]; 634 var controller = _outputControllers[asset.id];
424 if (controller != null) { 635 if (controller != null) {
425 controller.setAvailable(asset); 636 controller.setAvailable(asset);
426 } else { 637 } else {
427 var controller = new AssetNodeController.available(asset, this); 638 var controller = new AssetNodeController.available(asset, this);
428 _outputControllers[asset.id] = controller; 639 _outputControllers[asset.id] = controller;
429 _streams.onAssetController.add(controller.node); 640 _streams.onAssetController.add(controller.node);
430 } 641 }
431 } 642 }
432 } 643 }
433 644
434 /// Cancels all subscriptions to secondary input nodes. 645 /// Cancels all subscriptions to secondary input nodes.
435 void _clearInputSubscriptions() { 646 void _clearSecondarySubscriptions() {
436 _missingInputs.clear(); 647 _missingInputs.clear();
437 for (var subscription in _inputSubscriptions.values) { 648 for (var subscription in _secondarySubscriptions.values) {
438 subscription.cancel(); 649 subscription.cancel();
439 } 650 }
440 _inputSubscriptions.clear(); 651 _secondarySubscriptions.clear();
441 } 652 }
442 653
443 /// Removes all output assets. 654 /// Removes all output assets.
444 void _clearOutputs() { 655 void _clearOutputs() {
445 // Remove all the previously-emitted assets. 656 // Remove all the previously-emitted assets.
446 for (var controller in _outputControllers.values) { 657 for (var controller in _outputControllers.values) {
447 controller.setRemoved(); 658 controller.setRemoved();
448 } 659 }
449 _outputControllers.clear(); 660 _outputControllers.clear();
450 } 661 }
451 662
452 /// Emit the pass-through asset if it's not being emitted already. 663 /// Emit the pass-through node for the primary input [id] if it's not being
453 void _emitPassThrough() { 664 /// emitted already.
454 assert(!_outputControllers.containsKey(primary.id)); 665 void _passThrough(AssetId id) {
666 assert(!_outputControllers.containsKey(id));
455 667
456 if (_consumePrimary) return; 668 if (_consumedPrimaries.contains(id)) return;
457 if (_passThroughController == null) { 669 var controller = _passThroughControllers[id];
458 _passThroughController = new AssetNodeController.from(primary); 670 var primary = _primaries[id];
459 _streams.onAssetController.add(_passThroughController.node); 671 if (controller == null) {
672 controller = new AssetNodeController.from(primary);
673 _passThroughControllers[id] = controller;
674 _streams.onAssetController.add(controller.node);
460 } else if (primary.state.isDirty) { 675 } else if (primary.state.isDirty) {
461 _passThroughController.setDirty(); 676 controller.setDirty();
462 } else if (!_passThroughController.node.state.isAvailable) { 677 } else if (!controller.node.state.isAvailable) {
463 _passThroughController.setAvailable(primary.asset); 678 controller.setAvailable(primary.asset);
464 } 679 }
465 } 680 }
466 681
467 /// Stop emitting the pass-through asset if it's being emitted already. 682 /// Stops emitting the pass-through node for the primary input [id] if it's
468 void _dontEmitPassThrough() { 683 /// being emitted.
469 if (_passThroughController == null) return; 684 void _consumePrimary(AssetId id) {
470 _passThroughController.setRemoved(); 685 var controller = _passThroughControllers.remove(id);
471 _passThroughController = null; 686 if (controller == null) return;
687 controller.setRemoved();
688 }
689
690 /// If `declareOutputs` is running and all previous phases have declared their
691 /// outputs, mark [_declareController] as done.
692 void _maybeFinishDeclareController() {
693 if (_declareController == null) return;
694 if (phase.previous.status == NodeStatus.RUNNING) return;
695 _declareController.done();
696 }
697
698 /// If `apply` is running, all previous phases have declared their outputs,
699 /// and all primary inputs are available and thus have been passed to the
700 /// transformer, mark [_applyController] as done.
701 void _maybeFinishApplyController() {
702 if (_applyController == null) return;
703 if (_primaries.any((input) => !input.state.isAvailable)) return;
704 if (phase.previous.status == NodeStatus.RUNNING) return;
705 _applyController.done();
472 } 706 }
473 707
474 BarbackException _wrapException(error, StackTrace stackTrace) { 708 BarbackException _wrapException(error, StackTrace stackTrace) {
475 if (error is! AssetNotFoundException) { 709 if (error is! AssetNotFoundException) {
476 return new TransformerException(info, error, stackTrace); 710 return new TransformerException(info, error, stackTrace);
477 } else { 711 } else {
478 return new MissingInputException(info, error.id); 712 return new MissingInputException(info, error.id);
479 } 713 }
480 } 714 }
481 715
482 /// Emit a warning about the transformer on [id].
483 void _warn(String message) {
484 _streams.onLogController.add(
485 new LogEntry(info, primary.id, LogLevel.WARNING, message, null));
486 }
487
488 String toString() => 716 String toString() =>
489 "transform node in $_location for $transformer on $primary ($_state, " 717 "transform node in $_location for $transformer on ${info.primaryId} "
490 "$status, ${_forced ? '' : 'un'}forced)"; 718 "($_state, $status, ${_forced ? '' : 'un'}forced)";
491 } 719 }
492 720
493 /// The enum of states that [TransformNode] can be in. 721 /// The enum of states that [TransformNode] can be in.
494 class _State { 722 class _State {
495 /// The transform is running [DeclaringTransformer.declareOutputs]. 723 /// The transform is running [DeclaringAggregateTransformer.declareOutputs].
496 /// 724 ///
497 /// This is the initial state of the transformer, and it will only occur once 725 /// If the set of primary inputs changes while in this state, it will
498 /// since [DeclaringTransformer.declareOutputs] is independent of the contents 726 /// transition to [NEEDS_DECLARE]. If the [TransformNode] is still in this
499 /// of the primary input. Once the method finishes running, this will 727 /// state when `declareOutputs` finishes running, it will transition to
500 /// transition to [APPLYING] if the transform is non-lazy and the input is 728 /// [APPLYING] if the transform is non-lazy and all of its primary inputs are
501 /// available, and [DECLARED] otherwise. 729 /// available, and [DECLARED] otherwise.
502 /// 730 ///
503 /// Non-declaring transformers will transition out of this state and into 731 /// Non-declaring transformers will transition out of this state and into
504 /// [APPLYING] immediately. 732 /// [APPLYING] immediately.
505 static final DECLARING = const _State._("declaring outputs"); 733 static const DECLARING = const _State._("declaring outputs");
734
735 /// The transform is running [AggregateTransformer.declareOutputs] or
736 /// [AggregateTransform.apply], but a primary input was added or removed after
737 /// it started, so it will need to re-run `declareOutputs`.
738 ///
739 /// The [TransformNode] will transition to [DECLARING] once `declareOutputs`
740 /// or `apply` finishes running.
741 static const NEEDS_DECLARE = const _State._("needs declare");
506 742
507 /// The transform is deferred and has run 743 /// The transform is deferred and has run
508 /// [DeclaringTransformer.declareOutputs] but hasn't yet been forced. 744 /// [DeclaringAggregateTransformer.declareOutputs] but hasn't yet been forced.
509 /// 745 ///
510 /// This will transition to [APPLYING] when one of the outputs has been 746 /// The [TransformNode] will transition to [APPLYING] when one of the outputs
511 /// forced. 747 /// has been forced or if the transformer is non-lazy and all of its primary
512 static final DECLARED = const _State._("declared"); 748 /// inputs become available.
749 static const DECLARED = const _State._("declared");
513 750
514 /// The transform is running [Transformer.apply]. 751 /// The transform is running [AggregateTransformer.apply].
515 /// 752 ///
516 /// If an input changes while in this state, it will transition to 753 /// If an input's contents change or a secondary input is added or removed
517 /// [NEEDS_APPLY]. If the [TransformNode] is still in this state when 754 /// while in this state, the [TransformNode] will transition to [NEEDS_APPLY].
518 /// [Transformer.apply] finishes running, it will transition to [APPLIED]. 755 /// If a primary input is added or removed, it will transition to
519 static final APPLYING = const _State._("applying"); 756 /// [NEEDS_DECLARE]. If it's still in this state when `apply` finishes
757 /// running, it will transition to [APPLIED].
758 static const APPLYING = const _State._("applying");
520 759
521 /// The transform is running [Transformer.apply], but an input changed after 760 /// The transform is running [AggregateTransformer.apply], but an input's
522 /// it started, so it will need to re-run [Transformer.apply]. 761 /// contents changed or a secondary input was added or removed after it
762 /// started, so it will need to re-run `apply`.
523 /// 763 ///
524 /// This will transition to [APPLYING] once [Transformer.apply] finishes 764 /// If a primary input is added or removed while in this state, the
525 /// running. 765 /// [TranformNode] will transition to [NEEDS_DECLARE]. If it's still in this
526 static final NEEDS_APPLY = const _State._("needs apply"); 766 /// state when `apply` finishes running, it will transition to [APPLYING].
767 static const NEEDS_APPLY = const _State._("needs apply");
527 768
528 /// The transform has finished running [Transformer.apply], whether or not it 769 /// The transform has finished running [AggregateTransformer.apply], whether
529 /// emitted an error. 770 /// or not it emitted an error.
530 /// 771 ///
531 /// If the transformer is deferred, the [TransformNode] can also be in this 772 /// If an input's contents change or a secondary input is added or removed,
532 /// state when [Transformer.declareOutputs] has been run but 773 /// the [TransformNode] will transition to [DECLARED] if the transform is
533 /// [Transformer.apply] has not. 774 /// declaring and [APPLYING] otherwise. If a primary input is added or
534 /// 775 /// removed, this will transition to [DECLARING].
535 /// If an input changes, this will transition to [DECLARED] if the transform 776 static const APPLIED = const _State._("applied");
536 /// is deferred and [APPLYING] otherwise.
537 static final APPLIED = const _State._("applied");
538 777
539 final String name; 778 final String name;
540 779
541 const _State._(this.name); 780 const _State._(this.name);
542 781
543 String toString() => name; 782 String toString() => name;
544 } 783 }
OLDNEW
« no previous file with comments | « pkg/barback/lib/src/errors.dart ('k') | pkg/barback/lib/src/graph/transformer_classifier.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698