OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // part of dart.async; |
| 6 |
| 7 /** |
| 8 * A pipe between two streams. |
| 9 * |
| 10 * The default pipe subscribes to the [source] and sends on the |
| 11 * [stream]. |
| 12 * |
| 13 * The events are passed through the [_handleData], [_handleError] and |
| 14 * [_handleDone] methods. Subclasses are supposed to add handling of some of |
| 15 * the events by overriding these methods. |
| 16 * |
| 17 * This class is intended for internal use only. Users can use the [PipeStream] |
| 18 * to configure similar behavior. |
| 19 */ |
| 20 abstract class _ForwardingStream<S, T> extends _MultiStreamImpl<T> |
| 21 implements StreamTransformer<S, T> { |
| 22 Stream<S> _source = null; |
| 23 StreamSubscription _subscription = null; |
| 24 |
| 25 StreamController<T> _createController() { |
| 26 return new _BaseForwardingController<T>(this); |
| 27 } |
| 28 |
| 29 void _subscribeToSource() { |
| 30 _subscription = _source.listen(this._handleData, |
| 31 onError: this._handleError, |
| 32 onDone: this._handleDone); |
| 33 if (_isPaused) { |
| 34 _subscription.pause(); |
| 35 } |
| 36 } |
| 37 |
| 38 Stream<T> bind(Stream<S> source) { |
| 39 assert(_source == null); |
| 40 _source = source; |
| 41 if (_hasSubscribers) { |
| 42 _subscribeToSource(); |
| 43 } |
| 44 return this; |
| 45 } |
| 46 |
| 47 /** |
| 48 * Subscribe or unsubscribe on [source] depending on whether |
| 49 * [stream] has subscribers. |
| 50 */ |
| 51 void _onSubscriptionStateChange() { |
| 52 if (_hasSubscribers) { |
| 53 assert(_subscription == null); |
| 54 if (_source != null) { |
| 55 _subscribeToSource(); |
| 56 } |
| 57 } else { |
| 58 if (_subscription != null) { |
| 59 _subscription.cancel(); |
| 60 _subscription = null; |
| 61 } |
| 62 } |
| 63 } |
| 64 |
| 65 void _onPauseStateChange() { |
| 66 if (_subscription == null) return; |
| 67 if (isPaused) { |
| 68 _subscription.pause(); |
| 69 } else { |
| 70 _subscription.resume(); |
| 71 } |
| 72 } |
| 73 |
| 74 void _handleData(S inputEvent) { |
| 75 var outputEvent = inputEvent; |
| 76 _add(outputEvent); |
| 77 } |
| 78 |
| 79 void _handleError(AsyncError error) { |
| 80 _signalError(error); |
| 81 } |
| 82 |
| 83 void _handleDone() { |
| 84 _close(); |
| 85 } |
| 86 } |
| 87 |
| 88 |
| 89 // ------------------------------------------------------------------- |
| 90 // Stream pipes used by the default Stream implementation. |
| 91 // ------------------------------------------------------------------- |
| 92 |
| 93 typedef bool _Predicate<T>(T value); |
| 94 |
| 95 class WhereStream<T> extends _ForwardingStream<T, T> { |
| 96 final _Predicate<T> _test; |
| 97 |
| 98 WhereStream(bool test(T value)) |
| 99 : this._test = test; |
| 100 |
| 101 void _handleData(T inputEvent) { |
| 102 bool satisfies; |
| 103 try { |
| 104 satisfies = _test(inputEvent); |
| 105 } catch (e, s) { |
| 106 _signalError(new AsyncError(e, s)); |
| 107 return; |
| 108 } |
| 109 if (satisfies) { |
| 110 _add(inputEvent); |
| 111 } |
| 112 } |
| 113 } |
| 114 |
| 115 |
| 116 typedef T _Transformation<S, T>(S value); |
| 117 |
| 118 /** |
| 119 * A stream pipe that converts data events before passing them on. |
| 120 */ |
| 121 class MapStream<S, T> extends _ForwardingStream<S, T> { |
| 122 final _Transformation _transform; |
| 123 |
| 124 MapStream(T transform(S event)) |
| 125 : this._transform = transform; |
| 126 |
| 127 void _handleData(S inputEvent) { |
| 128 T outputEvent; |
| 129 try { |
| 130 outputEvent = _transform(inputEvent); |
| 131 } catch (e, s) { |
| 132 _signalError(new AsyncError(e, s)); |
| 133 return; |
| 134 } |
| 135 _add(outputEvent); |
| 136 } |
| 137 } |
| 138 |
| 139 /** |
| 140 * A stream pipe that converts data events before passing them on. |
| 141 */ |
| 142 class ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 143 final _Transformation<S, Iterable<T>> _expand; |
| 144 |
| 145 ExpandStream(Iterable<T> expand(S event)) |
| 146 : this._expand = expand; |
| 147 |
| 148 void _handleData(S inputEvent) { |
| 149 try { |
| 150 for (T value in _expand(inputEvent)) { |
| 151 _add(value); |
| 152 } |
| 153 } catch (e, s) { |
| 154 // If either _expand or iterating the generated iterator throws, |
| 155 // we abort the iteration. |
| 156 _signalError(new AsyncError(e, s)); |
| 157 } |
| 158 } |
| 159 } |
| 160 |
| 161 |
| 162 typedef AsyncError _ErrorTransformation(AsyncError error); |
| 163 |
| 164 /** |
| 165 * A stream pipe that converts or disposes error events |
| 166 * before passing them on. |
| 167 */ |
| 168 class HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 169 final _ErrorTransformation _transform; |
| 170 |
| 171 HandleErrorStream(AsyncError transform(AsyncError event)) |
| 172 : this._transform = transform; |
| 173 |
| 174 void _handleError(AsyncError error) { |
| 175 try { |
| 176 error = _transform(error); |
| 177 if (error == null) return; |
| 178 } catch (e, s) { |
| 179 error = new AsyncError.withCause(e, s, error); |
| 180 } |
| 181 _signalError(error); |
| 182 } |
| 183 } |
| 184 |
| 185 |
| 186 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); |
| 187 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
| 188 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
| 189 |
| 190 /** |
| 191 * A stream pipe that intercepts all events and can generate any event as |
| 192 * output. |
| 193 * |
| 194 * Each incoming event on this [StreamSink] is passed to the corresponding |
| 195 * provided event handler, along with a [StreamSink] linked to the [output] of |
| 196 * this pipe. |
| 197 * The handler can then decide which events to send to the output |
| 198 */ |
| 199 class PipeStream<S, T> extends _ForwardingStream<S, T> { |
| 200 final _TransformDataHandler<S, T> _onData; |
| 201 final _TransformErrorHandler<T> _onError; |
| 202 final _TransformDoneHandler<T> _onDone; |
| 203 StreamSink<T> _sink; |
| 204 |
| 205 PipeStream({void onData(S data, StreamSink<T> sink), |
| 206 void onError(AsyncError data, StreamSink<T> sink), |
| 207 void onDone(StreamSink<T> sink)}) |
| 208 : this._onData = (onData == null ? _defaultHandleData : onData), |
| 209 this._onError = (onError == null ? _defaultHandleError : onError), |
| 210 this._onDone = (onDone == null ? _defaultHandleDone : onDone) { |
| 211 // Cache the sink wrapper to avoid creating a new one for each event. |
| 212 this._sink = new _StreamImplSink(this); |
| 213 } |
| 214 |
| 215 void _handleData(S data) { |
| 216 try { |
| 217 return _onData(data, _sink); |
| 218 } catch (e, s) { |
| 219 _signalError(new AsyncError(e, s)); |
| 220 } |
| 221 } |
| 222 |
| 223 void _handleError(AsyncError error) { |
| 224 try { |
| 225 _onError(error, _sink); |
| 226 } catch (e, s) { |
| 227 _signalError(new AsyncError.withCause(e, s, error)); |
| 228 } |
| 229 } |
| 230 |
| 231 void _handleDone() { |
| 232 try { |
| 233 _onDone(_sink); |
| 234 } catch (e, s) { |
| 235 _signalError(new AsyncError(e, s)); |
| 236 } |
| 237 } |
| 238 |
| 239 /** Default data handler forwards all data. */ |
| 240 static void _defaultHandleData(dynamic data, StreamSink sink) { |
| 241 sink.add(data); |
| 242 } |
| 243 /** Default error handler forwards all errors. */ |
| 244 static void _defaultHandleError(AsyncError error, StreamSink sink) { |
| 245 sink.signalError(error); |
| 246 } |
| 247 /** Default done handler forwards done. */ |
| 248 static void _defaultHandleDone(StreamSink sink) { |
| 249 sink.close(); |
| 250 } |
| 251 } |
| 252 |
| 253 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
| 254 class _StreamImplSink<T> implements StreamSink<T> { |
| 255 _StreamImpl<T> _target; |
| 256 _StreamImplSink(this._target); |
| 257 void add(T data) { _target._add(data); } |
| 258 void signalError(AsyncError error) { _target._signalError(error); } |
| 259 void close() { _target._close(); } |
| 260 } |
| 261 |
| 262 /** |
| 263 * A stream pipe that intercepts all events and can generate any event as |
| 264 * output. |
| 265 * |
| 266 * Each incoming event on this [StreamSink] is passed to the corresponding |
| 267 * method on [transform], along with a [StreamSink] linked to the [output] of |
| 268 * this pipe. |
| 269 * The handler can then decide which events to send to the output |
| 270 */ |
| 271 class TransformStream<S, T> extends _ForwardingStream<S, T> { |
| 272 final StreamTransformer<S, T> _transform; |
| 273 StreamSink<T> _sink; |
| 274 |
| 275 TransformStream(StreamTransformer<S, T> transform) |
| 276 : this._transform = transform { |
| 277 // Cache the sink wrapper to avoid creating a new one for each event. |
| 278 this._sink = new _StreamImplSink(this); |
| 279 } |
| 280 |
| 281 void _handleData(S data) { |
| 282 try { |
| 283 return _transform.handleData(data, _sink); |
| 284 } catch (e, s) { |
| 285 _controller.signalError(new AsyncError(e, s)); |
| 286 } |
| 287 } |
| 288 |
| 289 void _handleError(AsyncError error) { |
| 290 try { |
| 291 _transform.handleError(error, _sink); |
| 292 } catch (e, s) { |
| 293 _controller.signalError(new AsyncError.withCause(e, s, error)); |
| 294 } |
| 295 } |
| 296 |
| 297 void _handleDone() { |
| 298 try { |
| 299 _transform.handleDone(_sink); |
| 300 } catch (e, s) { |
| 301 _controller.signalError(new AsyncError(e, s)); |
| 302 } |
| 303 } |
| 304 } |
| 305 |
| 306 |
| 307 /** Helper class for transforming three functions into a StreamTransformer. */ |
| 308 class _StreamTransformerFunctionWrapper<S, T> |
| 309 extends _StreamTransformer<S, T> { |
| 310 final _TransformDataHandler<S, T> _handleData; |
| 311 final _TransformErrorHandler<T> _handleError; |
| 312 final _TransformDoneHandler<T> _handleDone; |
| 313 |
| 314 _StreamTransformerFunctionWrapper({ |
| 315 void onData(S data, StreamSink<T> sink), |
| 316 void onError(AsyncError data, StreamSink<T> sink), |
| 317 void onDone(StreamSink<T> sink)}) |
| 318 : _handleData = onData != null ? onData : PipeStream._defaultHandleData, |
| 319 _handleError = onError != null ? onError |
| 320 : PipeStream._defaultHandleError, |
| 321 _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone; |
| 322 |
| 323 void handleData(S data, StreamSink<T> sink) { |
| 324 return _handleData(data, sink); |
| 325 } |
| 326 |
| 327 void handleError(AsyncError error, StreamSink<T> sink) { |
| 328 _handleError(error, sink); |
| 329 } |
| 330 |
| 331 void handleDone(StreamSink<T> sink) { |
| 332 _handleDone(sink); |
| 333 } |
| 334 } |
| 335 |
| 336 |
| 337 class TakeStream<T> extends _ForwardingStream<T, T> { |
| 338 int _remaining; |
| 339 |
| 340 TakeStream(int count) |
| 341 : this._remaining = count { |
| 342 if (count is! int) throw new ArgumentError(count); |
| 343 } |
| 344 |
| 345 void _handleData(T inputEvent) { |
| 346 if (_remaining > 0) { |
| 347 _add(inputEvent); |
| 348 _remaining -= 1; |
| 349 if (_remaining == 0) { |
| 350 // Closing also unsubscribes all subscribers, which unsubscribes |
| 351 // this from source. |
| 352 _close(); |
| 353 } |
| 354 } |
| 355 } |
| 356 } |
| 357 |
| 358 |
| 359 class TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 360 final _Predicate<T> _test; |
| 361 |
| 362 TakeWhileStream(bool test(T value)) |
| 363 : this._test = test; |
| 364 |
| 365 void _handleData(T inputEvent) { |
| 366 bool satisfies; |
| 367 try { |
| 368 satisfies = _test(inputEvent); |
| 369 } catch (e, s) { |
| 370 _signalError(new AsyncError(e, s)); |
| 371 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 372 _close(); |
| 373 return; |
| 374 } |
| 375 if (satisfies) { |
| 376 _add(inputEvent); |
| 377 } else { |
| 378 _close(); |
| 379 } |
| 380 } |
| 381 } |
| 382 |
| 383 class SkipStream<T> extends _ForwardingStream<T, T> { |
| 384 int _remaining; |
| 385 |
| 386 SkipStream(int count) |
| 387 : this._remaining = count{ |
| 388 if (count is! int) throw new ArgumentError(count); |
| 389 } |
| 390 |
| 391 void _handleData(T inputEvent) { |
| 392 if (_remaining > 0) { |
| 393 _remaining--; |
| 394 return; |
| 395 } |
| 396 return _add(inputEvent); |
| 397 } |
| 398 } |
| 399 |
| 400 class SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 401 final _Predicate<T> _test; |
| 402 bool _hasFailed = false; |
| 403 |
| 404 SkipWhileStream(bool test(T value)) |
| 405 : this._test = test; |
| 406 |
| 407 void _handleData(T inputEvent) { |
| 408 if (_hasFailed) { |
| 409 _add(inputEvent); |
| 410 } |
| 411 bool satisfies; |
| 412 try { |
| 413 satisfies = _test(inputEvent); |
| 414 } catch (e, s) { |
| 415 _signalError(new AsyncError(e, s)); |
| 416 // A failure to return a boolean is considered "not matching". |
| 417 _hasFailed = true; |
| 418 return; |
| 419 } |
| 420 if (!satisfies) { |
| 421 _hasFailed = true; |
| 422 _add(inputEvent); |
| 423 } |
| 424 } |
| 425 } |
| 426 |
| 427 typedef bool _Equality<T>(T a, T b); |
| 428 |
| 429 class DistinctStream<T> extends _ForwardingStream<T, T> { |
| 430 static var _SENTINEL = new Object(); |
| 431 |
| 432 _Equality<T> _equals; |
| 433 var _previous = _SENTINEL; |
| 434 |
| 435 DistinctStream(bool equals(T a, T b)) |
| 436 : _equals = equals; |
| 437 |
| 438 void _handleData(T inputEvent) { |
| 439 if (identical(_previous, _SENTINEL)) { |
| 440 _previous = inputEvent; |
| 441 return _add(inputEvent); |
| 442 } else { |
| 443 bool isEqual; |
| 444 try { |
| 445 if (_equals == null) { |
| 446 isEqual = (_previous == inputEvent); |
| 447 } else { |
| 448 isEqual = _equals(_previous, inputEvent); |
| 449 } |
| 450 } catch (e, s) { |
| 451 _signalError(new AsyncError(e, s)); |
| 452 return null; |
| 453 } |
| 454 if (!isEqual) { |
| 455 _add(inputEvent); |
| 456 _previous = inputEvent; |
| 457 } |
| 458 } |
| 459 } |
| 460 } |
OLD | NEW |