| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
| 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
| 9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
| 10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
| (...skipping 20 matching lines...) Expand all Loading... |
| 31 | 31 |
| 32 /** Helper function to make an onError argument to [_runUserCode]. */ | 32 /** Helper function to make an onError argument to [_runUserCode]. */ |
| 33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => | 33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
| 34 (AsyncError error) { | 34 (AsyncError error) { |
| 35 subscription.cancel(); | 35 subscription.cancel(); |
| 36 future._setError(error); | 36 future._setError(error); |
| 37 }; | 37 }; |
| 38 | 38 |
| 39 | 39 |
| 40 /** | 40 /** |
| 41 * A wrapper around a stream that allows independent subscribers. | 41 * A [StreamTransformer] that forwards events and subscriptions. |
| 42 * | 42 * |
| 43 * By default [this] subscribes to [_source] and forwards all events to its own | 43 * By default this transformer subscribes to [_source] and forwards all events |
| 44 * subscribers. It does not subscribe until there is a subscriber, and | 44 * to [_stream]. It does not subscribe to [_source] until there is a subscriber, |
| 45 * unsubscribes again when there are no subscribers left. | 45 * on [_stream] and unsubscribes again when there are no subscribers left. |
| 46 * | 46 * |
| 47 * The events are passed through the [_handleData], [_handleError] and | 47 * The events are passed through the [_handleData], [_handleError] and |
| 48 * [_handleDone] methods. Subclasses are supposed to add handling of some of | 48 * [_handleDone] methods. Subclasses are supposed to add handling of some of |
| 49 * the events by overriding these methods. | 49 * the events by overriding these methods. |
| 50 * | 50 * |
| 51 * This class is intended for internal use only. | 51 * This class is intended for internal use only. |
| 52 */ | 52 */ |
| 53 class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> { | 53 /** |
| 54 Stream<S> _source = null; | 54 * |
| 55 StreamSubscription _subscription = null; | 55 * Handles backwards propagation of subscription and pause. |
| 56 */ |
| 57 class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> { |
| 58 Stream<T> _stream; |
| 59 Stream<S> _source; |
| 60 StreamSubscription<S> _subscription; |
| 56 | 61 |
| 57 void _subscribeToSource() { | 62 Stream<T> _createOutputStream() { |
| 58 _subscription = _source.listen(this._handleData, | 63 if (_source.isSingleSubscription) { |
| 59 onError: this._handleError, | 64 return new _ForwardingSingleStream<T>(this); |
| 60 onDone: this._handleDone); | 65 } |
| 61 if (_isPaused) { | 66 return new _ForwardingMultiStream<T>(this); |
| 62 _subscription.pause(); | 67 } |
| 68 |
| 69 Stream<T> bind(Stream<S> source) { |
| 70 if (_source != null) { |
| 71 throw new StateError("Transformer source already bound"); |
| 72 } |
| 73 _source = source; |
| 74 _stream = _createOutputStream(); |
| 75 return _stream; |
| 76 } |
| 77 |
| 78 void _onPauseStateChange(bool isPaused) { |
| 79 if (isPaused) { |
| 80 if (_subscription != null) { |
| 81 _subscription.pause(); |
| 82 } |
| 83 } else { |
| 84 if (_subscription != null) { |
| 85 _subscription.resume(); |
| 86 } |
| 63 } | 87 } |
| 64 } | 88 } |
| 65 | 89 |
| 66 /** | 90 /** |
| 67 * Subscribe or unsubscribe on [source] depending on whether | 91 * Subscribe or unsubscribe on [_source] depending on whether |
| 68 * [stream] has subscribers. | 92 * [_stream] has subscribers. |
| 69 */ | 93 */ |
| 70 void _onSubscriptionStateChange() { | 94 void _onSubscriptionStateChange(bool hasSubscribers) { |
| 71 if (_hasSubscribers) { | 95 if (hasSubscribers) { |
| 72 assert(_subscription == null); | 96 assert(_subscription == null); |
| 73 if (_source != null) { | 97 _subscription = _source.listen(this._handleData, |
| 74 _subscribeToSource(); | 98 onError: this._handleError, |
| 75 } | 99 onDone: this._handleDone); |
| 76 } else { | 100 } else { |
| 77 if (_subscription != null) { | 101 // TODO(lrn): Check why this can happen. |
| 78 _subscription.cancel(); | 102 if (_subscription == null) return; |
| 79 _subscription = null; | 103 _subscription.cancel(); |
| 80 } | 104 _subscription = null; |
| 81 } | |
| 82 } | |
| 83 | |
| 84 void _onPauseStateChange() { | |
| 85 if (_subscription == null) return; | |
| 86 if (isPaused) { | |
| 87 _subscription.pause(); | |
| 88 } else { | |
| 89 _subscription.resume(); | |
| 90 } | 105 } |
| 91 } | 106 } |
| 92 | 107 |
| 93 void _handleData(S inputEvent) { | 108 void _handleData(S inputEvent) { |
| 94 var outputEvent = inputEvent; | 109 var outputEvent = inputEvent; |
| 95 _add(outputEvent); | 110 _stream._add(outputEvent); |
| 96 } | 111 } |
| 97 | 112 |
| 98 void _handleError(AsyncError error) { | 113 void _handleError(AsyncError error) { |
| 99 _signalError(error); | 114 _stream._signalError(error); |
| 100 } | 115 } |
| 101 | 116 |
| 102 void _handleDone() { | 117 void _handleDone() { |
| 103 _close(); | 118 _stream._close(); |
| 104 } | 119 } |
| 105 } | 120 } |
| 106 | 121 |
| 122 class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> { |
| 123 _ForwardingStreamTransformer _transformer; |
| 124 _ForwardingMultiStream(this._transformer); |
| 107 | 125 |
| 108 abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T> | 126 _onSubscriptionStateChange() { |
| 109 implements StreamTransformer<S, T> { | 127 _transformer._onSubscriptionStateChange(_hasSubscribers); |
| 110 Stream<T> bind(Stream<S> source) { | 128 } |
| 111 if (_source != null) throw new StateError("Already bound to source."); | 129 |
| 112 _source = source; | 130 _onPauseStateChange() { |
| 113 if (_hasSubscribers) { | 131 _transformer._onPauseStateChange(_isPaused); |
| 114 _subscribeToSource(); | |
| 115 } | |
| 116 return this; | |
| 117 } | 132 } |
| 118 } | 133 } |
| 119 | 134 |
| 135 class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> { |
| 136 _ForwardingStreamTransformer _transformer; |
| 137 _ForwardingSingleStream(this._transformer); |
| 138 |
| 139 _onSubscriptionStateChange() { |
| 140 _transformer._onSubscriptionStateChange(_hasSubscribers); |
| 141 } |
| 142 |
| 143 _onPauseStateChange() { |
| 144 _transformer._onPauseStateChange(_isPaused); |
| 145 } |
| 146 } |
| 147 |
| 148 |
| 120 // ------------------------------------------------------------------- | 149 // ------------------------------------------------------------------- |
| 121 // Stream transformers used by the default Stream implementation. | 150 // Stream transformers used by the default Stream implementation. |
| 122 // ------------------------------------------------------------------- | 151 // ------------------------------------------------------------------- |
| 123 | 152 |
| 124 typedef bool _Predicate<T>(T value); | 153 typedef bool _Predicate<T>(T value); |
| 125 | 154 |
| 126 class WhereStream<T> extends _ForwardingTransformer<T, T> { | 155 class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 127 final _Predicate<T> _test; | 156 final _Predicate<T> _test; |
| 128 | 157 |
| 129 WhereStream(bool test(T value)) | 158 WhereTransformer(bool test(T value)) |
| 130 : this._test = test; | 159 : this._test = test; |
| 131 | 160 |
| 132 void _handleData(T inputEvent) { | 161 void _handleData(T inputEvent) { |
| 133 bool satisfies; | 162 bool satisfies; |
| 134 try { | 163 try { |
| 135 satisfies = _test(inputEvent); | 164 satisfies = _test(inputEvent); |
| 136 } catch (e, s) { | 165 } catch (e, s) { |
| 137 _signalError(_asyncError(e, s)); | 166 _stream._signalError(_asyncError(e, s)); |
| 138 return; | 167 return; |
| 139 } | 168 } |
| 140 if (satisfies) { | 169 if (satisfies) { |
| 141 _add(inputEvent); | 170 _stream._add(inputEvent); |
| 142 } | 171 } |
| 143 } | 172 } |
| 144 } | 173 } |
| 145 | 174 |
| 146 | 175 |
| 147 typedef T _Transformation<S, T>(S value); | 176 typedef T _Transformation<S, T>(S value); |
| 148 | 177 |
| 149 /** | 178 /** |
| 150 * A stream pipe that converts data events before passing them on. | 179 * A stream pipe that converts data events before passing them on. |
| 151 */ | 180 */ |
| 152 class MapStream<S, T> extends _ForwardingTransformer<S, T> { | 181 class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
| 153 final _Transformation _transform; | 182 final _Transformation _transform; |
| 154 | 183 |
| 155 MapStream(T transform(S event)) | 184 MapTransformer(T transform(S event)) |
| 156 : this._transform = transform; | 185 : this._transform = transform; |
| 157 | 186 |
| 158 void _handleData(S inputEvent) { | 187 void _handleData(S inputEvent) { |
| 159 T outputEvent; | 188 T outputEvent; |
| 160 try { | 189 try { |
| 161 outputEvent = _transform(inputEvent); | 190 outputEvent = _transform(inputEvent); |
| 162 } catch (e, s) { | 191 } catch (e, s) { |
| 163 _signalError(_asyncError(e, s)); | 192 _stream._signalError(_asyncError(e, s)); |
| 164 return; | 193 return; |
| 165 } | 194 } |
| 166 _add(outputEvent); | 195 _stream._add(outputEvent); |
| 167 } | 196 } |
| 168 } | 197 } |
| 169 | 198 |
| 170 /** | 199 /** |
| 171 * A stream pipe that converts data events before passing them on. | 200 * A stream pipe that converts data events before passing them on. |
| 172 */ | 201 */ |
| 173 class ExpandStream<S, T> extends _ForwardingTransformer<S, T> { | 202 class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
| 174 final _Transformation<S, Iterable<T>> _expand; | 203 final _Transformation<S, Iterable<T>> _expand; |
| 175 | 204 |
| 176 ExpandStream(Iterable<T> expand(S event)) | 205 ExpandTransformer(Iterable<T> expand(S event)) |
| 177 : this._expand = expand; | 206 : this._expand = expand; |
| 178 | 207 |
| 179 void _handleData(S inputEvent) { | 208 void _handleData(S inputEvent) { |
| 180 try { | 209 try { |
| 181 for (T value in _expand(inputEvent)) { | 210 for (T value in _expand(inputEvent)) { |
| 182 _add(value); | 211 _stream._add(value); |
| 183 } | 212 } |
| 184 } catch (e, s) { | 213 } catch (e, s) { |
| 185 // If either _expand or iterating the generated iterator throws, | 214 // If either _expand or iterating the generated iterator throws, |
| 186 // we abort the iteration. | 215 // we abort the iteration. |
| 187 _signalError(_asyncError(e, s)); | 216 _stream._signalError(_asyncError(e, s)); |
| 188 } | 217 } |
| 189 } | 218 } |
| 190 } | 219 } |
| 191 | 220 |
| 192 | 221 |
| 193 typedef void _ErrorTransformation(AsyncError error); | 222 typedef void _ErrorTransformation(AsyncError error); |
| 194 typedef bool _ErrorTest(error); | 223 typedef bool _ErrorTest(error); |
| 195 | 224 |
| 196 /** | 225 /** |
| 197 * A stream pipe that converts or disposes error events | 226 * A stream pipe that converts or disposes error events |
| 198 * before passing them on. | 227 * before passing them on. |
| 199 */ | 228 */ |
| 200 class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { | 229 class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 201 final _ErrorTransformation _transform; | 230 final _ErrorTransformation _transform; |
| 202 final _ErrorTest _test; | 231 final _ErrorTest _test; |
| 203 | 232 |
| 204 HandleErrorStream(void transform(AsyncError event), bool test(error)) | 233 HandleErrorTransformer(void transform(AsyncError event), bool test(error)) |
| 205 : this._transform = transform, this._test = test; | 234 : this._transform = transform, this._test = test; |
| 206 | 235 |
| 207 void _handleError(AsyncError error) { | 236 void _handleError(AsyncError error) { |
| 208 bool matches = true; | 237 bool matches = true; |
| 209 if (_test != null) { | 238 if (_test != null) { |
| 210 try { | 239 try { |
| 211 matches = _test(error.error); | 240 matches = _test(error.error); |
| 212 } catch (e, s) { | 241 } catch (e, s) { |
| 213 _signalError(_asyncError(e, s, error)); | 242 _stream._signalError(_asyncError(e, s, error)); |
| 214 return; | 243 return; |
| 215 } | 244 } |
| 216 } | 245 } |
| 217 if (matches) { | 246 if (matches) { |
| 218 try { | 247 try { |
| 219 _transform(error); | 248 _transform(error); |
| 220 } catch (e, s) { | 249 } catch (e, s) { |
| 221 _signalError(_asyncError(e, s, error)); | 250 _stream._signalError(_asyncError(e, s, error)); |
| 222 return; | 251 return; |
| 223 } | 252 } |
| 224 } else { | 253 } else { |
| 225 _signalError(error); | 254 _stream._signalError(error); |
| 226 } | 255 } |
| 227 } | 256 } |
| 228 } | 257 } |
| 229 | 258 |
| 230 | 259 |
| 231 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); | 260 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); |
| 232 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); | 261 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
| 233 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); | 262 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
| 234 | 263 |
| 235 /** | 264 /** |
| 236 * A stream pipe that intercepts all events and can generate any event as | 265 * A stream transfomer that intercepts all events and can generate any event as |
| 237 * output. | 266 * output. |
| 238 * | 267 * |
| 239 * Each incoming event on this [StreamSink] is passed to the corresponding | 268 * Each incoming event on the source stream is passed to the corresponding |
| 240 * provided event handler, along with a [StreamSink] linked to the [output] of | 269 * provided event handler, along with a [StreamSink] linked to the output |
| 241 * this pipe. | 270 * Stream. |
| 242 * The handler can then decide which events to send to the output | 271 * The handler can then decide exactly which events to send to the output. |
| 243 */ | 272 */ |
| 244 class PipeStream<S, T> extends _ForwardingTransformer<S, T> { | 273 class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> { |
| 245 final _TransformDataHandler<S, T> _onData; | 274 final _TransformDataHandler<S, T> _onData; |
| 246 final _TransformErrorHandler<T> _onError; | 275 final _TransformErrorHandler<T> _onError; |
| 247 final _TransformDoneHandler<T> _onDone; | 276 final _TransformDoneHandler<T> _onDone; |
| 248 StreamSink<T> _sink; | 277 StreamSink<T> _sink; |
| 249 | 278 |
| 250 PipeStream({void onData(S data, StreamSink<T> sink), | 279 _StreamTransformerImpl(void onData(S data, StreamSink<T> sink), |
| 251 void onError(AsyncError data, StreamSink<T> sink), | 280 void onError(AsyncError data, StreamSink<T> sink), |
| 252 void onDone(StreamSink<T> sink)}) | 281 void onDone(StreamSink<T> sink)) |
| 253 : this._onData = (onData == null ? _defaultHandleData : onData), | 282 : this._onData = (onData == null ? _defaultHandleData : onData), |
| 254 this._onError = (onError == null ? _defaultHandleError : onError), | 283 this._onError = (onError == null ? _defaultHandleError : onError), |
| 255 this._onDone = (onDone == null ? _defaultHandleDone : onDone) { | 284 this._onDone = (onDone == null ? _defaultHandleDone : onDone); |
| 256 // Cache the sink wrapper to avoid creating a new one for each event. | 285 |
| 257 this._sink = new _StreamImplSink(this); | 286 Stream<T> bind(Stream<S> source) { |
| 287 Stream<T> stream = super.bind(source); |
| 288 // Cache a Sink object to avoid creating a new one for each event. |
| 289 _sink = new _StreamImplSink(stream); |
| 290 return stream; |
| 258 } | 291 } |
| 259 | 292 |
| 260 void _handleData(S data) { | 293 void _handleData(S data) { |
| 261 try { | 294 try { |
| 262 return _onData(data, _sink); | 295 _onData(data, _sink); |
| 263 } catch (e, s) { | 296 } catch (e, s) { |
| 264 _signalError(_asyncError(e, s)); | 297 _stream._signalError(_asyncError(e, s)); |
| 265 } | 298 } |
| 266 } | 299 } |
| 267 | 300 |
| 268 void _handleError(AsyncError error) { | 301 void _handleError(AsyncError error) { |
| 269 try { | 302 try { |
| 270 _onError(error, _sink); | 303 _onError(error, _sink); |
| 271 } catch (e, s) { | 304 } catch (e, s) { |
| 272 _signalError(_asyncError(e, s, error)); | 305 _stream._signalError(_asyncError(e, s, error)); |
| 273 } | 306 } |
| 274 } | 307 } |
| 275 | 308 |
| 276 void _handleDone() { | 309 void _handleDone() { |
| 277 try { | 310 try { |
| 278 _onDone(_sink); | 311 _onDone(_sink); |
| 279 } catch (e, s) { | 312 } catch (e, s) { |
| 280 _signalError(_asyncError(e, s)); | 313 _stream._signalError(_asyncError(e, s)); |
| 281 } | 314 } |
| 282 } | 315 } |
| 283 | 316 |
| 284 /** Default data handler forwards all data. */ | 317 /** Default data handler forwards all data. */ |
| 285 static void _defaultHandleData(dynamic data, StreamSink sink) { | 318 static void _defaultHandleData(var data, StreamSink sink) { |
| 286 sink.add(data); | 319 sink.add(data); |
| 287 } | 320 } |
| 288 /** Default error handler forwards all errors. */ | 321 /** Default error handler forwards all errors. */ |
| 289 static void _defaultHandleError(AsyncError error, StreamSink sink) { | 322 static void _defaultHandleError(AsyncError error, StreamSink sink) { |
| 290 sink.signalError(error); | 323 sink.signalError(error); |
| 291 } | 324 } |
| 292 /** Default done handler forwards done. */ | 325 /** Default done handler forwards done. */ |
| 293 static void _defaultHandleDone(StreamSink sink) { | 326 static void _defaultHandleDone(StreamSink sink) { |
| 294 sink.close(); | 327 sink.close(); |
| 295 } | 328 } |
| 296 } | 329 } |
| 297 | 330 |
| 298 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ | 331 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
| 299 class _StreamImplSink<T> implements StreamSink<T> { | 332 class _StreamImplSink<T> implements StreamSink<T> { |
| 300 _StreamImpl<T> _target; | 333 _StreamImpl<T> _target; |
| 301 _StreamImplSink(this._target); | 334 _StreamImplSink(this._target); |
| 302 void add(T data) { _target._add(data); } | 335 void add(T data) { _target._add(data); } |
| 303 void signalError(AsyncError error) { _target._signalError(error); } | 336 void signalError(AsyncError error) { _target._signalError(error); } |
| 304 void close() { _target._close(); } | 337 void close() { _target._close(); } |
| 305 } | 338 } |
| 306 | 339 |
| 307 /** | |
| 308 * A stream pipe that intercepts all events and can generate any event as | |
| 309 * output. | |
| 310 * | |
| 311 * Each incoming event on this [StreamSink] is passed to the corresponding | |
| 312 * method on [transform], along with a [StreamSink] linked to the [output] of | |
| 313 * this pipe. | |
| 314 * The handler can then decide which events to send to the output | |
| 315 */ | |
| 316 class TransformStream<S, T> extends _ForwardingTransformer<S, T> { | |
| 317 final StreamTransformer<S, T> _transform; | |
| 318 StreamSink<T> _sink; | |
| 319 | 340 |
| 320 TransformStream(StreamTransformer<S, T> transform) | 341 class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 321 : this._transform = transform { | |
| 322 // Cache the sink wrapper to avoid creating a new one for each event. | |
| 323 this._sink = new _StreamImplSink(this); | |
| 324 } | |
| 325 | |
| 326 void _handleData(S data) { | |
| 327 try { | |
| 328 return _transform.handleData(data, _sink); | |
| 329 } catch (e, s) { | |
| 330 _controller.signalError(_asyncError(e, s)); | |
| 331 } | |
| 332 } | |
| 333 | |
| 334 void _handleError(AsyncError error) { | |
| 335 try { | |
| 336 _transform.handleError(error, _sink); | |
| 337 } catch (e, s) { | |
| 338 _controller.signalError(_asyncError(e, s, error)); | |
| 339 } | |
| 340 } | |
| 341 | |
| 342 void _handleDone() { | |
| 343 try { | |
| 344 _transform.handleDone(_sink); | |
| 345 } catch (e, s) { | |
| 346 _controller.signalError(_asyncError(e, s)); | |
| 347 } | |
| 348 } | |
| 349 } | |
| 350 | |
| 351 | |
| 352 /** Helper class for transforming three functions into a StreamTransformer. */ | |
| 353 class _StreamTransformerFunctionWrapper<S, T> | |
| 354 extends _StreamTransformer<S, T> { | |
| 355 final _TransformDataHandler<S, T> _handleData; | |
| 356 final _TransformErrorHandler<T> _handleError; | |
| 357 final _TransformDoneHandler<T> _handleDone; | |
| 358 | |
| 359 _StreamTransformerFunctionWrapper({ | |
| 360 void onData(S data, StreamSink<T> sink), | |
| 361 void onError(AsyncError data, StreamSink<T> sink), | |
| 362 void onDone(StreamSink<T> sink)}) | |
| 363 : _handleData = onData != null ? onData : PipeStream._defaultHandleData, | |
| 364 _handleError = onError != null ? onError | |
| 365 : PipeStream._defaultHandleError, | |
| 366 _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone; | |
| 367 | |
| 368 void handleData(S data, StreamSink<T> sink) { | |
| 369 return _handleData(data, sink); | |
| 370 } | |
| 371 | |
| 372 void handleError(AsyncError error, StreamSink<T> sink) { | |
| 373 _handleError(error, sink); | |
| 374 } | |
| 375 | |
| 376 void handleDone(StreamSink<T> sink) { | |
| 377 _handleDone(sink); | |
| 378 } | |
| 379 } | |
| 380 | |
| 381 | |
| 382 class TakeStream<T> extends _ForwardingTransformer<T, T> { | |
| 383 int _remaining; | 342 int _remaining; |
| 384 | 343 |
| 385 TakeStream(int count) | 344 TakeTransformer(int count) |
| 386 : this._remaining = count { | 345 : this._remaining = count { |
| 387 // This test is done early to avoid handling an async error | 346 // This test is done early to avoid handling an async error |
| 388 // in the _handleData method. | 347 // in the _handleData method. |
| 389 if (count is! int) throw new ArgumentError(count); | 348 if (count is! int) throw new ArgumentError(count); |
| 390 } | 349 } |
| 391 | 350 |
| 392 void _handleData(T inputEvent) { | 351 void _handleData(T inputEvent) { |
| 393 if (_remaining > 0) { | 352 if (_remaining > 0) { |
| 394 _add(inputEvent); | 353 _stream._add(inputEvent); |
| 395 _remaining -= 1; | 354 _remaining -= 1; |
| 396 if (_remaining == 0) { | 355 if (_remaining == 0) { |
| 397 // Closing also unsubscribes all subscribers, which unsubscribes | 356 // Closing also unsubscribes all subscribers, which unsubscribes |
| 398 // this from source. | 357 // this from source. |
| 399 _close(); | 358 _stream._close(); |
| 400 } | 359 } |
| 401 } | 360 } |
| 402 } | 361 } |
| 403 } | 362 } |
| 404 | 363 |
| 405 | 364 |
| 406 class TakeWhileStream<T> extends _ForwardingTransformer<T, T> { | 365 class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 407 final _Predicate<T> _test; | 366 final _Predicate<T> _test; |
| 408 | 367 |
| 409 TakeWhileStream(bool test(T value)) | 368 TakeWhileTransformer(bool test(T value)) |
| 410 : this._test = test; | 369 : this._test = test; |
| 411 | 370 |
| 412 void _handleData(T inputEvent) { | 371 void _handleData(T inputEvent) { |
| 413 bool satisfies; | 372 bool satisfies; |
| 414 try { | 373 try { |
| 415 satisfies = _test(inputEvent); | 374 satisfies = _test(inputEvent); |
| 416 } catch (e, s) { | 375 } catch (e, s) { |
| 417 _signalError(_asyncError(e, s)); | 376 _stream._signalError(_asyncError(e, s)); |
| 418 // The test didn't say true. Didn't say false either, but we stop anyway. | 377 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 419 _close(); | 378 _stream._close(); |
| 420 return; | 379 return; |
| 421 } | 380 } |
| 422 if (satisfies) { | 381 if (satisfies) { |
| 423 _add(inputEvent); | 382 _stream._add(inputEvent); |
| 424 } else { | 383 } else { |
| 425 _close(); | 384 _stream._close(); |
| 426 } | 385 } |
| 427 } | 386 } |
| 428 } | 387 } |
| 429 | 388 |
| 430 class SkipStream<T> extends _ForwardingTransformer<T, T> { | 389 class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 431 int _remaining; | 390 int _remaining; |
| 432 | 391 |
| 433 SkipStream(int count) | 392 SkipTransformer(int count) |
| 434 : this._remaining = count{ | 393 : this._remaining = count{ |
| 435 // This test is done early to avoid handling an async error | 394 // This test is done early to avoid handling an async error |
| 436 // in the _handleData method. | 395 // in the _handleData method. |
| 437 if (count is! int) throw new ArgumentError(count); | 396 if (count is! int || count < 0) throw new ArgumentError(count); |
| 438 } | 397 } |
| 439 | 398 |
| 440 void _handleData(T inputEvent) { | 399 void _handleData(T inputEvent) { |
| 441 if (_remaining > 0) { | 400 if (_remaining > 0) { |
| 442 _remaining--; | 401 _remaining--; |
| 443 return; | 402 return; |
| 444 } | 403 } |
| 445 return _add(inputEvent); | 404 return _stream._add(inputEvent); |
| 446 } | 405 } |
| 447 } | 406 } |
| 448 | 407 |
| 449 class SkipWhileStream<T> extends _ForwardingTransformer<T, T> { | 408 class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 450 final _Predicate<T> _test; | 409 final _Predicate<T> _test; |
| 451 bool _hasFailed = false; | 410 bool _hasFailed = false; |
| 452 | 411 |
| 453 SkipWhileStream(bool test(T value)) | 412 SkipWhileTransformer(bool test(T value)) |
| 454 : this._test = test; | 413 : this._test = test; |
| 455 | 414 |
| 456 void _handleData(T inputEvent) { | 415 void _handleData(T inputEvent) { |
| 457 if (_hasFailed) { | 416 if (_hasFailed) { |
| 458 _add(inputEvent); | 417 _stream._add(inputEvent); |
| 459 } | 418 } |
| 460 bool satisfies; | 419 bool satisfies; |
| 461 try { | 420 try { |
| 462 satisfies = _test(inputEvent); | 421 satisfies = _test(inputEvent); |
| 463 } catch (e, s) { | 422 } catch (e, s) { |
| 464 _signalError(_asyncError(e, s)); | 423 _stream._signalError(_asyncError(e, s)); |
| 465 // A failure to return a boolean is considered "not matching". | 424 // A failure to return a boolean is considered "not matching". |
| 466 _hasFailed = true; | 425 _hasFailed = true; |
| 467 return; | 426 return; |
| 468 } | 427 } |
| 469 if (!satisfies) { | 428 if (!satisfies) { |
| 470 _hasFailed = true; | 429 _hasFailed = true; |
| 471 _add(inputEvent); | 430 _stream._add(inputEvent); |
| 472 } | 431 } |
| 473 } | 432 } |
| 474 } | 433 } |
| 475 | 434 |
| 476 typedef bool _Equality<T>(T a, T b); | 435 typedef bool _Equality<T>(T a, T b); |
| 477 | 436 |
| 478 class DistinctStream<T> extends _ForwardingTransformer<T, T> { | 437 class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
| 479 static var _SENTINEL = new Object(); | 438 static var _SENTINEL = new Object(); |
| 480 | 439 |
| 481 _Equality<T> _equals; | 440 _Equality<T> _equals; |
| 482 var _previous = _SENTINEL; | 441 var _previous = _SENTINEL; |
| 483 | 442 |
| 484 DistinctStream(bool equals(T a, T b)) | 443 DistinctTransformer(bool equals(T a, T b)) |
| 485 : _equals = equals; | 444 : _equals = equals; |
| 486 | 445 |
| 487 void _handleData(T inputEvent) { | 446 void _handleData(T inputEvent) { |
| 488 if (identical(_previous, _SENTINEL)) { | 447 if (identical(_previous, _SENTINEL)) { |
| 489 _previous = inputEvent; | 448 _previous = inputEvent; |
| 490 return _add(inputEvent); | 449 return _stream._add(inputEvent); |
| 491 } else { | 450 } else { |
| 492 bool isEqual; | 451 bool isEqual; |
| 493 try { | 452 try { |
| 494 if (_equals == null) { | 453 if (_equals == null) { |
| 495 isEqual = (_previous == inputEvent); | 454 isEqual = (_previous == inputEvent); |
| 496 } else { | 455 } else { |
| 497 isEqual = _equals(_previous, inputEvent); | 456 isEqual = _equals(_previous, inputEvent); |
| 498 } | 457 } |
| 499 } catch (e, s) { | 458 } catch (e, s) { |
| 500 _signalError(_asyncError(e, s)); | 459 _stream._signalError(_asyncError(e, s)); |
| 501 return null; | 460 return null; |
| 502 } | 461 } |
| 503 if (!isEqual) { | 462 if (!isEqual) { |
| 504 _add(inputEvent); | 463 _stream._add(inputEvent); |
| 505 _previous = inputEvent; | 464 _previous = inputEvent; |
| 506 } | 465 } |
| 507 } | 466 } |
| 508 } | 467 } |
| 509 } | 468 } |
| OLD | NEW |