| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Utility function to attach a stack trace to an [error] if it doesn't have | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
| 9 * one already. | 9 * one already. |
| 10 */ | 10 */ |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 62 StreamSubscription<T> _createSubscription(void onData(T value), | 62 StreamSubscription<T> _createSubscription(void onData(T value), |
| 63 void onError(error), | 63 void onError(error), |
| 64 void onDone(), | 64 void onDone(), |
| 65 bool cancelOnError) { | 65 bool cancelOnError) { |
| 66 return new _ForwardingStreamSubscription<S, T>( | 66 return new _ForwardingStreamSubscription<S, T>( |
| 67 this, onData, onError, onDone, cancelOnError); | 67 this, onData, onError, onDone, cancelOnError); |
| 68 } | 68 } |
| 69 | 69 |
| 70 // Override the following methods in subclasses to change the behavior. | 70 // Override the following methods in subclasses to change the behavior. |
| 71 | 71 |
| 72 void _handleData(S data, _EventOutputSink<T> sink) { | 72 void _handleData(S data, _EventSink<T> sink) { |
| 73 var outputData = data; | 73 var outputData = data; |
| 74 sink._sendData(outputData); | 74 sink._add(outputData); |
| 75 } | 75 } |
| 76 | 76 |
| 77 void _handleError(error, _EventOutputSink<T> sink) { | 77 void _handleError(error, _EventSink<T> sink) { |
| 78 sink._sendError(error); | 78 sink._addError(error); |
| 79 } | 79 } |
| 80 | 80 |
| 81 void _handleDone(_EventOutputSink<T> sink) { | 81 void _handleDone(_EventSink<T> sink) { |
| 82 sink._sendDone(); | 82 sink._close(); |
| 83 } | 83 } |
| 84 } | 84 } |
| 85 | 85 |
| 86 /** | 86 /** |
| 87 * Common behavior of [StreamSubscription] classes. | |
| 88 * | |
| 89 * Stores and allows updating of the event handlers of a [StreamSubscription]. | |
| 90 */ | |
| 91 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { | |
| 92 // TODO(ahe): Restore type when feature is implemented in dart2js | |
| 93 // checked mode. http://dartbug.com/7733 | |
| 94 var /* _DataHandler<T> */ _onData; | |
| 95 _ErrorHandler _onError; | |
| 96 _DoneHandler _onDone; | |
| 97 | |
| 98 _BaseStreamSubscription(this._onData, | |
| 99 this._onError, | |
| 100 this._onDone) { | |
| 101 if (_onData == null) _onData = _nullDataHandler; | |
| 102 if (_onError == null) _onError = _nullErrorHandler; | |
| 103 if (_onDone == null) _onDone = _nullDoneHandler; | |
| 104 } | |
| 105 | |
| 106 // StreamSubscription interface. | |
| 107 void onData(void handleData(T event)) { | |
| 108 if (handleData == null) handleData = _nullDataHandler; | |
| 109 _onData = handleData; | |
| 110 } | |
| 111 | |
| 112 void onError(void handleError(error)) { | |
| 113 if (handleError == null) handleError = _nullErrorHandler; | |
| 114 _onError = handleError; | |
| 115 } | |
| 116 | |
| 117 void onDone(void handleDone()) { | |
| 118 if (handleDone == null) handleDone = _nullDoneHandler; | |
| 119 _onDone = handleDone; | |
| 120 } | |
| 121 | |
| 122 void pause([Future resumeSignal]); | |
| 123 | |
| 124 void resume(); | |
| 125 | |
| 126 void cancel(); | |
| 127 | |
| 128 Future asFuture([var futureValue]) { | |
| 129 _FutureImpl<T> result = new _FutureImpl<T>(); | |
| 130 | |
| 131 // Overwrite the onDone and onError handlers. | |
| 132 onDone(() { result._setValue(futureValue); }); | |
| 133 onError((error) { | |
| 134 cancel(); | |
| 135 result._setError(error); | |
| 136 }); | |
| 137 | |
| 138 return result; | |
| 139 } | |
| 140 } | |
| 141 | |
| 142 | |
| 143 /** | |
| 144 * Abstract superclass for subscriptions that forward to other subscriptions. | 87 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 145 */ | 88 */ |
| 146 class _ForwardingStreamSubscription<S, T> | 89 class _ForwardingStreamSubscription<S, T> |
| 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 90 extends _BufferingStreamSubscription<T> { |
| 148 final _ForwardingStream<S, T> _stream; | 91 final _ForwardingStream<S, T> _stream; |
| 149 final bool _cancelOnError; | |
| 150 | 92 |
| 151 StreamSubscription<S> _subscription; | 93 StreamSubscription<S> _subscription; |
| 152 | 94 |
| 153 _ForwardingStreamSubscription(this._stream, | 95 _ForwardingStreamSubscription(this._stream, |
| 154 void onData(T data), | 96 void onData(T data), |
| 155 void onError(error), | 97 void onError(error), |
| 156 void onDone(), | 98 void onDone(), |
| 157 this._cancelOnError) | 99 bool cancelOnError) |
| 158 : super(onData, onError, onDone) { | 100 : super(onData, onError, onDone, cancelOnError) { |
| 159 // Don't unsubscribe on incoming error, only if we send an error forwards. | |
| 160 _subscription = | 101 _subscription = |
| 161 _stream._source.listen(_handleData, | 102 _stream._source.listen(_handleData, |
| 162 onError: _handleError, | 103 onError: _handleError, |
| 163 onDone: _handleDone); | 104 onDone: _handleDone); |
| 164 } | 105 } |
| 165 | 106 |
| 166 // StreamSubscription interface. | 107 // _StreamSink interface. |
| 108 // Transformers sending more than one event have no way to know if the stream |
| 109 // is canceled or closed after the first, so we just ignore remaining events. |
| 167 | 110 |
| 168 void pause([Future resumeSignal]) { | 111 void _add(T data) { |
| 169 if (_subscription == null) return; | 112 if (_isClosed) return; |
| 170 _subscription.pause(resumeSignal); | 113 super._add(data); |
| 171 } | 114 } |
| 172 | 115 |
| 173 void resume() { | 116 void _addError(Object error) { |
| 117 if (_isClosed) return; |
| 118 super._addError(error); |
| 119 } |
| 120 |
| 121 // StreamSubscription callbacks. |
| 122 |
| 123 void _onPause() { |
| 124 if (_subscription == null) return; |
| 125 _subscription.pause(); |
| 126 } |
| 127 |
| 128 void _onResume() { |
| 174 if (_subscription == null) return; | 129 if (_subscription == null) return; |
| 175 _subscription.resume(); | 130 _subscription.resume(); |
| 176 } | 131 } |
| 177 | 132 |
| 178 bool get isPaused { | 133 void _onCancel() { |
| 179 if (_subscription == null) return false; | |
| 180 return _subscription.isPaused; | |
| 181 } | |
| 182 | |
| 183 void cancel() { | |
| 184 if (_subscription != null) { | 134 if (_subscription != null) { |
| 185 _subscription.cancel(); | 135 StreamSubscription subscription = _subscription; |
| 186 _subscription = null; | 136 _subscription = null; |
| 137 subscription.cancel(); |
| 187 } | 138 } |
| 188 } | 139 } |
| 189 | 140 |
| 190 // _EventOutputSink interface. Sends data to this subscription. | |
| 191 | |
| 192 void _sendData(T data) { | |
| 193 _onData(data); | |
| 194 } | |
| 195 | |
| 196 void _sendError(error) { | |
| 197 _onError(error); | |
| 198 if (_cancelOnError) { | |
| 199 _subscription.cancel(); | |
| 200 _subscription = null; | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 void _sendDone() { | |
| 205 // If the transformation sends a done signal, we stop the subscription. | |
| 206 if (_subscription != null) { | |
| 207 _subscription.cancel(); | |
| 208 _subscription = null; | |
| 209 } | |
| 210 _onDone(); | |
| 211 } | |
| 212 | |
| 213 // Methods used as listener on source subscription. | 141 // Methods used as listener on source subscription. |
| 214 | 142 |
| 215 // TODO(ahe): Restore type when feature is implemented in dart2js | 143 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 216 // checked mode. http://dartbug.com/7733 | 144 // checked mode. http://dartbug.com/7733 |
| 217 void _handleData(/*S*/ data) { | 145 void _handleData(/*S*/ data) { |
| 218 _stream._handleData(data, this); | 146 _stream._handleData(data, this); |
| 219 } | 147 } |
| 220 | 148 |
| 221 void _handleError(error) { | 149 void _handleError(error) { |
| 222 _stream._handleError(error, this); | 150 _stream._handleError(error, this); |
| (...skipping 11 matching lines...) Expand all Loading... |
| 234 // ------------------------------------------------------------------- | 162 // ------------------------------------------------------------------- |
| 235 | 163 |
| 236 typedef bool _Predicate<T>(T value); | 164 typedef bool _Predicate<T>(T value); |
| 237 | 165 |
| 238 class _WhereStream<T> extends _ForwardingStream<T, T> { | 166 class _WhereStream<T> extends _ForwardingStream<T, T> { |
| 239 final _Predicate<T> _test; | 167 final _Predicate<T> _test; |
| 240 | 168 |
| 241 _WhereStream(Stream<T> source, bool test(T value)) | 169 _WhereStream(Stream<T> source, bool test(T value)) |
| 242 : _test = test, super(source); | 170 : _test = test, super(source); |
| 243 | 171 |
| 244 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 172 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 245 bool satisfies; | 173 bool satisfies; |
| 246 try { | 174 try { |
| 247 satisfies = _test(inputEvent); | 175 satisfies = _test(inputEvent); |
| 248 } catch (e, s) { | 176 } catch (e, s) { |
| 249 sink._sendError(_asyncError(e, s)); | 177 sink._addError(_asyncError(e, s)); |
| 250 return; | 178 return; |
| 251 } | 179 } |
| 252 if (satisfies) { | 180 if (satisfies) { |
| 253 sink._sendData(inputEvent); | 181 sink._add(inputEvent); |
| 254 } | 182 } |
| 255 } | 183 } |
| 256 } | 184 } |
| 257 | 185 |
| 258 | 186 |
| 259 typedef T _Transformation<S, T>(S value); | 187 typedef T _Transformation<S, T>(S value); |
| 260 | 188 |
| 261 /** | 189 /** |
| 262 * A stream pipe that converts data events before passing them on. | 190 * A stream pipe that converts data events before passing them on. |
| 263 */ | 191 */ |
| 264 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 192 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 265 final _Transformation _transform; | 193 final _Transformation _transform; |
| 266 | 194 |
| 267 _MapStream(Stream<S> source, T transform(S event)) | 195 _MapStream(Stream<S> source, T transform(S event)) |
| 268 : this._transform = transform, super(source); | 196 : this._transform = transform, super(source); |
| 269 | 197 |
| 270 void _handleData(S inputEvent, _EventOutputSink<T> sink) { | 198 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 271 T outputEvent; | 199 T outputEvent; |
| 272 try { | 200 try { |
| 273 outputEvent = _transform(inputEvent); | 201 outputEvent = _transform(inputEvent); |
| 274 } catch (e, s) { | 202 } catch (e, s) { |
| 275 sink._sendError(_asyncError(e, s)); | 203 sink._addError(_asyncError(e, s)); |
| 276 return; | 204 return; |
| 277 } | 205 } |
| 278 sink._sendData(outputEvent); | 206 sink._add(outputEvent); |
| 279 } | 207 } |
| 280 } | 208 } |
| 281 | 209 |
| 282 /** | 210 /** |
| 283 * A stream pipe that converts data events before passing them on. | 211 * A stream pipe that converts data events before passing them on. |
| 284 */ | 212 */ |
| 285 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 213 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 286 final _Transformation<S, Iterable<T>> _expand; | 214 final _Transformation<S, Iterable<T>> _expand; |
| 287 | 215 |
| 288 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 216 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 289 : this._expand = expand, super(source); | 217 : this._expand = expand, super(source); |
| 290 | 218 |
| 291 void _handleData(S inputEvent, _EventOutputSink<T> sink) { | 219 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 292 try { | 220 try { |
| 293 for (T value in _expand(inputEvent)) { | 221 for (T value in _expand(inputEvent)) { |
| 294 sink._sendData(value); | 222 sink._add(value); |
| 295 } | 223 } |
| 296 } catch (e, s) { | 224 } catch (e, s) { |
| 297 // If either _expand or iterating the generated iterator throws, | 225 // If either _expand or iterating the generated iterator throws, |
| 298 // we abort the iteration. | 226 // we abort the iteration. |
| 299 sink._sendError(_asyncError(e, s)); | 227 sink._addError(_asyncError(e, s)); |
| 300 } | 228 } |
| 301 } | 229 } |
| 302 } | 230 } |
| 303 | 231 |
| 304 | 232 |
| 305 typedef void _ErrorTransformation(error); | 233 typedef void _ErrorTransformation(error); |
| 306 typedef bool _ErrorTest(error); | 234 typedef bool _ErrorTest(error); |
| 307 | 235 |
| 308 /** | 236 /** |
| 309 * A stream pipe that converts or disposes error events | 237 * A stream pipe that converts or disposes error events |
| 310 * before passing them on. | 238 * before passing them on. |
| 311 */ | 239 */ |
| 312 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 240 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 313 final _ErrorTransformation _transform; | 241 final _ErrorTransformation _transform; |
| 314 final _ErrorTest _test; | 242 final _ErrorTest _test; |
| 315 | 243 |
| 316 _HandleErrorStream(Stream<T> source, | 244 _HandleErrorStream(Stream<T> source, |
| 317 void transform(event), | 245 void transform(event), |
| 318 bool test(error)) | 246 bool test(error)) |
| 319 : this._transform = transform, this._test = test, super(source); | 247 : this._transform = transform, this._test = test, super(source); |
| 320 | 248 |
| 321 void _handleError(Object error, _EventOutputSink<T> sink) { | 249 void _handleError(Object error, _EventSink<T> sink) { |
| 322 bool matches = true; | 250 bool matches = true; |
| 323 if (_test != null) { | 251 if (_test != null) { |
| 324 try { | 252 try { |
| 325 matches = _test(error); | 253 matches = _test(error); |
| 326 } catch (e, s) { | 254 } catch (e, s) { |
| 327 sink._sendError(_asyncError(e, s)); | 255 sink._addError(_asyncError(e, s)); |
| 328 return; | 256 return; |
| 329 } | 257 } |
| 330 } | 258 } |
| 331 if (matches) { | 259 if (matches) { |
| 332 try { | 260 try { |
| 333 _transform(error); | 261 _transform(error); |
| 334 } catch (e, s) { | 262 } catch (e, s) { |
| 335 sink._sendError(_asyncError(e, s)); | 263 sink._addError(_asyncError(e, s)); |
| 336 return; | 264 return; |
| 337 } | 265 } |
| 338 } else { | 266 } else { |
| 339 sink._sendError(error); | 267 sink._addError(error); |
| 340 } | 268 } |
| 341 } | 269 } |
| 342 } | 270 } |
| 343 | 271 |
| 344 | 272 |
| 345 class _TakeStream<T> extends _ForwardingStream<T, T> { | 273 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 346 int _remaining; | 274 int _remaining; |
| 347 | 275 |
| 348 _TakeStream(Stream<T> source, int count) | 276 _TakeStream(Stream<T> source, int count) |
| 349 : this._remaining = count, super(source) { | 277 : this._remaining = count, super(source) { |
| 350 // This test is done early to avoid handling an async error | 278 // This test is done early to avoid handling an async error |
| 351 // in the _handleData method. | 279 // in the _handleData method. |
| 352 if (count is! int) throw new ArgumentError(count); | 280 if (count is! int) throw new ArgumentError(count); |
| 353 } | 281 } |
| 354 | 282 |
| 355 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 283 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 356 if (_remaining > 0) { | 284 if (_remaining > 0) { |
| 357 sink._sendData(inputEvent); | 285 sink._add(inputEvent); |
| 358 _remaining -= 1; | 286 _remaining -= 1; |
| 359 if (_remaining == 0) { | 287 if (_remaining == 0) { |
| 360 // Closing also unsubscribes all subscribers, which unsubscribes | 288 // Closing also unsubscribes all subscribers, which unsubscribes |
| 361 // this from source. | 289 // this from source. |
| 362 sink._sendDone(); | 290 sink._close(); |
| 363 } | 291 } |
| 364 } | 292 } |
| 365 } | 293 } |
| 366 } | 294 } |
| 367 | 295 |
| 368 | 296 |
| 369 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 297 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 370 final _Predicate<T> _test; | 298 final _Predicate<T> _test; |
| 371 | 299 |
| 372 _TakeWhileStream(Stream<T> source, bool test(T value)) | 300 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 373 : this._test = test, super(source); | 301 : this._test = test, super(source); |
| 374 | 302 |
| 375 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 303 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 376 bool satisfies; | 304 bool satisfies; |
| 377 try { | 305 try { |
| 378 satisfies = _test(inputEvent); | 306 satisfies = _test(inputEvent); |
| 379 } catch (e, s) { | 307 } catch (e, s) { |
| 380 sink._sendError(_asyncError(e, s)); | 308 sink._addError(_asyncError(e, s)); |
| 381 // The test didn't say true. Didn't say false either, but we stop anyway. | 309 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 382 sink._sendDone(); | 310 sink._close(); |
| 383 return; | 311 return; |
| 384 } | 312 } |
| 385 if (satisfies) { | 313 if (satisfies) { |
| 386 sink._sendData(inputEvent); | 314 sink._add(inputEvent); |
| 387 } else { | 315 } else { |
| 388 sink._sendDone(); | 316 sink._close(); |
| 389 } | 317 } |
| 390 } | 318 } |
| 391 } | 319 } |
| 392 | 320 |
| 393 class _SkipStream<T> extends _ForwardingStream<T, T> { | 321 class _SkipStream<T> extends _ForwardingStream<T, T> { |
| 394 int _remaining; | 322 int _remaining; |
| 395 | 323 |
| 396 _SkipStream(Stream<T> source, int count) | 324 _SkipStream(Stream<T> source, int count) |
| 397 : this._remaining = count, super(source) { | 325 : this._remaining = count, super(source) { |
| 398 // This test is done early to avoid handling an async error | 326 // This test is done early to avoid handling an async error |
| 399 // in the _handleData method. | 327 // in the _handleData method. |
| 400 if (count is! int || count < 0) throw new ArgumentError(count); | 328 if (count is! int || count < 0) throw new ArgumentError(count); |
| 401 } | 329 } |
| 402 | 330 |
| 403 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 331 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 404 if (_remaining > 0) { | 332 if (_remaining > 0) { |
| 405 _remaining--; | 333 _remaining--; |
| 406 return; | 334 return; |
| 407 } | 335 } |
| 408 return sink._sendData(inputEvent); | 336 return sink._add(inputEvent); |
| 409 } | 337 } |
| 410 } | 338 } |
| 411 | 339 |
| 412 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 340 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 413 final _Predicate<T> _test; | 341 final _Predicate<T> _test; |
| 414 bool _hasFailed = false; | 342 bool _hasFailed = false; |
| 415 | 343 |
| 416 _SkipWhileStream(Stream<T> source, bool test(T value)) | 344 _SkipWhileStream(Stream<T> source, bool test(T value)) |
| 417 : this._test = test, super(source); | 345 : this._test = test, super(source); |
| 418 | 346 |
| 419 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 347 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 420 if (_hasFailed) { | 348 if (_hasFailed) { |
| 421 sink._sendData(inputEvent); | 349 sink._add(inputEvent); |
| 422 return; | 350 return; |
| 423 } | 351 } |
| 424 bool satisfies; | 352 bool satisfies; |
| 425 try { | 353 try { |
| 426 satisfies = _test(inputEvent); | 354 satisfies = _test(inputEvent); |
| 427 } catch (e, s) { | 355 } catch (e, s) { |
| 428 sink._sendError(_asyncError(e, s)); | 356 sink._addError(_asyncError(e, s)); |
| 429 // A failure to return a boolean is considered "not matching". | 357 // A failure to return a boolean is considered "not matching". |
| 430 _hasFailed = true; | 358 _hasFailed = true; |
| 431 return; | 359 return; |
| 432 } | 360 } |
| 433 if (!satisfies) { | 361 if (!satisfies) { |
| 434 _hasFailed = true; | 362 _hasFailed = true; |
| 435 sink._sendData(inputEvent); | 363 sink._add(inputEvent); |
| 436 } | 364 } |
| 437 } | 365 } |
| 438 } | 366 } |
| 439 | 367 |
| 440 typedef bool _Equality<T>(T a, T b); | 368 typedef bool _Equality<T>(T a, T b); |
| 441 | 369 |
| 442 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 370 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 443 static var _SENTINEL = new Object(); | 371 static var _SENTINEL = new Object(); |
| 444 | 372 |
| 445 _Equality<T> _equals; | 373 _Equality<T> _equals; |
| 446 var _previous = _SENTINEL; | 374 var _previous = _SENTINEL; |
| 447 | 375 |
| 448 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 376 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 449 : _equals = equals, super(source); | 377 : _equals = equals, super(source); |
| 450 | 378 |
| 451 void _handleData(T inputEvent, _EventOutputSink<T> sink) { | 379 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 452 if (identical(_previous, _SENTINEL)) { | 380 if (identical(_previous, _SENTINEL)) { |
| 453 _previous = inputEvent; | 381 _previous = inputEvent; |
| 454 return sink._sendData(inputEvent); | 382 return sink._add(inputEvent); |
| 455 } else { | 383 } else { |
| 456 bool isEqual; | 384 bool isEqual; |
| 457 try { | 385 try { |
| 458 if (_equals == null) { | 386 if (_equals == null) { |
| 459 isEqual = (_previous == inputEvent); | 387 isEqual = (_previous == inputEvent); |
| 460 } else { | 388 } else { |
| 461 isEqual = _equals(_previous, inputEvent); | 389 isEqual = _equals(_previous, inputEvent); |
| 462 } | 390 } |
| 463 } catch (e, s) { | 391 } catch (e, s) { |
| 464 sink._sendError(_asyncError(e, s)); | 392 sink._addError(_asyncError(e, s)); |
| 465 return null; | 393 return null; |
| 466 } | 394 } |
| 467 if (!isEqual) { | 395 if (!isEqual) { |
| 468 sink._sendData(inputEvent); | 396 sink._add(inputEvent); |
| 469 _previous = inputEvent; | 397 _previous = inputEvent; |
| 470 } | 398 } |
| 471 } | 399 } |
| 472 } | 400 } |
| 473 } | 401 } |
| 474 | 402 |
| 475 // Stream transformations and event transformations. | 403 // Stream transformations and event transformations. |
| 476 | 404 |
| 477 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 405 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 478 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); | 406 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 527 | 455 |
| 528 void handleError(error, EventSink<T> sink) { | 456 void handleError(error, EventSink<T> sink) { |
| 529 _handleError(error, sink); | 457 _handleError(error, sink); |
| 530 } | 458 } |
| 531 | 459 |
| 532 void handleDone(EventSink<T> sink) { | 460 void handleDone(EventSink<T> sink) { |
| 533 _handleDone(sink); | 461 _handleDone(sink); |
| 534 } | 462 } |
| 535 } | 463 } |
| 536 | 464 |
| OLD | NEW |