Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(554)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 12330074: Fix some dart:io documentation comments. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/io/http.dart » ('j') | sdk/lib/io/process.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 70
71 /** 71 /**
72 * Creates a single-subscription stream that gets its data from [data]. 72 * Creates a single-subscription stream that gets its data from [data].
73 */ 73 */
74 factory Stream.fromIterable(Iterable<T> data) { 74 factory Stream.fromIterable(Iterable<T> data) {
75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); 75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
76 return new _GeneratedSingleStreamImpl<T>(iterableEvents); 76 return new _GeneratedSingleStreamImpl<T>(iterableEvents);
77 } 77 }
78 78
79 /** 79 /**
80 * Whether the stream is a broadcast stream. 80 * Reports whether this stream is a broadcast stream.
81 */ 81 */
82 bool get isBroadcast => false; 82 bool get isBroadcast => false;
83 83
84 /** 84 /**
85 * Returns a multi-subscription stream that produces the same events as this. 85 * Returns a multi-subscription stream that produces the same events as this.
86 * 86 *
87 * If this stream is single-subscription, return a new stream that allows 87 * If this stream is single-subscription, return a new stream that allows
88 * multiple subscribers. It will subscribe to this stream when its first 88 * multiple subscribers. It will subscribe to this stream when its first
89 * subscriber is added, and unsubscribe again when the last subscription is 89 * subscriber is added, and unsubscribe again when the last subscription is
90 * cancelled. 90 * cancelled.
91 * 91 *
92 * If this stream is already a broadcast stream, it is returned unmodified. 92 * If this stream is already a broadcast stream, it is returned unmodified.
93 */ 93 */
94 Stream<T> asBroadcastStream() { 94 Stream<T> asBroadcastStream() {
95 if (isBroadcast) return this; 95 if (isBroadcast) return this;
96 return new _SingleStreamMultiplexer<T>(this); 96 return new _SingleStreamMultiplexer<T>(this);
97 } 97 }
98 98
99 /** 99 /**
100 * Add a subscription to this stream. 100 * Adds a subscription to this stream.
101 * 101 *
102 * On each data event from this stream, the subscribers [onData] handler 102 * On each data event from this stream, the subscriber's [onData] handler
103 * is called. If [onData] is null, nothing happens. 103 * is called. If [onData] is null, nothing happens.
104 * 104 *
105 * On errors from this stream, the [onError] handler is given a 105 * On errors from this stream, the [onError] handler is given a
106 * [AsyncError] object describing the error. 106 * [AsyncError] object describing the error.
107 * 107 *
108 * If this stream closes, the [onDone] handler is called. 108 * If this stream closes, the [onDone] handler is called.
109 * 109 *
110 * If [unsubscribeOnError] is true, the subscription is ended when 110 * If [unsubscribeOnError] is true, the subscription is ended when
111 * the first error is reported. The default is false. 111 * the first error is reported. The default is false.
112 */ 112 */
113 StreamSubscription<T> listen(void onData(T event), 113 StreamSubscription<T> listen(void onData(T event),
114 { void onError(AsyncError error), 114 { void onError(AsyncError error),
115 void onDone(), 115 void onDone(),
116 bool unsubscribeOnError}); 116 bool unsubscribeOnError});
117 117
118 /** 118 /**
119 * Creates a new stream from this stream that discards some data events. 119 * Creates a new stream from this stream that discards some data events.
120 * 120 *
121 * The new stream sends the same error and done events as this stream, 121 * The new stream sends the same error and done events as this stream,
122 * but it only sends the data events that satisfy the [test]. 122 * but it only sends the data events that satisfy the [test].
123 */ 123 */
124 Stream<T> where(bool test(T event)) { 124 Stream<T> where(bool test(T event)) {
125 return new _WhereStream<T>(this, test); 125 return new _WhereStream<T>(this, test);
126 } 126 }
127 127
128 /** 128 /**
129 * Create a new stream that converts each element of this stream 129 * Creates a new stream that converts each element of this stream
130 * to a new value using the [convert] function. 130 * to a new value using the [convert] function.
131 */ 131 */
132 Stream map(convert(T event)) { 132 Stream map(convert(T event)) {
133 return new _MapStream<T, dynamic>(this, convert); 133 return new _MapStream<T, dynamic>(this, convert);
134 } 134 }
135 135
136 /** 136 /**
137 * Create a wrapper Stream that intercepts some errors from this stream. 137 * Creates a wrapper Stream that intercepts some errors from this stream.
138 * 138 *
139 * If this stream sends an error that matches [test], then it is intercepted 139 * If this stream sends an error that matches [test], then it is intercepted
140 * by the [handle] function. 140 * by the [handle] function.
141 * 141 *
142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns 142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
143 * true. If [test] is omitted, every error is considered matching. 143 * true. If [test] is omitted, every error is considered matching.
144 * 144 *
145 * If the error is intercepted, the [handle] function can decide what to do 145 * If the error is intercepted, the [handle] function can decide what to do
146 * with it. It can throw if it wants to raise a new (or the same) error, 146 * with it. It can throw if it wants to raise a new (or the same) error,
147 * or simply return to make the stream forget the error. 147 * or simply return to make the stream forget the error.
148 * 148 *
149 * If you need to transform an error into a data event, use the more generic 149 * If you need to transform an error into a data event, use the more generic
150 * [Stream.transformEvent] to handle the event by writing a data event to 150 * [Stream.transformEvent] to handle the event by writing a data event to
151 * the output sink 151 * the output sink
152 */ 152 */
153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { 153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) {
154 return new _HandleErrorStream<T>(this, handle, test); 154 return new _HandleErrorStream<T>(this, handle, test);
155 } 155 }
156 156
157 /** 157 /**
158 * Create a new stream from this stream that converts each element 158 * Creates a new stream from this stream that converts each element
159 * into zero or more events. 159 * into zero or more events.
160 * 160 *
161 * Each incoming event is converted to an [Iterable] of new events, 161 * Each incoming event is converted to an [Iterable] of new events,
162 * and each of these new events are then sent by the returned stream 162 * and each of these new events are then sent by the returned stream
163 * in order. 163 * in order.
164 */ 164 */
165 Stream expand(Iterable convert(T value)) { 165 Stream expand(Iterable convert(T value)) {
166 return new _ExpandStream<T, dynamic>(this, convert); 166 return new _ExpandStream<T, dynamic>(this, convert);
167 } 167 }
168 168
169 /** 169 /**
170 * Bind this stream as the input of the provided [StreamConsumer]. 170 * Binds this stream as the input of the provided [StreamConsumer].
171 */ 171 */
172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) { 172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) {
173 return streamConsumer.consume(this); 173 return streamConsumer.consume(this);
174 } 174 }
175 175
176 /** 176 /**
177 * Chain this stream as the input of the provided [StreamTransformer]. 177 * Chains this stream as the input of the provided [StreamTransformer].
178 * 178 *
179 * Returns the result of [:streamTransformer.bind:] itself. 179 * Returns the result of [:streamTransformer.bind:] itself.
180 */ 180 */
181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { 181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
182 return streamTransformer.bind(this); 182 return streamTransformer.bind(this);
183 } 183 }
184 184
185 /** Reduces a sequence of values by repeatedly applying [combine]. */ 185 /** Reduces a sequence of values by repeatedly applying [combine]. */
186 Future reduce(var initialValue, combine(var previous, T element)) { 186 Future reduce(var initialValue, combine(var previous, T element)) {
187 _FutureImpl result = new _FutureImpl(); 187 _FutureImpl result = new _FutureImpl();
(...skipping 30 matching lines...) Expand all
218 onDone: () { 218 onDone: () {
219 sink.close(); 219 sink.close();
220 result._setValue(null); 220 result._setValue(null);
221 }, 221 },
222 unsubscribeOnError: unsubscribeOnError); 222 unsubscribeOnError: unsubscribeOnError);
223 return result; 223 return result;
224 } 224 }
225 225
226 226
227 /** 227 /**
228 * Check whether [match] occurs in the elements provided by this stream. 228 * Checks whether [match] occurs in the elements provided by this stream.
229 * 229 *
230 * Completes the [Future] when the answer is known. 230 * Completes the [Future] when the answer is known.
231 * If this stream reports an error, the [Future] will report that error. 231 * If this stream reports an error, the [Future] will report that error.
232 */ 232 */
233 Future<bool> contains(T match) { 233 Future<bool> contains(T match) {
234 _FutureImpl<bool> future = new _FutureImpl<bool>(); 234 _FutureImpl<bool> future = new _FutureImpl<bool>();
235 StreamSubscription subscription; 235 StreamSubscription subscription;
236 subscription = this.listen( 236 subscription = this.listen(
237 // TODO(ahe): Restore type when feature is implemented in dart2js 237 // TODO(ahe): Restore type when feature is implemented in dart2js
238 // checked mode. http://dartbug.com/7733 238 // checked mode. http://dartbug.com/7733
(...skipping 11 matching lines...) Expand all
250 }, 250 },
251 onError: future._setError, 251 onError: future._setError,
252 onDone: () { 252 onDone: () {
253 future._setValue(false); 253 future._setValue(false);
254 }, 254 },
255 unsubscribeOnError: true); 255 unsubscribeOnError: true);
256 return future; 256 return future;
257 } 257 }
258 258
259 /** 259 /**
260 * Check whether [test] accepts all elements provided by this stream. 260 * Checks whether [test] accepts all elements provided by this stream.
261 * 261 *
262 * Completes the [Future] when the answer is known. 262 * Completes the [Future] when the answer is known.
263 * If this stream reports an error, the [Future] will report that error. 263 * If this stream reports an error, the [Future] will report that error.
264 */ 264 */
265 Future<bool> every(bool test(T element)) { 265 Future<bool> every(bool test(T element)) {
266 _FutureImpl<bool> future = new _FutureImpl<bool>(); 266 _FutureImpl<bool> future = new _FutureImpl<bool>();
267 StreamSubscription subscription; 267 StreamSubscription subscription;
268 subscription = this.listen( 268 subscription = this.listen(
269 // TODO(ahe): Restore type when feature is implemented in dart2js 269 // TODO(ahe): Restore type when feature is implemented in dart2js
270 // checked mode. http://dartbug.com/7733 270 // checked mode. http://dartbug.com/7733
(...skipping 11 matching lines...) Expand all
282 }, 282 },
283 onError: future._setError, 283 onError: future._setError,
284 onDone: () { 284 onDone: () {
285 future._setValue(true); 285 future._setValue(true);
286 }, 286 },
287 unsubscribeOnError: true); 287 unsubscribeOnError: true);
288 return future; 288 return future;
289 } 289 }
290 290
291 /** 291 /**
292 * Check whether [test] accepts any element provided by this stream. 292 * Checks whether [test] accepts any element provided by this stream.
293 * 293 *
294 * Completes the [Future] when the answer is known. 294 * Completes the [Future] when the answer is known.
295 * If this stream reports an error, the [Future] will report that error. 295 * If this stream reports an error, the [Future] will report that error.
296 */ 296 */
297 Future<bool> any(bool test(T element)) { 297 Future<bool> any(bool test(T element)) {
298 _FutureImpl<bool> future = new _FutureImpl<bool>(); 298 _FutureImpl<bool> future = new _FutureImpl<bool>();
299 StreamSubscription subscription; 299 StreamSubscription subscription;
300 subscription = this.listen( 300 subscription = this.listen(
301 // TODO(ahe): Restore type when feature is implemented in dart2js 301 // TODO(ahe): Restore type when feature is implemented in dart2js
302 // checked mode. http://dartbug.com/7733 302 // checked mode. http://dartbug.com/7733
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 future._setValue(false); 433 future._setValue(false);
434 }, 434 },
435 onError: future._setError, 435 onError: future._setError,
436 onDone: () { 436 onDone: () {
437 future._setValue(true); 437 future._setValue(true);
438 }, 438 },
439 unsubscribeOnError: true); 439 unsubscribeOnError: true);
440 return future; 440 return future;
441 } 441 }
442 442
443 /** Collect the data of this stream in a [List]. */ 443 /** Collects the data of this stream in a [List]. */
444 Future<List<T>> toList() { 444 Future<List<T>> toList() {
445 List<T> result = <T>[]; 445 List<T> result = <T>[];
446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); 446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
447 this.listen( 447 this.listen(
448 // TODO(ahe): Restore type when feature is implemented in dart2js 448 // TODO(ahe): Restore type when feature is implemented in dart2js
449 // checked mode. http://dartbug.com/7733 449 // checked mode. http://dartbug.com/7733
450 (/*T*/ data) { 450 (/*T*/ data) {
451 result.add(data); 451 result.add(data);
452 }, 452 },
453 onError: future._setError, 453 onError: future._setError,
454 onDone: () { 454 onDone: () {
455 future._setValue(result); 455 future._setValue(result);
456 }, 456 },
457 unsubscribeOnError: true); 457 unsubscribeOnError: true);
458 return future; 458 return future;
459 } 459 }
460 460
461 /** Collect the data of this stream in a [Set]. */ 461 /** Collects the data of this stream in a [Set]. */
462 Future<Set<T>> toSet() { 462 Future<Set<T>> toSet() {
463 Set<T> result = new Set<T>(); 463 Set<T> result = new Set<T>();
464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); 464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
465 this.listen( 465 this.listen(
466 // TODO(ahe): Restore type when feature is implemented in dart2js 466 // TODO(ahe): Restore type when feature is implemented in dart2js
467 // checked mode. http://dartbug.com/7733 467 // checked mode. http://dartbug.com/7733
468 (/*T*/ data) { 468 (/*T*/ data) {
469 result.add(data); 469 result.add(data);
470 }, 470 },
471 onError: future._setError, 471 onError: future._setError,
472 onDone: () { 472 onDone: () {
473 future._setValue(result); 473 future._setValue(result);
474 }, 474 },
475 unsubscribeOnError: true); 475 unsubscribeOnError: true);
476 return future; 476 return future;
477 } 477 }
478 478
479 /** 479 /**
480 * Provide at most the first [n] values of this stream. 480 * Provides at most the first [n] values of this stream.
481 * 481 *
482 * Forwards the first [n] data events of this stream, and all error 482 * Forwards the first [n] data events of this stream, and all error
483 * events, to the returned stream, and ends with a done event. 483 * events, to the returned stream, and ends with a done event.
484 * 484 *
485 * If this stream produces fewer than [count] values before it's done, 485 * If this stream produces fewer than [count] values before it's done,
486 * so will the returned stream. 486 * so will the returned stream.
487 */ 487 */
488 Stream<T> take(int count) { 488 Stream<T> take(int count) {
489 return new _TakeStream(this, count); 489 return new _TakeStream(this, count);
490 } 490 }
(...skipping 23 matching lines...) Expand all
514 * Error and done events are provided by the returned stream unmodified. 514 * Error and done events are provided by the returned stream unmodified.
515 * 515 *
516 * Starting with the first data event where [test] returns true for the 516 * Starting with the first data event where [test] returns true for the
517 * event data, the returned stream will have the same events as this stream. 517 * event data, the returned stream will have the same events as this stream.
518 */ 518 */
519 Stream<T> skipWhile(bool test(T value)) { 519 Stream<T> skipWhile(bool test(T value)) {
520 return new _SkipWhileStream(this, test); 520 return new _SkipWhileStream(this, test);
521 } 521 }
522 522
523 /** 523 /**
524 * Skip data events if they are equal to the previous data event. 524 * Skips data events if they are equal to the previous data event.
525 * 525 *
526 * The returned stream provides the same events as this stream, except 526 * The returned stream provides the same events as this stream, except
527 * that it never provides two consequtive data events that are equal. 527 * that it never provides two consequtive data events that are equal.
528 * 528 *
529 * Equality is determined by the provided [equals] method. If that is 529 * Equality is determined by the provided [equals] method. If that is
530 * omitted, the '==' operator on the last provided data element is used. 530 * omitted, the '==' operator on the last provided data element is used.
531 */ 531 */
532 Stream<T> distinct([bool equals(T previous, T next)]) { 532 Stream<T> distinct([bool equals(T previous, T next)]) {
533 return new _DistinctStream(this, equals); 533 return new _DistinctStream(this, equals);
534 } 534 }
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
617 future._setValue(result); 617 future._setValue(result);
618 return; 618 return;
619 } 619 }
620 future._setError(new AsyncError(new StateError("No elements"))); 620 future._setError(new AsyncError(new StateError("No elements")));
621 }, 621 },
622 unsubscribeOnError: true); 622 unsubscribeOnError: true);
623 return future; 623 return future;
624 } 624 }
625 625
626 /** 626 /**
627 * Find the first element of this stream matching [test]. 627 * Finds the first element of this stream matching [test].
628 * 628 *
629 * Returns a future that is filled with the first element of this stream 629 * Returns a future that is filled with the first element of this stream
630 * that [test] returns true for. 630 * that [test] returns true for.
631 * 631 *
632 * If no such element is found before this stream is done, and a 632 * If no such element is found before this stream is done, and a
633 * [defaultValue] function is provided, the result of calling [defaultValue] 633 * [defaultValue] function is provided, the result of calling [defaultValue]
634 * becomes the value of the future. 634 * becomes the value of the future.
635 * 635 *
636 * If an error occurs, or if this stream ends without finding a match and 636 * If an error occurs, or if this stream ends without finding a match and
637 * with no [defaultValue] function provided, the future will receive an 637 * with no [defaultValue] function provided, the future will receive an
(...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after
1104 } 1104 }
1105 1105
1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { 1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> {
1107 _StreamOutputSink _sink; 1107 _StreamOutputSink _sink;
1108 _StreamOutputSinkWrapper(this._sink); 1108 _StreamOutputSinkWrapper(this._sink);
1109 1109
1110 void add(T data) => _sink._sendData(data); 1110 void add(T data) => _sink._sendData(data);
1111 void signalError(AsyncError error) => _sink._sendError(error); 1111 void signalError(AsyncError error) => _sink._sendError(error);
1112 void close() => _sink._sendDone(); 1112 void close() => _sink._sendDone();
1113 } 1113 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/io/http.dart » ('j') | sdk/lib/io/process.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698