Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: lib/src/stream_completer.dart

Issue 1615253002: Add ClosedStreamSink, and *Completer.setError, and StreamSinkCompleter.fromFuture. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/null_stream_sink.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_completer; 5 library async.stream_completer;
6 6
7 import "dart:async"; 7 import "dart:async";
8 8
9 /// A single-subscription [stream] where the contents are provided later. 9 /// A single-subscription [stream] where the contents are provided later.
10 /// 10 ///
(...skipping 21 matching lines...) Expand all
32 /// 32 ///
33 /// This creates a stream using a stream completer, 33 /// This creates a stream using a stream completer,
34 /// and sets the source stream to the result of the future when the 34 /// and sets the source stream to the result of the future when the
35 /// future completes. 35 /// future completes.
36 /// 36 ///
37 /// If the future completes with an error, the returned stream will 37 /// If the future completes with an error, the returned stream will
38 /// instead contain just that error. 38 /// instead contain just that error.
39 static Stream fromFuture(Future<Stream> streamFuture) { 39 static Stream fromFuture(Future<Stream> streamFuture) {
40 var completer = new StreamCompleter(); 40 var completer = new StreamCompleter();
41 streamFuture.then(completer.setSourceStream, 41 streamFuture.then(completer.setSourceStream,
42 onError: (e, s) { 42 onError: completer.setError);
43 completer.setSourceStream(streamFuture.asStream());
44 });
45 return completer.stream; 43 return completer.stream;
46 } 44 }
47 45
48 /// The stream of this completer. 46 /// The stream of this completer.
49 /// 47 ///
50 /// This stream is always a single-subscription stream. 48 /// This stream is always a single-subscription stream.
51 /// 49 ///
52 /// When a source stream is provided, its events will be forwarded to 50 /// When a source stream is provided, its events will be forwarded to
53 /// listeners on this stream. 51 /// listeners on this stream.
54 /// 52 ///
(...skipping 14 matching lines...) Expand all
69 /// normal subscription, and can be paused or canceled, but it won't 67 /// normal subscription, and can be paused or canceled, but it won't
70 /// produce any events until a source stream is provided. 68 /// produce any events until a source stream is provided.
71 /// 69 ///
72 /// If the `stream` subscription is canceled before a source stream is set, 70 /// If the `stream` subscription is canceled before a source stream is set,
73 /// the source stream will be listened to and immediately canceled again. 71 /// the source stream will be listened to and immediately canceled again.
74 /// 72 ///
75 /// Otherwise, when the source stream is then set, 73 /// Otherwise, when the source stream is then set,
76 /// it is immediately listened to, and its events are forwarded to the 74 /// it is immediately listened to, and its events are forwarded to the
77 /// existing subscription. 75 /// existing subscription.
78 /// 76 ///
79 /// Either [setSourceStream] or [setEmpty] may be called at most once. 77 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
80 /// Trying to call either of them again will fail. 78 /// most once. Trying to call any of them again will fail.
81 void setSourceStream(Stream<T> sourceStream) { 79 void setSourceStream(Stream<T> sourceStream) {
82 if (_stream._isSourceStreamSet) { 80 if (_stream._isSourceStreamSet) {
83 throw new StateError("Source stream already set"); 81 throw new StateError("Source stream already set");
84 } 82 }
85 _stream._setSourceStream(sourceStream); 83 _stream._setSourceStream(sourceStream);
86 } 84 }
87 85
88 /// Equivalent to setting an empty stream using [setSourceStream]. 86 /// Equivalent to setting an empty stream using [setSourceStream].
89 /// 87 ///
90 /// Either [setSourceStream] or [setEmpty] may be called at most once. 88 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
91 /// Trying to call either of them again will fail. 89 /// most once. Trying to call any of them again will fail.
92 void setEmpty() { 90 void setEmpty() {
93 if (_stream._isSourceStreamSet) { 91 if (_stream._isSourceStreamSet) {
94 throw new StateError("Source stream already set"); 92 throw new StateError("Source stream already set");
95 } 93 }
96 _stream._setEmpty(); 94 _stream._setEmpty();
97 } 95 }
96
97 /// Completes this to a stream that emits [error] and then closes.
98 ///
99 /// This is useful when the process of creating the data for the stream fails.
100 ///
101 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
102 /// most once. Trying to call any of them again will fail.
103 void setError(error, [StackTrace stackTrace]) {
104 setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace)));
105 }
98 } 106 }
99 107
100 /// Stream completed by [StreamCompleter]. 108 /// Stream completed by [StreamCompleter].
101 class _CompleterStream<T> extends Stream<T> { 109 class _CompleterStream<T> extends Stream<T> {
102 /// Controller for an intermediate stream. 110 /// Controller for an intermediate stream.
103 /// 111 ///
104 /// Created if the user listens on this stream before the source stream 112 /// Created if the user listens on this stream before the source stream
105 /// is set, or if using [_setEmpty] so there is no source stream. 113 /// is set, or if using [_setEmpty] so there is no source stream.
106 StreamController _controller; 114 StreamController _controller;
107 115
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
171 _sourceStream = _controller.stream; // Mark stream as set. 179 _sourceStream = _controller.stream; // Mark stream as set.
172 _controller.close(); 180 _controller.close();
173 } 181 }
174 182
175 // Creates the [_controller]. 183 // Creates the [_controller].
176 void _createController() { 184 void _createController() {
177 assert(_controller == null); 185 assert(_controller == null);
178 _controller = new StreamController<T>(sync: true); 186 _controller = new StreamController<T>(sync: true);
179 } 187 }
180 } 188 }
OLDNEW
« no previous file with comments | « lib/src/null_stream_sink.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698