| Index: packages/async/lib/src/stream_zip.dart
|
| diff --git a/packages/async/lib/stream_zip.dart b/packages/async/lib/src/stream_zip.dart
|
| similarity index 54%
|
| copy from packages/async/lib/stream_zip.dart
|
| copy to packages/async/lib/src/stream_zip.dart
|
| index 055489dbd38cd5dd6eb9dc7015887ee5a8df1f12..3d5a8113bcdc56e9c904a1b6abaa950eeb82a5a7 100644
|
| --- a/packages/async/lib/stream_zip.dart
|
| +++ b/packages/async/lib/src/stream_zip.dart
|
| @@ -1,37 +1,36 @@
|
| -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -/**
|
| - * Help for combining multiple streams into a single stream.
|
| - */
|
| -library dart.pkg.async.stream_zip;
|
| -
|
| import "dart:async";
|
|
|
| -/**
|
| - * A stream that combines the values of other streams.
|
| - */
|
| -class StreamZip extends Stream<List> {
|
| - final Iterable<Stream> _streams;
|
| - StreamZip(Iterable<Stream> streams) : _streams = streams;
|
| +/// A stream that combines the values of other streams.
|
| +///
|
| +/// This emits lists of collected values from each input stream. The first list
|
| +/// contains the first value emitted by each stream, the second contains the
|
| +/// second value, and so on. The lists have the same ordering as the iterable
|
| +/// passed to [new StreamZip].
|
| +///
|
| +/// Any errors from any of the streams are forwarded directly to this stream.
|
| +class StreamZip<T> extends Stream<List<T>> {
|
| + final Iterable<Stream<T>> _streams;
|
| +
|
| + StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
|
|
|
| - StreamSubscription<List> listen(void onData(List data), {
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| + StreamSubscription<List<T>> listen(void onData(List<T> data),
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| cancelOnError = identical(true, cancelOnError);
|
| - List<StreamSubscription> subscriptions = <StreamSubscription>[];
|
| - StreamController controller;
|
| - List current;
|
| + var subscriptions = <StreamSubscription<T>>[];
|
| + StreamController<List<T>> controller;
|
| + List<T> current;
|
| int dataCount = 0;
|
|
|
| /// Called for each data from a subscription in [subscriptions].
|
| - void handleData(int index, data) {
|
| + void handleData(int index, T data) {
|
| current[index] = data;
|
| dataCount++;
|
| if (dataCount == subscriptions.length) {
|
| - List data = current;
|
| + var data = current;
|
| current = new List(subscriptions.length);
|
| dataCount = 0;
|
| for (int i = 0; i < subscriptions.length; i++) {
|
| @@ -69,10 +68,11 @@ class StreamZip extends Stream<List> {
|
| }
|
|
|
| try {
|
| - for (Stream stream in _streams) {
|
| + for (var stream in _streams) {
|
| int index = subscriptions.length;
|
| - subscriptions.add(stream.listen(
|
| - (data) { handleData(index, data); },
|
| + subscriptions.add(stream.listen((data) {
|
| + handleData(index, data);
|
| + },
|
| onError: cancelOnError ? handleError : handleErrorCancel,
|
| onDone: handleDone,
|
| cancelOnError: cancelOnError));
|
| @@ -86,34 +86,28 @@ class StreamZip extends Stream<List> {
|
|
|
| current = new List(subscriptions.length);
|
|
|
| - controller = new StreamController<List>(
|
| - onPause: () {
|
| - for (int i = 0; i < subscriptions.length; i++) {
|
| - // This may pause some subscriptions more than once.
|
| - // These will not be resumed by onResume below, but must wait for the
|
| - // next round.
|
| - subscriptions[i].pause();
|
| - }
|
| - },
|
| - onResume: () {
|
| - for (int i = 0; i < subscriptions.length; i++) {
|
| - subscriptions[i].resume();
|
| - }
|
| - },
|
| - onCancel: () {
|
| - for (int i = 0; i < subscriptions.length; i++) {
|
| - // Canceling more than once is safe.
|
| - subscriptions[i].cancel();
|
| - }
|
| + controller = new StreamController<List<T>>(onPause: () {
|
| + for (int i = 0; i < subscriptions.length; i++) {
|
| + // This may pause some subscriptions more than once.
|
| + // These will not be resumed by onResume below, but must wait for the
|
| + // next round.
|
| + subscriptions[i].pause();
|
| + }
|
| + }, onResume: () {
|
| + for (int i = 0; i < subscriptions.length; i++) {
|
| + subscriptions[i].resume();
|
| + }
|
| + }, onCancel: () {
|
| + for (int i = 0; i < subscriptions.length; i++) {
|
| + // Canceling more than once is safe.
|
| + subscriptions[i].cancel();
|
| }
|
| - );
|
| + });
|
|
|
| if (subscriptions.isEmpty) {
|
| controller.close();
|
| }
|
| return controller.stream.listen(onData,
|
| - onError: onError,
|
| - onDone: onDone,
|
| - cancelOnError: cancelOnError);
|
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError);
|
| }
|
| }
|
|
|