Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: pkg/barback/lib/src/graph/phase.dart

Issue 808713003: Remove barback from the repo. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library barback.graph.phase;
6
7 import 'dart:async';
8
9 import '../asset/asset_id.dart';
10 import '../asset/asset_node.dart';
11 import '../asset/asset_node_set.dart';
12 import '../errors.dart';
13 import '../log.dart';
14 import '../transformer/aggregate_transformer.dart';
15 import '../transformer/transformer.dart';
16 import '../transformer/transformer_group.dart';
17 import '../utils.dart';
18 import '../utils/multiset.dart';
19 import 'asset_cascade.dart';
20 import 'group_runner.dart';
21 import 'node_status.dart';
22 import 'node_streams.dart';
23 import 'phase_forwarder.dart';
24 import 'phase_output.dart';
25 import 'transformer_classifier.dart';
26
27 /// One phase in the ordered series of transformations in an [AssetCascade].
28 ///
29 /// Each phase can access outputs from previous phases and can in turn pass
30 /// outputs to later phases. Phases are processed strictly serially. All
31 /// transforms in a phase will be complete before moving on to the next phase.
32 /// Within a single phase, all transforms will be run in parallel.
33 ///
34 /// Building can be interrupted between phases. For example, a source is added
35 /// which starts the background process. Sometime during, say, phase 2 (which
36 /// is running asynchronously) that source is modified. When the process queue
37 /// goes to advance to phase 3, it will see that modification and start the
38 /// waterfall from the beginning again.
39 class Phase {
40 /// The cascade that owns this phase.
41 final AssetCascade cascade;
42
43 /// A string describing the location of [this] in the transformer graph.
44 final String _location;
45
46 /// The index of [this] in its parent cascade or group.
47 final int _index;
48
49 /// The groups for this phase.
50 final _groups = new Map<TransformerGroup, GroupRunner>();
51
52 /// The inputs for this phase.
53 ///
54 /// For the first phase, these will be the source assets. For all other
55 /// phases, they will be the outputs from the previous phase.
56 final _inputs = new AssetNodeSet();
57
58 /// The transformer classifiers for this phase.
59 ///
60 /// The keys can be either [Transformer]s or [AggregateTransformer]s.
61 final _classifiers = new Map<dynamic, TransformerClassifier>();
62
63 /// The forwarders for this phase.
64 final _forwarders = new Map<AssetId, PhaseForwarder>();
65
66 /// The outputs for this phase.
67 final _outputs = new Map<AssetId, PhaseOutput>();
68
69 /// The set of all [AssetNode.origin] properties of the input assets for this
70 /// phase.
71 ///
72 /// This is used to determine which assets have been passed unmodified through
73 /// [_classifiers] or [_groups]. It's possible that a given asset was consumed
74 /// by a group and not an individual transformer, and so shouldn't be
75 /// forwarded through the phase as a whole.
76 ///
77 /// In order to detect whether an output has been forwarded through a group or
78 /// a classifier, we must be able to distinguish it from other outputs with
79 /// the same id. To do so, we check if its origin is in [_inputOrigins]. If
80 /// so, it's been forwarded unmodified.
81 final _inputOrigins = new Multiset<AssetNode>();
82
83 /// The streams exposed by this phase.
84 final _streams = new NodeStreams();
85 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
86 Stream<AssetNode> get onAsset => _streams.onAsset;
87 Stream<LogEntry> get onLog => _streams.onLog;
88
89 /// How far along [this] is in processing its assets.
90 NodeStatus get status {
91 // Before any transformers are added, the phase should be dirty if and only
92 // if any input is dirty.
93 if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
94 return _inputs.any((input) => input.state.isDirty) ?
95 NodeStatus.RUNNING : NodeStatus.IDLE;
96 }
97
98 var classifierStatus = NodeStatus.dirtiest(
99 _classifiers.values.map((classifier) => classifier.status));
100 var groupStatus = NodeStatus.dirtiest(
101 _groups.values.map((group) => group.status));
102 return (previous == null ? NodeStatus.IDLE : previous.status)
103 .dirtier(classifierStatus)
104 .dirtier(groupStatus);
105 }
106
107 /// The previous phase in the cascade, or null if this is the first phase.
108 final Phase previous;
109
110 /// The subscription to [previous]'s [onStatusChange] stream.
111 StreamSubscription _previousStatusSubscription;
112
113 /// The subscription to [previous]'s [onAsset] stream.
114 StreamSubscription<AssetNode> _previousOnAssetSubscription;
115
116 final _inputSubscriptions = new Set<StreamSubscription>();
117
118 /// A map of asset ids to completers for [getInput] requests.
119 ///
120 /// If an asset node is requested before it's available, we put a completer in
121 /// this map to wait for the asset to be generated. If it's not generated, the
122 /// completer should complete to `null`.
123 final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>();
124
125 /// Returns all currently-available output assets for this phase.
126 Set<AssetNode> get availableOutputs {
127 return _outputs.values
128 .map((output) => output.output)
129 .where((node) => node.state.isAvailable)
130 .toSet();
131 }
132
133 // TODO(nweiz): Rather than passing the cascade and the phase everywhere,
134 // create an interface that just exposes [getInput]. Emit errors via
135 // [AssetNode]s.
136 Phase(AssetCascade cascade, String location)
137 : this._(cascade, location, 0);
138
139 Phase._(this.cascade, this._location, this._index, [this.previous]) {
140 if (previous != null) {
141 _previousOnAssetSubscription = previous.onAsset.listen(addInput);
142 _previousStatusSubscription = previous.onStatusChange
143 .listen((_) => _streams.changeStatus(status));
144 }
145
146 onStatusChange.listen((status) {
147 if (status == NodeStatus.RUNNING) return;
148
149 // All the previous phases have finished declaring or producing their
150 // outputs. If anyone's still waiting for outputs, cut off the wait; we
151 // won't be generating them, at least until a source asset changes.
152 for (var completer in _pendingOutputRequests.values) {
153 completer.complete(null);
154 }
155 _pendingOutputRequests.clear();
156 });
157 }
158
159 /// Adds a new asset as an input for this phase.
160 ///
161 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
162 /// will automatically begin determining which transforms can consume it as a
163 /// primary input. The transforms themselves won't be applied until [process]
164 /// is called, however.
165 ///
166 /// This should only be used for brand-new assets or assets that have been
167 /// removed and re-created. The phase will automatically handle updated assets
168 /// using the [AssetNode.onStateChange] stream.
169 void addInput(AssetNode node) {
170 // Each group is one channel along which an asset may be forwarded, as is
171 // each transformer.
172 var forwarder = new PhaseForwarder(
173 node, _classifiers.length, _groups.length);
174 _forwarders[node.id] = forwarder;
175 forwarder.onAsset.listen(_handleOutputWithoutForwarder);
176 if (forwarder.output != null) {
177 _handleOutputWithoutForwarder(forwarder.output);
178 }
179
180 _inputOrigins.add(node.origin);
181 _inputs.add(node);
182 _inputSubscriptions.add(node.onStateChange.listen((state) {
183 if (state.isRemoved) {
184 _inputOrigins.remove(node.origin);
185 _forwarders.remove(node.id).remove();
186 }
187 _streams.changeStatus(status);
188 }));
189
190 for (var classifier in _classifiers.values) {
191 classifier.addInput(node);
192 }
193 }
194
195 // TODO(nweiz): If the output is available when this is called, it's
196 // theoretically possible for it to become unavailable between the call and
197 // the return. If it does so, it won't trigger the rebuilding process. To
198 // avoid this, we should have this and the methods it calls take explicit
199 // callbacks, as in [AssetNode.whenAvailable].
200 /// Gets the asset node for an output [id].
201 ///
202 /// If [id] is for a generated or transformed asset, this will wait until it
203 /// has been created and return it. This means that the returned asset will
204 /// always be [AssetState.AVAILABLE].
205 ///
206 /// If the output cannot be found, returns null.
207 Future<AssetNode> getOutput(AssetId id) {
208 return syncFuture(() {
209 if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
210 if (_outputs.containsKey(id)) {
211 var output = _outputs[id].output;
212 // If the requested output is available, we can just return it.
213 if (output.state.isAvailable) return output;
214
215 // If the requested output exists but isn't yet available, wait to see
216 // if it becomes available. If it's removed before becoming available,
217 // try again, since it could be generated again.
218 output.force();
219 return output.whenAvailable((_) {
220 return output;
221 }).catchError((error) {
222 if (error is! AssetNotFoundException) throw error;
223 return getOutput(id);
224 });
225 }
226
227 // If this phase and the previous phases are fully declared or done, the
228 // requested output won't be generated and we can safely return null.
229 if (status != NodeStatus.RUNNING) return null;
230
231 // Otherwise, store a completer for the asset node. If it's generated in
232 // the future, we'll complete this completer.
233 var completer = _pendingOutputRequests.putIfAbsent(id,
234 () => new Completer.sync());
235 return completer.future;
236 });
237 }
238
239 /// Set this phase's transformers to [transformers].
240 void updateTransformers(Iterable transformers) {
241 var newTransformers = transformers
242 .where((op) => op is Transformer || op is AggregateTransformer)
243 .toSet();
244 var oldTransformers = _classifiers.keys.toSet();
245 for (var removed in oldTransformers.difference(newTransformers)) {
246 _classifiers.remove(removed).remove();
247 }
248
249 for (var transformer in newTransformers.difference(oldTransformers)) {
250 var classifier = new TransformerClassifier(
251 this, transformer, "$_location.$_index");
252 _classifiers[transformer] = classifier;
253 classifier.onAsset.listen(_handleOutput);
254 _streams.onLogPool.add(classifier.onLog);
255 classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
256 for (var input in _inputs) {
257 classifier.addInput(input);
258 }
259 }
260
261 var newGroups = transformers.where((op) => op is TransformerGroup)
262 .toSet();
263 var oldGroups = _groups.keys.toSet();
264 for (var removed in oldGroups.difference(newGroups)) {
265 _groups.remove(removed).remove();
266 }
267
268 for (var added in newGroups.difference(oldGroups)) {
269 var runner = new GroupRunner(previous, added, "$_location.$_index");
270 _groups[added] = runner;
271 runner.onAsset.listen(_handleOutput);
272 _streams.onLogPool.add(runner.onLog);
273 runner.onStatusChange.listen((_) => _streams.changeStatus(status));
274 }
275
276 for (var forwarder in _forwarders.values) {
277 forwarder.updateTransformers(_classifiers.length, _groups.length);
278 }
279
280 _streams.changeStatus(status);
281 }
282
283 /// Force all [LazyTransformer]s' transforms in this phase to begin producing
284 /// concrete assets.
285 void forceAllTransforms() {
286 for (var classifier in _classifiers.values) {
287 classifier.forceAllTransforms();
288 }
289
290 for (var group in _groups.values) {
291 group.forceAllTransforms();
292 }
293 }
294
295 /// Add a new phase after this one.
296 ///
297 /// The new phase will have a location annotation describing its place in the
298 /// package graph. By default, this annotation will describe it as being
299 /// directly after [this]. If [location] is passed, though, it's described as
300 /// being the first phase in that location.
301 Phase addPhase([String location]) {
302 var index = 0;
303 if (location == null) {
304 location = _location;
305 index = _index + 1;
306 }
307
308 var next = new Phase._(cascade, location, index, this);
309 for (var output in _outputs.values.toList()) {
310 // Remove [output]'s listeners because now they should get the asset from
311 // [next], rather than this phase. Any transforms consuming [output] will
312 // be re-run and will consume the output from the new final phase.
313 output.removeListeners();
314 }
315 return next;
316 }
317
318 /// Mark this phase as removed.
319 ///
320 /// This will remove all the phase's outputs.
321 void remove() {
322 for (var classifier in _classifiers.values.toList()) {
323 classifier.remove();
324 }
325 for (var group in _groups.values) {
326 group.remove();
327 }
328 _streams.close();
329 for (var subscription in _inputSubscriptions) {
330 subscription.cancel();
331 }
332 if (_previousStatusSubscription != null) {
333 _previousStatusSubscription.cancel();
334 }
335 if (_previousOnAssetSubscription != null) {
336 _previousOnAssetSubscription.cancel();
337 }
338 }
339
340 /// Add [asset] as an output of this phase.
341 void _handleOutput(AssetNode asset) {
342 if (_inputOrigins.contains(asset.origin)) {
343 _forwarders[asset.id].addIntermediateAsset(asset);
344 } else {
345 _handleOutputWithoutForwarder(asset);
346 }
347 }
348
349 /// Add [asset] as an output of this phase without checking if it's a
350 /// forwarded asset.
351 void _handleOutputWithoutForwarder(AssetNode asset) {
352 if (_outputs.containsKey(asset.id)) {
353 _outputs[asset.id].add(asset);
354 } else {
355 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
356 _outputs[asset.id].onAsset.listen(_emit,
357 onDone: () => _outputs.remove(asset.id));
358 _emit(_outputs[asset.id].output);
359 }
360
361 var exception = _outputs[asset.id].collisionException;
362 if (exception != null) cascade.reportError(exception);
363 }
364
365 /// Emit [asset] as an output of this phase.
366 ///
367 /// This should be called after [_handleOutput], so that collisions are
368 /// resolved.
369 void _emit(AssetNode asset) {
370 _streams.onAssetController.add(asset);
371 _providePendingAsset(asset);
372 }
373
374 /// Provide an asset to a pending [getOutput] call.
375 void _providePendingAsset(AssetNode asset) {
376 // If anyone's waiting for this asset, provide it to them.
377 var request = _pendingOutputRequests.remove(asset.id);
378 if (request == null) return;
379
380 if (asset.state.isAvailable) {
381 request.complete(asset);
382 return;
383 }
384
385 // A lazy asset may be emitted while still dirty. If so, we wait until it's
386 // either available or removed before trying again to access it.
387 assert(asset.state.isDirty);
388 asset.force();
389 asset.whenStateChanges().then((state) {
390 if (state.isRemoved) return getOutput(asset.id);
391 return asset;
392 }).then(request.complete).catchError(request.completeError);
393 }
394
395 String toString() => "phase $_location.$_index";
396 }
OLDNEW
« no previous file with comments | « pkg/barback/lib/src/graph/package_graph.dart ('k') | pkg/barback/lib/src/graph/phase_forwarder.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698