| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "forkable_stream.dart"; | 10 import "subscription_fork.dart"; |
| 11 import "subscription_stream.dart"; | 11 import "subscription_stream.dart"; |
| 12 import "stream_completer.dart"; | 12 import "stream_completer.dart"; |
| 13 import "../result.dart"; | 13 import "../result.dart"; |
| 14 | 14 |
| 15 /// An asynchronous pull-based interface for accessing stream events. | 15 /// An asynchronous pull-based interface for accessing stream events. |
| 16 /// | 16 /// |
| 17 /// Wraps a stream and makes individual events available on request. | 17 /// Wraps a stream and makes individual events available on request. |
| 18 /// | 18 /// |
| 19 /// You can request (and reserve) one or more events from the stream, | 19 /// You can request (and reserve) one or more events from the stream, |
| 20 /// and after all previous requests have been fulfilled, stream events | 20 /// and after all previous requests have been fulfilled, stream events |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 72 // again when new events are available. | 72 // again when new events are available. |
| 73 // The request can remove events that it uses, or keep them in the event | 73 // The request can remove events that it uses, or keep them in the event |
| 74 // queue until it has all that it needs. | 74 // queue until it has all that it needs. |
| 75 // | 75 // |
| 76 // This model is very flexible and easily extensible. | 76 // This model is very flexible and easily extensible. |
| 77 // It allows requests that don't consume events (like [hasNext]) or | 77 // It allows requests that don't consume events (like [hasNext]) or |
| 78 // potentially a request that takes either five or zero events, determined | 78 // potentially a request that takes either five or zero events, determined |
| 79 // by the content of the fifth event. | 79 // by the content of the fifth event. |
| 80 | 80 |
| 81 /// Source of events. | 81 /// Source of events. |
| 82 final ForkableStream _sourceStream; | 82 final Stream _sourceStream; |
| 83 | 83 |
| 84 /// Subscription on [_sourceStream] while listening for events. | 84 /// Subscription on [_sourceStream] while listening for events. |
| 85 /// | 85 /// |
| 86 /// Set to subscription when listening, and set to `null` when the | 86 /// Set to subscription when listening, and set to `null` when the |
| 87 /// subscription is done (and [_isDone] is set to true). | 87 /// subscription is done (and [_isDone] is set to true). |
| 88 StreamSubscription _subscription; | 88 StreamSubscription _subscription; |
| 89 | 89 |
| 90 /// Whether we have listened on [_sourceStream] and the subscription is done. | 90 /// Whether we have listened on [_sourceStream] and the subscription is done. |
| 91 bool _isDone = false; | 91 bool _isDone = false; |
| 92 | 92 |
| 93 /// Whether a closing operation has been performed on the stream queue. | 93 /// Whether a closing operation has been performed on the stream queue. |
| 94 /// | 94 /// |
| 95 /// Closing operations are [cancel] and [rest]. | 95 /// Closing operations are [cancel] and [rest]. |
| 96 bool _isClosed = false; | 96 bool _isClosed = false; |
| 97 | 97 |
| 98 /// Queue of events not used by a request yet. | 98 /// Queue of events not used by a request yet. |
| 99 final Queue<Result> _eventQueue = new Queue(); | 99 final Queue<Result> _eventQueue = new Queue(); |
| 100 | 100 |
| 101 /// Queue of pending requests. | 101 /// Queue of pending requests. |
| 102 /// | 102 /// |
| 103 /// Access through methods below to ensure consistency. | 103 /// Access through methods below to ensure consistency. |
| 104 final Queue<_EventRequest> _requestQueue = new Queue(); | 104 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 105 | 105 |
| 106 /// Create a `StreamQueue` of the events of [source]. | 106 /// Create a `StreamQueue` of the events of [source]. |
| 107 StreamQueue(Stream source) | 107 StreamQueue(Stream source) |
| 108 : _sourceStream = source is ForkableStream | 108 : _sourceStream = source; |
| 109 ? source | |
| 110 : new ForkableStream(source); | |
| 111 | 109 |
| 112 /// Asks if the stream has any more events. | 110 /// Asks if the stream has any more events. |
| 113 /// | 111 /// |
| 114 /// Returns a future that completes with `true` if the stream has any | 112 /// Returns a future that completes with `true` if the stream has any |
| 115 /// more events, whether data or error. | 113 /// more events, whether data or error. |
| 116 /// If the stream closes without producing any more events, the returned | 114 /// If the stream closes without producing any more events, the returned |
| 117 /// future completes with `false`. | 115 /// future completes with `false`. |
| 118 /// | 116 /// |
| 119 /// Can be used before using [next] to avoid getting an error in the | 117 /// Can be used before using [next] to avoid getting an error in the |
| 120 /// future returned by `next` in the case where there are no more events. | 118 /// future returned by `next` in the case where there are no more events. |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 223 /// | 221 /// |
| 224 /// The fork is subscribed to the same underlying stream as this queue, but | 222 /// The fork is subscribed to the same underlying stream as this queue, but |
| 225 /// it's otherwise wholly independent. If requests are made on one, they don't | 223 /// it's otherwise wholly independent. If requests are made on one, they don't |
| 226 /// move the other forward; if one is closed, the other is still open. | 224 /// move the other forward; if one is closed, the other is still open. |
| 227 /// | 225 /// |
| 228 /// The underlying stream will only be paused when all forks have no | 226 /// The underlying stream will only be paused when all forks have no |
| 229 /// outstanding requests, and only canceled when all forks are canceled. | 227 /// outstanding requests, and only canceled when all forks are canceled. |
| 230 StreamQueue<T> fork() { | 228 StreamQueue<T> fork() { |
| 231 if (_isClosed) throw _failClosed(); | 229 if (_isClosed) throw _failClosed(); |
| 232 | 230 |
| 231 _ensureListening(); |
| 232 if (_subscription is! ForkableSubscription) { |
| 233 _subscription = new ForkableSubscription<T>(_subscription); |
| 234 _subscription.onData(_onData); |
| 235 _subscription.onError(_onError); |
| 236 _subscription.onDone(_onDone); |
| 237 } |
| 233 var request = new _ForkRequest<T>(this); | 238 var request = new _ForkRequest<T>(this); |
| 234 _addRequest(request); | 239 _addRequest(request); |
| 235 return request.queue; | 240 return request.queue; |
| 236 } | 241 } |
| 237 | 242 |
| 238 /// Cancels the underlying stream subscription. | 243 /// Cancels the underlying stream subscription. |
| 239 /// | 244 /// |
| 240 /// If [immediate] is `false` (the default), the cancel operation waits until | 245 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 241 /// all previously requested events have been processed, then it cancels the | 246 /// all previously requested events have been processed, then it cancels the |
| 242 /// subscription providing the events. | 247 /// subscription providing the events. |
| (...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 672 | 677 |
| 673 void close(Queue<Result> events) { | 678 void close(Queue<Result> events) { |
| 674 _completeStream(events); | 679 _completeStream(events); |
| 675 } | 680 } |
| 676 | 681 |
| 677 void _completeStream(Queue<Result> events) { | 682 void _completeStream(Queue<Result> events) { |
| 678 if (events.isEmpty) { | 683 if (events.isEmpty) { |
| 679 if (_streamQueue._isDone) { | 684 if (_streamQueue._isDone) { |
| 680 _completer.setEmpty(); | 685 _completer.setEmpty(); |
| 681 } else { | 686 } else { |
| 682 _completer.setSourceStream(_streamQueue._sourceStream.fork()); | 687 assert(_streamQueue._subscription is ForkableSubscription); |
| 688 var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork()
); |
| 689 _completer.setSourceStream(stream); |
| 683 } | 690 } |
| 684 } else { | 691 } else { |
| 685 // There are prefetched events which need to be added before the | 692 // There are prefetched events which need to be added before the |
| 686 // remaining stream. | 693 // remaining stream. |
| 687 var controller = new StreamController<T>(); | 694 var controller = new StreamController<T>(); |
| 688 for (var event in events) { | 695 for (var event in events) { |
| 689 event.addTo(controller); | 696 event.addTo(controller); |
| 690 } | 697 } |
| 691 | 698 |
| 692 var fork = _streamQueue._sourceStream.fork(); | 699 var fork = new SubscriptionStream(_streamQueue._subscription.fork()); |
| 693 controller.addStream(fork, cancelOnError: false) | 700 controller.addStream(fork, cancelOnError: false) |
| 694 .whenComplete(controller.close); | 701 .whenComplete(controller.close); |
| 695 _completer.setSourceStream(controller.stream); | 702 _completer.setSourceStream(controller.stream); |
| 696 } | 703 } |
| 697 } | 704 } |
| 698 } | 705 } |
| OLD | NEW |