| Index: pkg/watcher/lib/src/utils.dart
|
| diff --git a/pkg/watcher/lib/src/utils.dart b/pkg/watcher/lib/src/utils.dart
|
| index 3d00c084314ad4cfcfbb0b290f795ef56d3d3769..e4e4457944dc6296890bb202ede4726f55c66fdd 100644
|
| --- a/pkg/watcher/lib/src/utils.dart
|
| +++ b/pkg/watcher/lib/src/utils.dart
|
| @@ -4,7 +4,9 @@
|
|
|
| library watcher.utils;
|
|
|
| +import 'dart:async';
|
| import 'dart:io';
|
| +import 'dart:collection';
|
|
|
| /// Returns `true` if [error] is a [FileSystemException] for a missing
|
| /// directory.
|
| @@ -15,3 +17,57 @@ bool isDirectoryNotFoundException(error) {
|
| var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2;
|
| return error.osError.errorCode == notFoundCode;
|
| }
|
| +
|
| +/// Returns a buffered stream that will emit the same values as the stream
|
| +/// returned by [future] once [future] completes.
|
| +///
|
| +/// If [future] completes to an error, the return value will emit that error and
|
| +/// then close.
|
| +Stream futureStream(Future<Stream> future) {
|
| + var controller = new StreamController(sync: true);
|
| + future.then((stream) {
|
| + stream.listen(
|
| + controller.add,
|
| + onError: controller.addError,
|
| + onDone: controller.close);
|
| + }).catchError((e, stackTrace) {
|
| + controller.addError(e, stackTrace);
|
| + controller.close();
|
| + });
|
| + return controller.stream;
|
| +}
|
| +
|
| +/// Like [new Future], but avoids around issue 11911 by using [new Future.value]
|
| +/// under the covers.
|
| +Future newFuture(callback()) => new Future.value().then((_) => callback());
|
| +
|
| +/// A stream transformer that batches all events that are sent at the same time.
|
| +///
|
| +/// When multiple events are synchronously added to a stream controller, the
|
| +/// [StreamController] implementation uses [scheduleMicrotask] to schedule the
|
| +/// asynchronous firing of each event. In order to recreate the synchronous
|
| +/// batches, this collates all the events that are received in "nearby"
|
| +/// microtasks.
|
| +class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> {
|
| + Stream<List<T>> bind(Stream<T> input) {
|
| + var batch = new Queue();
|
| + return new StreamTransformer<T, List<T>>.fromHandlers(
|
| + handleData: (event, sink) {
|
| + batch.add(event);
|
| +
|
| + // [Timer.run] schedules an event that runs after any microtasks that have
|
| + // been scheduled.
|
| + Timer.run(() {
|
| + if (batch.isEmpty) return;
|
| + sink.add(batch.toList());
|
| + batch.clear();
|
| + });
|
| + }, handleDone: (sink) {
|
| + if (batch.isNotEmpty) {
|
| + sink.add(batch.toList());
|
| + batch.clear();
|
| + }
|
| + sink.close();
|
| + }).bind(input);
|
| + }
|
| +}
|
|
|