OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "forkable_stream.dart"; | 10 import "subscription_fork.dart"; |
11 import "subscription_stream.dart"; | 11 import "subscription_stream.dart"; |
12 import "stream_completer.dart"; | 12 import "stream_completer.dart"; |
13 import "../result.dart"; | 13 import "../result.dart"; |
14 | 14 |
15 /// An asynchronous pull-based interface for accessing stream events. | 15 /// An asynchronous pull-based interface for accessing stream events. |
16 /// | 16 /// |
17 /// Wraps a stream and makes individual events available on request. | 17 /// Wraps a stream and makes individual events available on request. |
18 /// | 18 /// |
19 /// You can request (and reserve) one or more events from the stream, | 19 /// You can request (and reserve) one or more events from the stream, |
20 /// and after all previous requests have been fulfilled, stream events | 20 /// and after all previous requests have been fulfilled, stream events |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
72 // again when new events are available. | 72 // again when new events are available. |
73 // The request can remove events that it uses, or keep them in the event | 73 // The request can remove events that it uses, or keep them in the event |
74 // queue until it has all that it needs. | 74 // queue until it has all that it needs. |
75 // | 75 // |
76 // This model is very flexible and easily extensible. | 76 // This model is very flexible and easily extensible. |
77 // It allows requests that don't consume events (like [hasNext]) or | 77 // It allows requests that don't consume events (like [hasNext]) or |
78 // potentially a request that takes either five or zero events, determined | 78 // potentially a request that takes either five or zero events, determined |
79 // by the content of the fifth event. | 79 // by the content of the fifth event. |
80 | 80 |
81 /// Source of events. | 81 /// Source of events. |
82 final ForkableStream _sourceStream; | 82 final Stream _sourceStream; |
83 | 83 |
84 /// Subscription on [_sourceStream] while listening for events. | 84 /// Subscription on [_sourceStream] while listening for events. |
85 /// | 85 /// |
86 /// Set to subscription when listening, and set to `null` when the | 86 /// Set to subscription when listening, and set to `null` when the |
87 /// subscription is done (and [_isDone] is set to true). | 87 /// subscription is done (and [_isDone] is set to true). |
88 StreamSubscription _subscription; | 88 StreamSubscription _subscription; |
89 | 89 |
90 /// Whether we have listened on [_sourceStream] and the subscription is done. | 90 /// Whether we have listened on [_sourceStream] and the subscription is done. |
91 bool _isDone = false; | 91 bool _isDone = false; |
92 | 92 |
93 /// Whether a closing operation has been performed on the stream queue. | 93 /// Whether a closing operation has been performed on the stream queue. |
94 /// | 94 /// |
95 /// Closing operations are [cancel] and [rest]. | 95 /// Closing operations are [cancel] and [rest]. |
96 bool _isClosed = false; | 96 bool _isClosed = false; |
97 | 97 |
98 /// Queue of events not used by a request yet. | 98 /// Queue of events not used by a request yet. |
99 final Queue<Result> _eventQueue = new Queue(); | 99 final Queue<Result> _eventQueue = new Queue(); |
100 | 100 |
101 /// Queue of pending requests. | 101 /// Queue of pending requests. |
102 /// | 102 /// |
103 /// Access through methods below to ensure consistency. | 103 /// Access through methods below to ensure consistency. |
104 final Queue<_EventRequest> _requestQueue = new Queue(); | 104 final Queue<_EventRequest> _requestQueue = new Queue(); |
105 | 105 |
106 /// Create a `StreamQueue` of the events of [source]. | 106 /// Create a `StreamQueue` of the events of [source]. |
107 StreamQueue(Stream source) | 107 StreamQueue(Stream source) |
108 : _sourceStream = source is ForkableStream | 108 : _sourceStream = source; |
109 ? source | |
110 : new ForkableStream(source); | |
111 | 109 |
112 /// Asks if the stream has any more events. | 110 /// Asks if the stream has any more events. |
113 /// | 111 /// |
114 /// Returns a future that completes with `true` if the stream has any | 112 /// Returns a future that completes with `true` if the stream has any |
115 /// more events, whether data or error. | 113 /// more events, whether data or error. |
116 /// If the stream closes without producing any more events, the returned | 114 /// If the stream closes without producing any more events, the returned |
117 /// future completes with `false`. | 115 /// future completes with `false`. |
118 /// | 116 /// |
119 /// Can be used before using [next] to avoid getting an error in the | 117 /// Can be used before using [next] to avoid getting an error in the |
120 /// future returned by `next` in the case where there are no more events. | 118 /// future returned by `next` in the case where there are no more events. |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
223 /// | 221 /// |
224 /// The fork is subscribed to the same underlying stream as this queue, but | 222 /// The fork is subscribed to the same underlying stream as this queue, but |
225 /// it's otherwise wholly independent. If requests are made on one, they don't | 223 /// it's otherwise wholly independent. If requests are made on one, they don't |
226 /// move the other forward; if one is closed, the other is still open. | 224 /// move the other forward; if one is closed, the other is still open. |
227 /// | 225 /// |
228 /// The underlying stream will only be paused when all forks have no | 226 /// The underlying stream will only be paused when all forks have no |
229 /// outstanding requests, and only canceled when all forks are canceled. | 227 /// outstanding requests, and only canceled when all forks are canceled. |
230 StreamQueue<T> fork() { | 228 StreamQueue<T> fork() { |
231 if (_isClosed) throw _failClosed(); | 229 if (_isClosed) throw _failClosed(); |
232 | 230 |
| 231 _ensureListening(); |
| 232 if (_subscription is! ForkableSubscription) { |
| 233 _subscription = new ForkableSubscription<T>(_subscription); |
| 234 _subscription.onData(_onData); |
| 235 _subscription.onError(_onError); |
| 236 _subscription.onDone(_onDone); |
| 237 } |
233 var request = new _ForkRequest<T>(this); | 238 var request = new _ForkRequest<T>(this); |
234 _addRequest(request); | 239 _addRequest(request); |
235 return request.queue; | 240 return request.queue; |
236 } | 241 } |
237 | 242 |
238 /// Cancels the underlying stream subscription. | 243 /// Cancels the underlying stream subscription. |
239 /// | 244 /// |
240 /// If [immediate] is `false` (the default), the cancel operation waits until | 245 /// If [immediate] is `false` (the default), the cancel operation waits until |
241 /// all previously requested events have been processed, then it cancels the | 246 /// all previously requested events have been processed, then it cancels the |
242 /// subscription providing the events. | 247 /// subscription providing the events. |
(...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
672 | 677 |
673 void close(Queue<Result> events) { | 678 void close(Queue<Result> events) { |
674 _completeStream(events); | 679 _completeStream(events); |
675 } | 680 } |
676 | 681 |
677 void _completeStream(Queue<Result> events) { | 682 void _completeStream(Queue<Result> events) { |
678 if (events.isEmpty) { | 683 if (events.isEmpty) { |
679 if (_streamQueue._isDone) { | 684 if (_streamQueue._isDone) { |
680 _completer.setEmpty(); | 685 _completer.setEmpty(); |
681 } else { | 686 } else { |
682 _completer.setSourceStream(_streamQueue._sourceStream.fork()); | 687 assert(_streamQueue._subscription is ForkableSubscription); |
| 688 var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork()
); |
| 689 _completer.setSourceStream(stream); |
683 } | 690 } |
684 } else { | 691 } else { |
685 // There are prefetched events which need to be added before the | 692 // There are prefetched events which need to be added before the |
686 // remaining stream. | 693 // remaining stream. |
687 var controller = new StreamController<T>(); | 694 var controller = new StreamController<T>(); |
688 for (var event in events) { | 695 for (var event in events) { |
689 event.addTo(controller); | 696 event.addTo(controller); |
690 } | 697 } |
691 | 698 |
692 var fork = _streamQueue._sourceStream.fork(); | 699 var fork = new SubscriptionStream(_streamQueue._subscription.fork()); |
693 controller.addStream(fork, cancelOnError: false) | 700 controller.addStream(fork, cancelOnError: false) |
694 .whenComplete(controller.close); | 701 .whenComplete(controller.close); |
695 _completer.setSourceStream(controller.stream); | 702 _completer.setSourceStream(controller.stream); |
696 } | 703 } |
697 } | 704 } |
698 } | 705 } |
OLD | NEW |