Chromium Code Reviews| Index: pkg/watcher/lib/src/utils.dart |
| diff --git a/pkg/watcher/lib/src/utils.dart b/pkg/watcher/lib/src/utils.dart |
| index 3d00c084314ad4cfcfbb0b290f795ef56d3d3769..45a043eadcc44238491678b135adc624b5393867 100644 |
| --- a/pkg/watcher/lib/src/utils.dart |
| +++ b/pkg/watcher/lib/src/utils.dart |
| @@ -4,7 +4,9 @@ |
| library watcher.utils; |
| +import 'dart:async'; |
| import 'dart:io'; |
| +import 'dart:collection'; |
| /// Returns `true` if [error] is a [FileSystemException] for a missing |
| /// directory. |
| @@ -15,3 +17,50 @@ bool isDirectoryNotFoundException(error) { |
| var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2; |
| return error.osError.errorCode == notFoundCode; |
| } |
| + |
| +/// Returns a buffered stream that will emit the same values as the stream |
| +/// returned by [future] once [future] completes. |
| +/// |
| +/// If [future] completes to an error, the return value will emit that error and |
| +/// then close. |
| +Stream futureStream(Future<Stream> future) { |
| + var controller = new StreamController(sync: true); |
| + future.then((stream) { |
| + stream.listen( |
| + controller.add, |
| + onError: controller.addError, |
| + onDone: controller.close); |
| + }).catchError((e, stackTrace) { |
| + controller.addError(e, stackTrace); |
| + controller.close(); |
| + }); |
| + return controller.stream; |
| +} |
|
Bob Nystrom
2013/11/06 19:24:04
We have an increasingly large set of copy/pasted f
nweiz
2013/11/07 00:46:37
I'd love to, but there are a bunch of hurdles we h
Bob Nystrom
2013/11/07 18:16:05
Some of those hurdles have been jumped already (fo
|
| + |
| +/// Like [new Future], but avoids around issue 11911 by using [new Future.value] |
| +/// under the covers. |
| +Future newFuture(callback()) => new Future.value().then((_) => callback()); |
| + |
| +/// A stream transformer that batches all events sent in adjacent event loops. |
|
Bob Nystrom
2013/11/06 19:24:04
I don't know what "adjacent event loops" means. Do
nweiz
2013/11/07 00:46:37
Done.
|
| +class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> { |
| + Stream<List<T>> bind(Stream<T> input) { |
| + var batch = new Queue(); |
| + return new StreamTransformer<T, List<T>>.fromHandlers( |
| + handleData: (event, sink) { |
| + batch.add(event); |
| + var batchLength = batch.length; |
| + |
| + Timer.run(() { |
|
Bob Nystrom
2013/11/06 19:24:04
Document what's going on here.
nweiz
2013/11/07 00:46:37
Done.
|
| + if (batch.length != batchLength) return; |
| + sink.add(batch.toList()); |
| + batch.clear(); |
| + }); |
| + }, handleDone: (sink) { |
| + if (batch.isNotEmpty) { |
| + sink.add(batch.toList()); |
| + batch.clear(); |
| + } |
| + sink.close(); |
| + }).bind(input); |
| + } |
| +} |