OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library watcher.utils; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:io'; |
| 9 import 'dart:collection'; |
| 10 |
| 11 /// Returns `true` if [error] is a [FileSystemException] for a missing |
| 12 /// directory. |
| 13 bool isDirectoryNotFoundException(error) { |
| 14 if (error is! FileSystemException) return false; |
| 15 |
| 16 // See dartbug.com/12461 and tests/standalone/io/directory_error_test.dart. |
| 17 var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2; |
| 18 return error.osError.errorCode == notFoundCode; |
| 19 } |
| 20 |
| 21 /// Returns the union of all elements in each set in [sets]. |
| 22 Set unionAll(Iterable<Set> sets) => |
| 23 sets.fold(new Set(), (union, set) => union.union(set)); |
| 24 |
| 25 /// Returns a buffered stream that will emit the same values as the stream |
| 26 /// returned by [future] once [future] completes. |
| 27 /// |
| 28 /// If [future] completes to an error, the return value will emit that error and |
| 29 /// then close. |
| 30 /// |
| 31 /// If [broadcast] is true, a broadcast stream is returned. This assumes that |
| 32 /// the stream returned by [future] will be a broadcast stream as well. |
| 33 /// [broadcast] defaults to false. |
| 34 Stream futureStream(Future<Stream> future, {bool broadcast: false}) { |
| 35 var subscription; |
| 36 var controller; |
| 37 |
| 38 future = future.catchError((e, stackTrace) { |
| 39 // Since [controller] is synchronous, it's likely that emitting an error |
| 40 // will cause it to be cancelled before we call close. |
| 41 if (controller != null) controller.addError(e, stackTrace); |
| 42 if (controller != null) controller.close(); |
| 43 controller = null; |
| 44 }); |
| 45 |
| 46 onListen() { |
| 47 future.then((stream) { |
| 48 if (controller == null) return; |
| 49 subscription = stream.listen( |
| 50 controller.add, |
| 51 onError: controller.addError, |
| 52 onDone: controller.close); |
| 53 }); |
| 54 } |
| 55 |
| 56 onCancel() { |
| 57 if (subscription != null) subscription.cancel(); |
| 58 subscription = null; |
| 59 controller = null; |
| 60 } |
| 61 |
| 62 if (broadcast) { |
| 63 controller = new StreamController.broadcast( |
| 64 sync: true, onListen: onListen, onCancel: onCancel); |
| 65 } else { |
| 66 controller = new StreamController( |
| 67 sync: true, onListen: onListen, onCancel: onCancel); |
| 68 } |
| 69 return controller.stream; |
| 70 } |
| 71 |
| 72 /// Like [new Future], but avoids around issue 11911 by using [new Future.value] |
| 73 /// under the covers. |
| 74 Future newFuture(callback()) => new Future.value().then((_) => callback()); |
| 75 |
| 76 /// Returns a [Future] that completes after pumping the event queue [times] |
| 77 /// times. By default, this should pump the event queue enough times to allow |
| 78 /// any code to run, as long as it's not waiting on some external event. |
| 79 Future pumpEventQueue([int times = 20]) { |
| 80 if (times == 0) return new Future.value(); |
| 81 // We use a delayed future to allow microtask events to finish. The |
| 82 // Future.value or Future() constructors use scheduleMicrotask themselves and |
| 83 // would therefore not wait for microtask callbacks that are scheduled after |
| 84 // invoking this method. |
| 85 return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1)); |
| 86 } |
| 87 |
| 88 /// A stream transformer that batches all events that are sent at the same time. |
| 89 /// |
| 90 /// When multiple events are synchronously added to a stream controller, the |
| 91 /// [StreamController] implementation uses [scheduleMicrotask] to schedule the |
| 92 /// asynchronous firing of each event. In order to recreate the synchronous |
| 93 /// batches, this collates all the events that are received in "nearby" |
| 94 /// microtasks. |
| 95 class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> { |
| 96 Stream<List<T>> bind(Stream<T> input) { |
| 97 var batch = new Queue(); |
| 98 return new StreamTransformer<T, List<T>>.fromHandlers( |
| 99 handleData: (event, sink) { |
| 100 batch.add(event); |
| 101 |
| 102 // [Timer.run] schedules an event that runs after any microtasks that have |
| 103 // been scheduled. |
| 104 Timer.run(() { |
| 105 if (batch.isEmpty) return; |
| 106 sink.add(batch.toList()); |
| 107 batch.clear(); |
| 108 }); |
| 109 }, handleDone: (sink) { |
| 110 if (batch.isNotEmpty) { |
| 111 sink.add(batch.toList()); |
| 112 batch.clear(); |
| 113 } |
| 114 sink.close(); |
| 115 }).bind(input); |
| 116 } |
| 117 } |
OLD | NEW |