Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Unified Diff: packages/async/lib/src/stream_completer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: packages/async/lib/src/stream_completer.dart
diff --git a/packages/async/lib/src/stream_completer.dart b/packages/async/lib/src/stream_completer.dart
index c343e6e7bd2330ccda59e9c793f96b0de3fbed50..4311de5216a7fd0faa10d49e2d0789ba557a1600 100644
--- a/packages/async/lib/src/stream_completer.dart
+++ b/packages/async/lib/src/stream_completer.dart
@@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library async.stream_completer;
-
import "dart:async";
/// A single-subscription [stream] where the contents are provided later.
@@ -26,7 +24,7 @@ import "dart:async";
/// the listen is performed directly on the source stream.
class StreamCompleter<T> {
/// The stream doing the actual work, is returned by [stream].
- final _CompleterStream _stream = new _CompleterStream<T>();
+ final _stream = new _CompleterStream<T>();
/// Convert a `Future<Stream>` to a `Stream`.
///
@@ -36,12 +34,9 @@ class StreamCompleter<T> {
///
/// If the future completes with an error, the returned stream will
/// instead contain just that error.
- static Stream fromFuture(Future<Stream> streamFuture) {
- var completer = new StreamCompleter();
- streamFuture.then(completer.setSourceStream,
- onError: (e, s) {
- completer.setSourceStream(streamFuture.asStream());
- });
+ static Stream<T> fromFuture<T>(Future<Stream<T>> streamFuture) {
+ var completer = new StreamCompleter<T>();
+ streamFuture.then(completer.setSourceStream, onError: completer.setError);
return completer.stream;
}
@@ -76,8 +71,8 @@ class StreamCompleter<T> {
/// it is immediately listened to, and its events are forwarded to the
/// existing subscription.
///
- /// Either [setSourceStream] or [setEmpty] may be called at most once.
- /// Trying to call either of them again will fail.
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
void setSourceStream(Stream<T> sourceStream) {
if (_stream._isSourceStreamSet) {
throw new StateError("Source stream already set");
@@ -87,14 +82,24 @@ class StreamCompleter<T> {
/// Equivalent to setting an empty stream using [setSourceStream].
///
- /// Either [setSourceStream] or [setEmpty] may be called at most once.
- /// Trying to call either of them again will fail.
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
void setEmpty() {
if (_stream._isSourceStreamSet) {
throw new StateError("Source stream already set");
}
_stream._setEmpty();
}
+
+ /// Completes this to a stream that emits [error] and then closes.
+ ///
+ /// This is useful when the process of creating the data for the stream fails.
+ ///
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
+ void setError(error, [StackTrace stackTrace]) {
+ setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace)));
+ }
}
/// Stream completed by [StreamCompleter].
@@ -103,32 +108,30 @@ class _CompleterStream<T> extends Stream<T> {
///
/// Created if the user listens on this stream before the source stream
/// is set, or if using [_setEmpty] so there is no source stream.
- StreamController _controller;
+ StreamController<T> _controller;
/// Source stream for the events provided by this stream.
///
/// Set when the completer sets the source stream using [_setSourceStream]
/// or [_setEmpty].
- Stream _sourceStream;
+ Stream<T> _sourceStream;
StreamSubscription<T> listen(onData(T data),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
if (_controller == null) {
if (_sourceStream != null && !_sourceStream.isBroadcast) {
// If the source stream is itself single subscription,
// just listen to it directly instead of creating a controller.
- return _sourceStream.listen(onData, onError: onError, onDone: onDone,
- cancelOnError: cancelOnError);
+ return _sourceStream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
_createController();
if (_sourceStream != null) {
_linkStreamToController();
}
}
- return _controller.stream.listen(onData, onError: onError, onDone: onDone,
- cancelOnError: cancelOnError);
+ return _controller.stream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
/// Whether a source stream has been set.
@@ -155,8 +158,9 @@ class _CompleterStream<T> extends Stream<T> {
void _linkStreamToController() {
assert(_controller != null);
assert(_sourceStream != null);
- _controller.addStream(_sourceStream, cancelOnError: false)
- .whenComplete(_controller.close);
+ _controller
+ .addStream(_sourceStream, cancelOnError: false)
+ .whenComplete(_controller.close);
}
/// Sets an empty source stream.
@@ -168,7 +172,7 @@ class _CompleterStream<T> extends Stream<T> {
if (_controller == null) {
_createController();
}
- _sourceStream = _controller.stream; // Mark stream as set.
+ _sourceStream = _controller.stream; // Mark stream as set.
_controller.close();
}
« no previous file with comments | « packages/async/lib/src/single_subscription_transformer.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698