| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Utility function to attach a stack trace to an [error] if it doesn't have | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
| 9 * one already. | 9 * one already. |
| 10 */ | 10 */ |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 62 StreamSubscription<T> _createSubscription(void onData(T value), | 62 StreamSubscription<T> _createSubscription(void onData(T value), |
| 63 void onError(error), | 63 void onError(error), |
| 64 void onDone(), | 64 void onDone(), |
| 65 bool cancelOnError) { | 65 bool cancelOnError) { |
| 66 return new _ForwardingStreamSubscription<S, T>( | 66 return new _ForwardingStreamSubscription<S, T>( |
| 67 this, onData, onError, onDone, cancelOnError); | 67 this, onData, onError, onDone, cancelOnError); |
| 68 } | 68 } |
| 69 | 69 |
| 70 // Override the following methods in subclasses to change the behavior. | 70 // Override the following methods in subclasses to change the behavior. |
| 71 | 71 |
| 72 void _handleData(S data, _EventSink<T> sink) { | 72 void _handleData(S data, _EventOutputSink<T> sink) { |
| 73 var outputData = data; | 73 var outputData = data; |
| 74 sink._add(outputData); | 74 sink._sendData(outputData); |
| 75 } | 75 } |
| 76 | 76 |
| 77 void _handleError(error, _EventSink<T> sink) { | 77 void _handleError(error, _EventOutputSink<T> sink) { |
| 78 sink._addError(error); | 78 sink._sendError(error); |
| 79 } | 79 } |
| 80 | 80 |
| 81 void _handleDone(_EventSink<T> sink) { | 81 void _handleDone(_EventOutputSink<T> sink) { |
| 82 sink._close(); | 82 sink._sendDone(); |
| 83 } | 83 } |
| 84 } | 84 } |
| 85 | 85 |
| 86 /** | 86 /** |
| 87 * Common behavior of [StreamSubscription] classes. |
| 88 * |
| 89 * Stores and allows updating of the event handlers of a [StreamSubscription]. |
| 90 */ |
| 91 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
| 92 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 93 // checked mode. http://dartbug.com/7733 |
| 94 var /* _DataHandler<T> */ _onData; |
| 95 _ErrorHandler _onError; |
| 96 _DoneHandler _onDone; |
| 97 |
| 98 _BaseStreamSubscription(this._onData, |
| 99 this._onError, |
| 100 this._onDone) { |
| 101 if (_onData == null) _onData = _nullDataHandler; |
| 102 if (_onError == null) _onError = _nullErrorHandler; |
| 103 if (_onDone == null) _onDone = _nullDoneHandler; |
| 104 } |
| 105 |
| 106 // StreamSubscription interface. |
| 107 void onData(void handleData(T event)) { |
| 108 if (handleData == null) handleData = _nullDataHandler; |
| 109 _onData = handleData; |
| 110 } |
| 111 |
| 112 void onError(void handleError(error)) { |
| 113 if (handleError == null) handleError = _nullErrorHandler; |
| 114 _onError = handleError; |
| 115 } |
| 116 |
| 117 void onDone(void handleDone()) { |
| 118 if (handleDone == null) handleDone = _nullDoneHandler; |
| 119 _onDone = handleDone; |
| 120 } |
| 121 |
| 122 void pause([Future resumeSignal]); |
| 123 |
| 124 void resume(); |
| 125 |
| 126 void cancel(); |
| 127 |
| 128 Future asFuture([var futureValue]) { |
| 129 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 130 |
| 131 // Overwrite the onDone and onError handlers. |
| 132 onDone(() { result._setValue(futureValue); }); |
| 133 onError((error) { |
| 134 cancel(); |
| 135 result._setError(error); |
| 136 }); |
| 137 |
| 138 return result; |
| 139 } |
| 140 } |
| 141 |
| 142 |
| 143 /** |
| 87 * Abstract superclass for subscriptions that forward to other subscriptions. | 144 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 88 */ | 145 */ |
| 89 class _ForwardingStreamSubscription<S, T> | 146 class _ForwardingStreamSubscription<S, T> |
| 90 extends _BufferingStreamSubscription<T> { | 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
| 91 final _ForwardingStream<S, T> _stream; | 148 final _ForwardingStream<S, T> _stream; |
| 149 final bool _cancelOnError; |
| 92 | 150 |
| 93 StreamSubscription<S> _subscription; | 151 StreamSubscription<S> _subscription; |
| 94 | 152 |
| 95 _ForwardingStreamSubscription(this._stream, | 153 _ForwardingStreamSubscription(this._stream, |
| 96 void onData(T data), | 154 void onData(T data), |
| 97 void onError(error), | 155 void onError(error), |
| 98 void onDone(), | 156 void onDone(), |
| 99 bool cancelOnError) | 157 this._cancelOnError) |
| 100 : super(onData, onError, onDone, cancelOnError) { | 158 : super(onData, onError, onDone) { |
| 159 // Don't unsubscribe on incoming error, only if we send an error forwards. |
| 101 _subscription = | 160 _subscription = |
| 102 _stream._source.listen(_handleData, | 161 _stream._source.listen(_handleData, |
| 103 onError: _handleError, | 162 onError: _handleError, |
| 104 onDone: _handleDone); | 163 onDone: _handleDone); |
| 105 } | 164 } |
| 106 | 165 |
| 107 // _StreamSink interface. | 166 // StreamSubscription interface. |
| 108 // Transformers sending more than one event have no way to know if the stream | |
| 109 // is canceled or closed after the first, so we just ignore remaining events. | |
| 110 | 167 |
| 111 void _add(T data) { | 168 void pause([Future resumeSignal]) { |
| 112 if (_isClosed) return; | 169 if (_subscription == null) return; |
| 113 super._add(data); | 170 _subscription.pause(resumeSignal); |
| 114 } | 171 } |
| 115 | 172 |
| 116 void _addError(Object error) { | 173 void resume() { |
| 117 if (_isClosed) return; | |
| 118 super._addError(error); | |
| 119 } | |
| 120 | |
| 121 // StreamSubscription callbacks. | |
| 122 | |
| 123 void _onPause() { | |
| 124 if (_subscription == null) return; | |
| 125 _subscription.pause(); | |
| 126 } | |
| 127 | |
| 128 void _onResume() { | |
| 129 if (_subscription == null) return; | 174 if (_subscription == null) return; |
| 130 _subscription.resume(); | 175 _subscription.resume(); |
| 131 } | 176 } |
| 132 | 177 |
| 133 void _onCancel() { | 178 bool get isPaused { |
| 179 if (_subscription == null) return false; |
| 180 return _subscription.isPaused; |
| 181 } |
| 182 |
| 183 void cancel() { |
| 134 if (_subscription != null) { | 184 if (_subscription != null) { |
| 135 StreamSubscription subscription = _subscription; | 185 _subscription.cancel(); |
| 136 _subscription = null; | 186 _subscription = null; |
| 137 subscription.cancel(); | |
| 138 } | 187 } |
| 139 } | 188 } |
| 140 | 189 |
| 190 // _EventOutputSink interface. Sends data to this subscription. |
| 191 |
| 192 void _sendData(T data) { |
| 193 _onData(data); |
| 194 } |
| 195 |
| 196 void _sendError(error) { |
| 197 _onError(error); |
| 198 if (_cancelOnError) { |
| 199 _subscription.cancel(); |
| 200 _subscription = null; |
| 201 } |
| 202 } |
| 203 |
| 204 void _sendDone() { |
| 205 // If the transformation sends a done signal, we stop the subscription. |
| 206 if (_subscription != null) { |
| 207 _subscription.cancel(); |
| 208 _subscription = null; |
| 209 } |
| 210 _onDone(); |
| 211 } |
| 212 |
| 141 // Methods used as listener on source subscription. | 213 // Methods used as listener on source subscription. |
| 142 | 214 |
| 143 // TODO(ahe): Restore type when feature is implemented in dart2js | 215 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 144 // checked mode. http://dartbug.com/7733 | 216 // checked mode. http://dartbug.com/7733 |
| 145 void _handleData(/*S*/ data) { | 217 void _handleData(/*S*/ data) { |
| 146 _stream._handleData(data, this); | 218 _stream._handleData(data, this); |
| 147 } | 219 } |
| 148 | 220 |
| 149 void _handleError(error) { | 221 void _handleError(error) { |
| 150 _stream._handleError(error, this); | 222 _stream._handleError(error, this); |
| (...skipping 11 matching lines...) Expand all Loading... |
| 162 // ------------------------------------------------------------------- | 234 // ------------------------------------------------------------------- |
| 163 | 235 |
| 164 typedef bool _Predicate<T>(T value); | 236 typedef bool _Predicate<T>(T value); |
| 165 | 237 |
| 166 class _WhereStream<T> extends _ForwardingStream<T, T> { | 238 class _WhereStream<T> extends _ForwardingStream<T, T> { |
| 167 final _Predicate<T> _test; | 239 final _Predicate<T> _test; |
| 168 | 240 |
| 169 _WhereStream(Stream<T> source, bool test(T value)) | 241 _WhereStream(Stream<T> source, bool test(T value)) |
| 170 : _test = test, super(source); | 242 : _test = test, super(source); |
| 171 | 243 |
| 172 void _handleData(T inputEvent, _EventSink<T> sink) { | 244 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 173 bool satisfies; | 245 bool satisfies; |
| 174 try { | 246 try { |
| 175 satisfies = _test(inputEvent); | 247 satisfies = _test(inputEvent); |
| 176 } catch (e, s) { | 248 } catch (e, s) { |
| 177 sink._addError(_asyncError(e, s)); | 249 sink._sendError(_asyncError(e, s)); |
| 178 return; | 250 return; |
| 179 } | 251 } |
| 180 if (satisfies) { | 252 if (satisfies) { |
| 181 sink._add(inputEvent); | 253 sink._sendData(inputEvent); |
| 182 } | 254 } |
| 183 } | 255 } |
| 184 } | 256 } |
| 185 | 257 |
| 186 | 258 |
| 187 typedef T _Transformation<S, T>(S value); | 259 typedef T _Transformation<S, T>(S value); |
| 188 | 260 |
| 189 /** | 261 /** |
| 190 * A stream pipe that converts data events before passing them on. | 262 * A stream pipe that converts data events before passing them on. |
| 191 */ | 263 */ |
| 192 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 264 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 193 final _Transformation _transform; | 265 final _Transformation _transform; |
| 194 | 266 |
| 195 _MapStream(Stream<S> source, T transform(S event)) | 267 _MapStream(Stream<S> source, T transform(S event)) |
| 196 : this._transform = transform, super(source); | 268 : this._transform = transform, super(source); |
| 197 | 269 |
| 198 void _handleData(S inputEvent, _EventSink<T> sink) { | 270 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| 199 T outputEvent; | 271 T outputEvent; |
| 200 try { | 272 try { |
| 201 outputEvent = _transform(inputEvent); | 273 outputEvent = _transform(inputEvent); |
| 202 } catch (e, s) { | 274 } catch (e, s) { |
| 203 sink._addError(_asyncError(e, s)); | 275 sink._sendError(_asyncError(e, s)); |
| 204 return; | 276 return; |
| 205 } | 277 } |
| 206 sink._add(outputEvent); | 278 sink._sendData(outputEvent); |
| 207 } | 279 } |
| 208 } | 280 } |
| 209 | 281 |
| 210 /** | 282 /** |
| 211 * A stream pipe that converts data events before passing them on. | 283 * A stream pipe that converts data events before passing them on. |
| 212 */ | 284 */ |
| 213 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 285 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 214 final _Transformation<S, Iterable<T>> _expand; | 286 final _Transformation<S, Iterable<T>> _expand; |
| 215 | 287 |
| 216 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 288 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 217 : this._expand = expand, super(source); | 289 : this._expand = expand, super(source); |
| 218 | 290 |
| 219 void _handleData(S inputEvent, _EventSink<T> sink) { | 291 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| 220 try { | 292 try { |
| 221 for (T value in _expand(inputEvent)) { | 293 for (T value in _expand(inputEvent)) { |
| 222 sink._add(value); | 294 sink._sendData(value); |
| 223 } | 295 } |
| 224 } catch (e, s) { | 296 } catch (e, s) { |
| 225 // If either _expand or iterating the generated iterator throws, | 297 // If either _expand or iterating the generated iterator throws, |
| 226 // we abort the iteration. | 298 // we abort the iteration. |
| 227 sink._addError(_asyncError(e, s)); | 299 sink._sendError(_asyncError(e, s)); |
| 228 } | 300 } |
| 229 } | 301 } |
| 230 } | 302 } |
| 231 | 303 |
| 232 | 304 |
| 233 typedef void _ErrorTransformation(error); | 305 typedef void _ErrorTransformation(error); |
| 234 typedef bool _ErrorTest(error); | 306 typedef bool _ErrorTest(error); |
| 235 | 307 |
| 236 /** | 308 /** |
| 237 * A stream pipe that converts or disposes error events | 309 * A stream pipe that converts or disposes error events |
| 238 * before passing them on. | 310 * before passing them on. |
| 239 */ | 311 */ |
| 240 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 312 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 241 final _ErrorTransformation _transform; | 313 final _ErrorTransformation _transform; |
| 242 final _ErrorTest _test; | 314 final _ErrorTest _test; |
| 243 | 315 |
| 244 _HandleErrorStream(Stream<T> source, | 316 _HandleErrorStream(Stream<T> source, |
| 245 void transform(event), | 317 void transform(event), |
| 246 bool test(error)) | 318 bool test(error)) |
| 247 : this._transform = transform, this._test = test, super(source); | 319 : this._transform = transform, this._test = test, super(source); |
| 248 | 320 |
| 249 void _handleError(Object error, _EventSink<T> sink) { | 321 void _handleError(Object error, _EventOutputSink<T> sink) { |
| 250 bool matches = true; | 322 bool matches = true; |
| 251 if (_test != null) { | 323 if (_test != null) { |
| 252 try { | 324 try { |
| 253 matches = _test(error); | 325 matches = _test(error); |
| 254 } catch (e, s) { | 326 } catch (e, s) { |
| 255 sink._addError(_asyncError(e, s)); | 327 sink._sendError(_asyncError(e, s)); |
| 256 return; | 328 return; |
| 257 } | 329 } |
| 258 } | 330 } |
| 259 if (matches) { | 331 if (matches) { |
| 260 try { | 332 try { |
| 261 _transform(error); | 333 _transform(error); |
| 262 } catch (e, s) { | 334 } catch (e, s) { |
| 263 sink._addError(_asyncError(e, s)); | 335 sink._sendError(_asyncError(e, s)); |
| 264 return; | 336 return; |
| 265 } | 337 } |
| 266 } else { | 338 } else { |
| 267 sink._addError(error); | 339 sink._sendError(error); |
| 268 } | 340 } |
| 269 } | 341 } |
| 270 } | 342 } |
| 271 | 343 |
| 272 | 344 |
| 273 class _TakeStream<T> extends _ForwardingStream<T, T> { | 345 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 274 int _remaining; | 346 int _remaining; |
| 275 | 347 |
| 276 _TakeStream(Stream<T> source, int count) | 348 _TakeStream(Stream<T> source, int count) |
| 277 : this._remaining = count, super(source) { | 349 : this._remaining = count, super(source) { |
| 278 // This test is done early to avoid handling an async error | 350 // This test is done early to avoid handling an async error |
| 279 // in the _handleData method. | 351 // in the _handleData method. |
| 280 if (count is! int) throw new ArgumentError(count); | 352 if (count is! int) throw new ArgumentError(count); |
| 281 } | 353 } |
| 282 | 354 |
| 283 void _handleData(T inputEvent, _EventSink<T> sink) { | 355 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 284 if (_remaining > 0) { | 356 if (_remaining > 0) { |
| 285 sink._add(inputEvent); | 357 sink._sendData(inputEvent); |
| 286 _remaining -= 1; | 358 _remaining -= 1; |
| 287 if (_remaining == 0) { | 359 if (_remaining == 0) { |
| 288 // Closing also unsubscribes all subscribers, which unsubscribes | 360 // Closing also unsubscribes all subscribers, which unsubscribes |
| 289 // this from source. | 361 // this from source. |
| 290 sink._close(); | 362 sink._sendDone(); |
| 291 } | 363 } |
| 292 } | 364 } |
| 293 } | 365 } |
| 294 } | 366 } |
| 295 | 367 |
| 296 | 368 |
| 297 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 369 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 298 final _Predicate<T> _test; | 370 final _Predicate<T> _test; |
| 299 | 371 |
| 300 _TakeWhileStream(Stream<T> source, bool test(T value)) | 372 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 301 : this._test = test, super(source); | 373 : this._test = test, super(source); |
| 302 | 374 |
| 303 void _handleData(T inputEvent, _EventSink<T> sink) { | 375 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 304 bool satisfies; | 376 bool satisfies; |
| 305 try { | 377 try { |
| 306 satisfies = _test(inputEvent); | 378 satisfies = _test(inputEvent); |
| 307 } catch (e, s) { | 379 } catch (e, s) { |
| 308 sink._addError(_asyncError(e, s)); | 380 sink._sendError(_asyncError(e, s)); |
| 309 // The test didn't say true. Didn't say false either, but we stop anyway. | 381 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 310 sink._close(); | 382 sink._sendDone(); |
| 311 return; | 383 return; |
| 312 } | 384 } |
| 313 if (satisfies) { | 385 if (satisfies) { |
| 314 sink._add(inputEvent); | 386 sink._sendData(inputEvent); |
| 315 } else { | 387 } else { |
| 316 sink._close(); | 388 sink._sendDone(); |
| 317 } | 389 } |
| 318 } | 390 } |
| 319 } | 391 } |
| 320 | 392 |
| 321 class _SkipStream<T> extends _ForwardingStream<T, T> { | 393 class _SkipStream<T> extends _ForwardingStream<T, T> { |
| 322 int _remaining; | 394 int _remaining; |
| 323 | 395 |
| 324 _SkipStream(Stream<T> source, int count) | 396 _SkipStream(Stream<T> source, int count) |
| 325 : this._remaining = count, super(source) { | 397 : this._remaining = count, super(source) { |
| 326 // This test is done early to avoid handling an async error | 398 // This test is done early to avoid handling an async error |
| 327 // in the _handleData method. | 399 // in the _handleData method. |
| 328 if (count is! int || count < 0) throw new ArgumentError(count); | 400 if (count is! int || count < 0) throw new ArgumentError(count); |
| 329 } | 401 } |
| 330 | 402 |
| 331 void _handleData(T inputEvent, _EventSink<T> sink) { | 403 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 332 if (_remaining > 0) { | 404 if (_remaining > 0) { |
| 333 _remaining--; | 405 _remaining--; |
| 334 return; | 406 return; |
| 335 } | 407 } |
| 336 return sink._add(inputEvent); | 408 return sink._sendData(inputEvent); |
| 337 } | 409 } |
| 338 } | 410 } |
| 339 | 411 |
| 340 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 412 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 341 final _Predicate<T> _test; | 413 final _Predicate<T> _test; |
| 342 bool _hasFailed = false; | 414 bool _hasFailed = false; |
| 343 | 415 |
| 344 _SkipWhileStream(Stream<T> source, bool test(T value)) | 416 _SkipWhileStream(Stream<T> source, bool test(T value)) |
| 345 : this._test = test, super(source); | 417 : this._test = test, super(source); |
| 346 | 418 |
| 347 void _handleData(T inputEvent, _EventSink<T> sink) { | 419 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 348 if (_hasFailed) { | 420 if (_hasFailed) { |
| 349 sink._add(inputEvent); | 421 sink._sendData(inputEvent); |
| 350 return; | 422 return; |
| 351 } | 423 } |
| 352 bool satisfies; | 424 bool satisfies; |
| 353 try { | 425 try { |
| 354 satisfies = _test(inputEvent); | 426 satisfies = _test(inputEvent); |
| 355 } catch (e, s) { | 427 } catch (e, s) { |
| 356 sink._addError(_asyncError(e, s)); | 428 sink._sendError(_asyncError(e, s)); |
| 357 // A failure to return a boolean is considered "not matching". | 429 // A failure to return a boolean is considered "not matching". |
| 358 _hasFailed = true; | 430 _hasFailed = true; |
| 359 return; | 431 return; |
| 360 } | 432 } |
| 361 if (!satisfies) { | 433 if (!satisfies) { |
| 362 _hasFailed = true; | 434 _hasFailed = true; |
| 363 sink._add(inputEvent); | 435 sink._sendData(inputEvent); |
| 364 } | 436 } |
| 365 } | 437 } |
| 366 } | 438 } |
| 367 | 439 |
| 368 typedef bool _Equality<T>(T a, T b); | 440 typedef bool _Equality<T>(T a, T b); |
| 369 | 441 |
| 370 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 442 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 371 static var _SENTINEL = new Object(); | 443 static var _SENTINEL = new Object(); |
| 372 | 444 |
| 373 _Equality<T> _equals; | 445 _Equality<T> _equals; |
| 374 var _previous = _SENTINEL; | 446 var _previous = _SENTINEL; |
| 375 | 447 |
| 376 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 448 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 377 : _equals = equals, super(source); | 449 : _equals = equals, super(source); |
| 378 | 450 |
| 379 void _handleData(T inputEvent, _EventSink<T> sink) { | 451 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 380 if (identical(_previous, _SENTINEL)) { | 452 if (identical(_previous, _SENTINEL)) { |
| 381 _previous = inputEvent; | 453 _previous = inputEvent; |
| 382 return sink._add(inputEvent); | 454 return sink._sendData(inputEvent); |
| 383 } else { | 455 } else { |
| 384 bool isEqual; | 456 bool isEqual; |
| 385 try { | 457 try { |
| 386 if (_equals == null) { | 458 if (_equals == null) { |
| 387 isEqual = (_previous == inputEvent); | 459 isEqual = (_previous == inputEvent); |
| 388 } else { | 460 } else { |
| 389 isEqual = _equals(_previous, inputEvent); | 461 isEqual = _equals(_previous, inputEvent); |
| 390 } | 462 } |
| 391 } catch (e, s) { | 463 } catch (e, s) { |
| 392 sink._addError(_asyncError(e, s)); | 464 sink._sendError(_asyncError(e, s)); |
| 393 return null; | 465 return null; |
| 394 } | 466 } |
| 395 if (!isEqual) { | 467 if (!isEqual) { |
| 396 sink._add(inputEvent); | 468 sink._sendData(inputEvent); |
| 397 _previous = inputEvent; | 469 _previous = inputEvent; |
| 398 } | 470 } |
| 399 } | 471 } |
| 400 } | 472 } |
| 401 } | 473 } |
| 402 | 474 |
| 403 // Stream transformations and event transformations. | 475 // Stream transformations and event transformations. |
| 404 | 476 |
| 405 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 477 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 406 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); | 478 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 455 | 527 |
| 456 void handleError(error, EventSink<T> sink) { | 528 void handleError(error, EventSink<T> sink) { |
| 457 _handleError(error, sink); | 529 _handleError(error, sink); |
| 458 } | 530 } |
| 459 | 531 |
| 460 void handleDone(EventSink<T> sink) { | 532 void handleDone(EventSink<T> sink) { |
| 461 _handleDone(sink); | 533 _handleDone(sink); |
| 462 } | 534 } |
| 463 } | 535 } |
| 464 | 536 |
| OLD | NEW |