Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(390)

Unified Diff: packages/async/lib/src/stream_queue.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/async/lib/src/stream_group.dart ('k') | packages/async/lib/src/stream_sink_completer.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/async/lib/src/stream_queue.dart
diff --git a/packages/async/lib/src/stream_queue.dart b/packages/async/lib/src/stream_queue.dart
index 09b3a75b2360a9adf2a18ad2715fe2fef08cc656..b9023ab91a9ff8279fd269e5b319fa2b0e20e6a4 100644
--- a/packages/async/lib/src/stream_queue.dart
+++ b/packages/async/lib/src/stream_queue.dart
@@ -2,14 +2,16 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library async.stream_events;
-
import 'dart:async';
import 'dart:collection';
+import 'package:collection/collection.dart';
+
+import "cancelable_operation.dart";
+import "result.dart";
import "subscription_stream.dart";
import "stream_completer.dart";
-import "../result.dart";
+import "stream_splitter.dart";
/// An asynchronous pull-based interface for accessing stream events.
///
@@ -87,8 +89,17 @@ abstract class StreamQueue<T> {
/// Closing operations are [cancel] and [rest].
bool _isClosed = false;
+ /// The number of events dispatched by this queue.
+ ///
+ /// This counts error events. It doesn't count done events, or events
+ /// dispatched to a stream returned by [rest].
+ int get eventsDispatched => _eventsReceived - _eventQueue.length;
+
+ /// The number of events received by this queue.
+ var _eventsReceived = 0;
+
/// Queue of events not used by a request yet.
- final Queue<Result> _eventQueue = new Queue();
+ final QueueList<Result> _eventQueue = new QueueList();
/// Queue of pending requests.
///
@@ -96,7 +107,7 @@ abstract class StreamQueue<T> {
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
- factory StreamQueue(Stream source) = _StreamQueue<T>;
+ factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
StreamQueue._();
@@ -120,6 +131,21 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+ /// Look at the next [count] data events without consuming them.
+ ///
+ /// Works like [take] except that the events are left in the queue.
+ /// If one of the next [count] events is an error, the returned future
+ /// completes with this error, and the error is still left in the queue.
+ Future<List<T>> lookAhead(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _LookAheadRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
@@ -143,6 +169,19 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+ /// Looks at the next (yet unrequested) event from the stream.
+ ///
+ /// Like [next] except that the event is not consumed.
+ /// If the next event is an error event, it stays in the queue.
+ Future<T> get peek {
+ if (!_isClosed) {
+ var nextRequest = new _PeekRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+
/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
@@ -212,6 +251,123 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+ /// Requests a transaction that can conditionally consume events.
+ ///
+ /// The transaction can create copies of this queue at the current position
+ /// using [StreamQueueTransaction.newQueue]. Each of these queues is
+ /// independent of one another and of the parent queue. The transaction
+ /// finishes when one of two methods is called:
+ ///
+ /// * [StreamQueueTransaction.commit] updates the parent queue's position to
+ /// match that of one of the copies.
+ ///
+ /// * [StreamQueueTransaction.reject] causes the parent queue to continue as
+ /// though [startTransaction] hadn't been called.
+ ///
+ /// Until the transaction finishes, this queue won't emit any events.
+ ///
+ /// See also [withTransaction] and [cancelable].
+ ///
+ /// ```dart
+ /// /// Consumes all empty lines from the beginning of [lines].
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+ /// while (await lines.hasNext) {
+ /// var transaction = lines.startTransaction();
+ /// var queue = transaction.newQueue();
+ /// if ((await queue.next).isNotEmpty) {
+ /// transaction.reject();
+ /// return;
+ /// } else {
+ /// transaction.commit(queue);
+ /// }
+ /// }
+ /// }
+ /// ```
+ StreamQueueTransaction<T> startTransaction() {
+ if (_isClosed) throw _failClosed();
+
+ var request = new _TransactionRequest(this);
+ _addRequest(request);
+ return request.transaction;
+ }
+
+ /// Passes a copy of this queue to [callback], and updates this queue to match
+ /// the copy's position if [callback] returns `true`.
+ ///
+ /// This queue won't emit any events until [callback] returns. If it returns
+ /// `false`, this queue continues as though [withTransaction] hadn't been
+ /// called. If it throws an error, this updates this queue to match the copy's
+ /// position and throws the error from the returned `Future`.
+ ///
+ /// Returns the same value as [callback].
+ ///
+ /// See also [startTransaction] and [cancelable].
+ ///
+ /// ```dart
+ /// /// Consumes all empty lines from the beginning of [lines].
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+ /// while (await lines.hasNext) {
+ /// // Consume a line if it's empty, otherwise return.
+ /// if (!await lines.withTransaction(
+ /// (queue) async => (await queue.next).isEmpty)) {
+ /// return;
+ /// }
+ /// }
+ /// }
+ /// ```
+ Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) {
+ var transaction = startTransaction();
+
+ /// Avoid async/await to ensure that [startTransaction] is called
+ /// synchronously and so ends up in the right place in the request queue.
+ var queue = transaction.newQueue();
+ return callback(queue).then((result) {
+ if (result) {
+ transaction.commit(queue);
+ } else {
+ transaction.reject();
+ }
+ return result;
+ }, onError: (error) {
+ transaction.commit(queue);
+ throw error;
+ });
+ }
+
+ /// Passes a copy of this queue to [callback], and updates this queue to match
+ /// the copy's position once [callback] completes.
+ ///
+ /// If the returned [CancelableOperation] is canceled, this queue instead
+ /// continues as though [cancelable] hadn't been called. Otherwise, it emits
+ /// the same value or error as [callback].
+ ///
+ /// See also [startTransaction] and [withTransaction].
+ ///
+ /// ```dart
+ /// final _stdinQueue = new StreamQueue(stdin);
+ ///
+ /// /// Returns an operation that completes when the user sends a line to
+ /// /// standard input.
+ /// ///
+ /// /// If the operation is canceled, stops waiting for user input.
+ /// CancelableOperation<String> nextStdinLine() =>
+ /// _stdinQueue.cancelable((queue) => queue.next);
+ /// ```
+ CancelableOperation<S> cancelable<S>(
+ Future<S> callback(StreamQueue<T> queue)) {
+ var transaction = startTransaction();
+ var completer = new CancelableCompleter<S>(onCancel: () {
+ transaction.reject();
+ });
+
+ var queue = transaction.newQueue();
+ completer.complete(callback(queue).whenComplete(() {
+ if (!completer.isCanceled) transaction.commit(queue);
+ }));
+
+ return completer.operation;
+ }
+
/// Cancels the underlying event source.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
@@ -226,8 +382,8 @@ abstract class StreamQueue<T> {
/// `cancel`.
///
/// After calling `cancel`, no further events can be requested.
- /// None of [next], [rest], [skip], [take] or [cancel] may be
- /// called again.
+ /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
+ /// may be called again.
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
@@ -276,7 +432,7 @@ abstract class StreamQueue<T> {
/// Can only be used by the very last request (the stream queue must
/// be closed by that request).
/// Only used by [rest].
- Stream _extractStream();
+ Stream<T> _extractStream();
/// Requests that the event source pauses events.
///
@@ -302,6 +458,7 @@ abstract class StreamQueue<T> {
/// Called when the event source adds a new data or error event.
/// Always calls [_updateRequests] after adding.
void _addResult(Result result) {
+ _eventsReceived++;
_eventQueue.add(result);
_updateRequests();
}
@@ -337,20 +494,19 @@ abstract class StreamQueue<T> {
}
}
-
/// The default implementation of [StreamQueue].
///
/// This queue gets its events from a stream which is listened
/// to when a request needs events.
class _StreamQueue<T> extends StreamQueue<T> {
/// Source of events.
- final Stream _sourceStream;
+ final Stream<T> _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
/// Set to subscription when listening, and set to `null` when the
/// subscription is done (and [_isDone] is set to true).
- StreamSubscription _subscription;
+ StreamSubscription<T> _subscription;
_StreamQueue(this._sourceStream) : super._();
@@ -363,20 +519,16 @@ class _StreamQueue<T> extends StreamQueue<T> {
}
void _ensureListening() {
- assert(!_isDone);
+ if (_isDone) return;
if (_subscription == null) {
- _subscription =
- _sourceStream.listen(
- (data) {
- _addResult(new Result.value(data));
- },
- onError: (error, StackTrace stackTrace) {
- _addResult(new Result.error(error, stackTrace));
- },
- onDone: () {
- _subscription = null;
- this._close();
- });
+ _subscription = _sourceStream.listen((data) {
+ _addResult(new Result.value(data));
+ }, onError: (error, StackTrace stackTrace) {
+ _addResult(new Result.error(error, stackTrace));
+ }, onDone: () {
+ _subscription = null;
+ this._close();
+ });
} else {
_subscription.resume();
}
@@ -391,6 +543,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
if (_isDone) {
return new Stream<T>.empty();
}
+ _isDone = true;
if (_subscription == null) {
return _sourceStream;
@@ -398,7 +551,6 @@ class _StreamQueue<T> extends StreamQueue<T> {
var subscription = _subscription;
_subscription = null;
- _isDone = true;
var wasPaused = subscription.isPaused;
var result = new SubscriptionStream<T>(subscription);
@@ -409,6 +561,104 @@ class _StreamQueue<T> extends StreamQueue<T> {
}
}
+/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
+///
+/// Copies of the parent queue may be created using [newQueue]. Calling [commit]
+/// moves the parent queue to a copy's position, and calling [reject] causes it
+/// to continue as though [StreamQueue.startTransaction] was never called.
+class StreamQueueTransaction<T> {
+ /// The parent queue on which this transaction is active.
+ final StreamQueue<T> _parent;
+
+ /// The splitter that produces copies of the parent queue's stream.
+ final StreamSplitter<T> _splitter;
+
+ /// Queues created using [newQueue].
+ final _queues = new Set<StreamQueue>();
+
+ /// Whether [commit] has been called.
+ var _committed = false;
+
+ /// Whether [reject] has been called.
+ var _rejected = false;
+
+ StreamQueueTransaction._(this._parent, Stream<T> source)
+ : _splitter = new StreamSplitter(source);
+
+ /// Creates a new copy of the parent queue.
+ ///
+ /// This copy starts at the parent queue's position when
+ /// [StreamQueue.startTransaction] was called. Its position can be committed
+ /// to the parent queue using [commit].
+ StreamQueue<T> newQueue() {
+ var queue = new StreamQueue(_splitter.split());
+ _queues.add(queue);
+ return queue;
+ }
+
+ /// Commits a queue created using [newQueue].
+ ///
+ /// The parent queue's position is updated to be the same as [queue]'s.
+ /// Further requests on all queues created by this transaction, including
+ /// [queue], will complete as though [cancel] were called with `immediate:
+ /// true`.
+ ///
+ /// Throws a [StateError] if [commit] or [reject] have already been called, or
+ /// if there are pending requests on [queue].
+ void commit(StreamQueue<T> queue) {
+ _assertActive();
+ if (!_queues.contains(queue)) {
+ throw new ArgumentError("Queue doesn't belong to this transaction.");
+ } else if (queue._requestQueue.isNotEmpty) {
+ throw new StateError("A queue with pending requests can't be committed.");
+ }
+ _committed = true;
+
+ // Remove all events from the parent queue that were consumed by the
+ // child queue.
+ for (var j = 0; j < queue.eventsDispatched; j++) {
+ _parent._eventQueue.removeFirst();
+ }
+
+ _done();
+ }
+
+ /// Rejects this transaction without updating the parent queue.
+ ///
+ /// The parent will continue as though [StreamQueue.startTransaction] hadn't
+ /// been called. Further requests on all queues created by this transaction
+ /// will complete as though [cancel] were called with `immediate: true`.
+ ///
+ /// Throws a [StateError] if [commit] or [reject] have already been called.
+ void reject() {
+ _assertActive();
+ _rejected = true;
+ _done();
+ }
+
+ // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
+ // request queue, and runs the next request.
+ void _done() {
+ _splitter.close();
+ for (var queue in _queues) {
+ queue._cancel();
+ }
+
+ assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
+ this);
+ _parent._requestQueue.removeFirst();
+ _parent._updateRequests();
+ }
+
+ /// Throws a [StateError] if [accept] or [reject] has already been called.
+ void _assertActive() {
+ if (_committed) {
+ throw new StateError("This transaction has already been accepted.");
+ } else if (_rejected) {
+ throw new StateError("This transaction has already been rejected.");
+ }
+ }
+}
/// Request object that receives events when they arrive, until fulfilled.
///
@@ -424,7 +674,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
///
/// The [close] method is also called immediately when the source stream
/// is done.
-abstract class _EventRequest {
+abstract class _EventRequest<T> {
/// Handle available events.
///
/// The available events are provided as a queue. The `update` function
@@ -445,30 +695,55 @@ abstract class _EventRequest {
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
- bool update(Queue<Result> events, bool isDone);
+ bool update(QueueList<Result<T>> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
///
/// Completes the returned future when receiving the first event,
/// and is then complete.
-class _NextRequest<T> implements _EventRequest {
+class _NextRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by [StreamQueue.next].
- final Completer _completer;
+ final _completer = new Completer<T>();
- _NextRequest() : _completer = new Completer<T>();
+ _NextRequest();
Future<T> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
}
if (isDone) {
- var errorFuture =
- new Future.sync(() => throw new StateError("No elements"));
- _completer.complete(errorFuture);
+ _completer.completeError(
+ new StateError("No elements"), StackTrace.current);
+ return true;
+ }
+ return false;
+ }
+}
+
+/// Request for a [StreamQueue.peek] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete, but doesn't consume the event.
+class _PeekRequest<T> implements _EventRequest<T> {
+ /// Completer for the future returned by [StreamQueue.next].
+ final _completer = new Completer<T>();
+
+ _PeekRequest();
+
+ Future<T> get future => _completer.future;
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ if (events.isNotEmpty) {
+ events.first.complete(_completer);
+ return true;
+ }
+ if (isDone) {
+ _completer.completeError(
+ new StateError("No elements"), StackTrace.current);
return true;
}
return false;
@@ -476,9 +751,9 @@ class _NextRequest<T> implements _EventRequest {
}
/// Request for a [StreamQueue.skip] call.
-class _SkipRequest implements _EventRequest {
+class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
- final Completer _completer = new Completer<int>();
+ final _completer = new Completer<int>();
/// Number of remaining events to skip.
///
@@ -491,9 +766,9 @@ class _SkipRequest implements _EventRequest {
_SkipRequest(this._eventsToSkip);
/// The future completed when the correct number of events have been skipped.
- Future get future => _completer.future;
+ Future<int> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(QueueList<Result<T>> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
@@ -503,7 +778,7 @@ class _SkipRequest implements _EventRequest {
var event = events.removeFirst();
if (event.isError) {
- event.complete(_completer);
+ _completer.completeError(event.asError.error, event.asError.stackTrace);
return true;
}
}
@@ -512,13 +787,13 @@ class _SkipRequest implements _EventRequest {
}
}
-/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest {
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
+abstract class _ListRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
- final Completer _completer;
+ final _completer = new Completer<List<T>>();
/// List collecting events until enough have been seen.
- final List _list = <T>[];
+ final _list = <T>[];
/// Number of events to capture.
///
@@ -526,24 +801,51 @@ class _TakeRequest<T> implements _EventRequest {
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+ _ListRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
- Future get future => _completer.future;
+ Future<List<T>> get future => _completer.future;
+}
- bool update(Queue<Result> events, bool isDone) {
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> extends _ListRequest<T> {
+ _TakeRequest(int eventsToTake) : super(eventsToTake);
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
return false;
}
- var result = events.removeFirst();
- if (result.isError) {
- result.complete(_completer);
+ var event = events.removeFirst();
+ if (event.isError) {
+ event.asError.complete(_completer);
+ return true;
+ }
+ _list.add(event.asValue.value);
+ }
+ _completer.complete(_list);
+ return true;
+ }
+}
+
+/// Request for a [StreamQueue.lookAhead] call.
+class _LookAheadRequest<T> extends _ListRequest<T> {
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake);
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ while (_list.length < _eventsToTake) {
+ if (events.length == _list.length) {
+ if (isDone) break;
+ return false;
+ }
+ var event = events.elementAt(_list.length);
+ if (event.isError) {
+ event.asError.complete(_completer);
return true;
}
- _list.add(result.asValue.value);
+ _list.add(event.asValue.value);
}
_completer.complete(_list);
return true;
@@ -555,11 +857,10 @@ class _TakeRequest<T> implements _EventRequest {
/// The request needs no events, it just waits in the request queue
/// until all previous events are fulfilled, then it cancels the stream queue
/// source subscription.
-class _CancelRequest implements _EventRequest {
+class _CancelRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the `cancel` call.
- final Completer _completer = new Completer();
+ final _completer = new Completer();
- /// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
@@ -570,7 +871,7 @@ class _CancelRequest implements _EventRequest {
/// The future completed when the cancel request is completed.
Future get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(QueueList<Result<T>> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
@@ -586,22 +887,22 @@ class _CancelRequest implements _EventRequest {
/// The request is always complete, it just waits in the request queue
/// until all previous events are fulfilled, then it takes over the
/// stream events subscription and creates a stream from it.
-class _RestRequest<T> implements _EventRequest {
+class _RestRequest<T> implements _EventRequest<T> {
/// Completer for the stream returned by the `rest` call.
- final StreamCompleter _completer = new StreamCompleter<T>();
+ final _completer = new StreamCompleter<T>();
/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
- final StreamQueue _streamQueue;
+ final StreamQueue<T> _streamQueue;
_RestRequest(this._streamQueue);
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
@@ -615,8 +916,9 @@ class _RestRequest<T> implements _EventRequest {
for (var event in events) {
event.addTo(controller);
}
- controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
- .whenComplete(controller.close);
+ controller
+ .addStream(_streamQueue._extractStream(), cancelOnError: false)
+ .whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
}
return true;
@@ -629,12 +931,12 @@ class _RestRequest<T> implements _EventRequest {
/// but doesn't consume the event.
/// If the request is closed without seeing an event, then
/// the [future] is completed with `false`.
-class _HasNextRequest<T> implements _EventRequest {
- final Completer _completer = new Completer<bool>();
+class _HasNextRequest<T> implements _EventRequest<T> {
+ final _completer = new Completer<bool>();
Future<bool> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
@@ -646,3 +948,33 @@ class _HasNextRequest<T> implements _EventRequest {
return false;
}
}
+
+/// Request for a [StreamQueue.startTransaction] call.
+///
+/// This request isn't complete until the user calls
+/// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which
+/// point it manually removes itself from the request queue and calls
+/// [StreamQueue._updateRequests].
+class _TransactionRequest<T> implements _EventRequest<T> {
+ /// The transaction created by this request.
+ StreamQueueTransaction<T> get transaction => _transaction;
+ StreamQueueTransaction<T> _transaction;
+
+ /// The controller that passes events to [transaction].
+ final _controller = new StreamController<T>(sync: true);
+
+ /// The number of events passed to [_controller] so far.
+ var _eventsSent = 0;
+
+ _TransactionRequest(StreamQueue<T> parent) {
+ _transaction = new StreamQueueTransaction._(parent, _controller.stream);
+ }
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ while (_eventsSent < events.length) {
+ events[_eventsSent++].addTo(_controller);
+ }
+ if (isDone && !_controller.isClosed) _controller.close();
+ return false;
+ }
+}
« no previous file with comments | « packages/async/lib/src/stream_group.dart ('k') | packages/async/lib/src/stream_sink_completer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698