OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library watcher.utils; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:io'; | |
9 import 'dart:collection'; | |
10 | |
11 /// Returns `true` if [error] is a [FileSystemException] for a missing | |
12 /// directory. | |
13 bool isDirectoryNotFoundException(error) { | |
14 if (error is! FileSystemException) return false; | |
15 | |
16 // See dartbug.com/12461 and tests/standalone/io/directory_error_test.dart. | |
17 var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2; | |
18 return error.osError.errorCode == notFoundCode; | |
19 } | |
20 | |
21 /// Returns the union of all elements in each set in [sets]. | |
22 Set unionAll(Iterable<Set> sets) => | |
23 sets.fold(new Set(), (union, set) => union.union(set)); | |
24 | |
25 /// Returns a buffered stream that will emit the same values as the stream | |
26 /// returned by [future] once [future] completes. | |
27 /// | |
28 /// If [future] completes to an error, the return value will emit that error and | |
29 /// then close. | |
30 /// | |
31 /// If [broadcast] is true, a broadcast stream is returned. This assumes that | |
32 /// the stream returned by [future] will be a broadcast stream as well. | |
33 /// [broadcast] defaults to false. | |
34 Stream futureStream(Future<Stream> future, {bool broadcast: false}) { | |
35 var subscription; | |
36 var controller; | |
37 | |
38 future = future.catchError((e, stackTrace) { | |
39 // Since [controller] is synchronous, it's likely that emitting an error | |
40 // will cause it to be cancelled before we call close. | |
41 if (controller != null) controller.addError(e, stackTrace); | |
42 if (controller != null) controller.close(); | |
43 controller = null; | |
44 }); | |
45 | |
46 onListen() { | |
47 future.then((stream) { | |
48 if (controller == null) return; | |
49 subscription = stream.listen( | |
50 controller.add, | |
51 onError: controller.addError, | |
52 onDone: controller.close); | |
53 }); | |
54 } | |
55 | |
56 onCancel() { | |
57 if (subscription != null) subscription.cancel(); | |
58 subscription = null; | |
59 controller = null; | |
60 } | |
61 | |
62 if (broadcast) { | |
63 controller = new StreamController.broadcast( | |
64 sync: true, onListen: onListen, onCancel: onCancel); | |
65 } else { | |
66 controller = new StreamController( | |
67 sync: true, onListen: onListen, onCancel: onCancel); | |
68 } | |
69 return controller.stream; | |
70 } | |
71 | |
72 /// Like [new Future], but avoids around issue 11911 by using [new Future.value] | |
73 /// under the covers. | |
74 Future newFuture(callback()) => new Future.value().then((_) => callback()); | |
75 | |
76 /// Returns a [Future] that completes after pumping the event queue [times] | |
77 /// times. By default, this should pump the event queue enough times to allow | |
78 /// any code to run, as long as it's not waiting on some external event. | |
79 Future pumpEventQueue([int times = 20]) { | |
80 if (times == 0) return new Future.value(); | |
81 // We use a delayed future to allow microtask events to finish. The | |
82 // Future.value or Future() constructors use scheduleMicrotask themselves and | |
83 // would therefore not wait for microtask callbacks that are scheduled after | |
84 // invoking this method. | |
85 return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1)); | |
86 } | |
87 | |
88 /// A stream transformer that batches all events that are sent at the same time. | |
89 /// | |
90 /// When multiple events are synchronously added to a stream controller, the | |
91 /// [StreamController] implementation uses [scheduleMicrotask] to schedule the | |
92 /// asynchronous firing of each event. In order to recreate the synchronous | |
93 /// batches, this collates all the events that are received in "nearby" | |
94 /// microtasks. | |
95 class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> { | |
96 Stream<List<T>> bind(Stream<T> input) { | |
97 var batch = new Queue(); | |
98 return new StreamTransformer<T, List<T>>.fromHandlers( | |
99 handleData: (event, sink) { | |
100 batch.add(event); | |
101 | |
102 // [Timer.run] schedules an event that runs after any microtasks that have | |
103 // been scheduled. | |
104 Timer.run(() { | |
105 if (batch.isEmpty) return; | |
106 sink.add(batch.toList()); | |
107 batch.clear(); | |
108 }); | |
109 }, handleDone: (sink) { | |
110 if (batch.isNotEmpty) { | |
111 sink.add(batch.toList()); | |
112 batch.clear(); | |
113 } | |
114 sink.close(); | |
115 }).bind(input); | |
116 } | |
117 } | |
OLD | NEW |