OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "forkable_stream.dart"; | |
10 import "subscription_stream.dart"; | 11 import "subscription_stream.dart"; |
11 import "stream_completer.dart"; | 12 import "stream_completer.dart"; |
12 import "../result.dart"; | 13 import "../result.dart"; |
13 | 14 |
14 /// An asynchronous pull-based interface for accessing stream events. | 15 /// An asynchronous pull-based interface for accessing stream events. |
15 /// | 16 /// |
16 /// Wraps a stream and makes individual events available on request. | 17 /// Wraps a stream and makes individual events available on request. |
17 /// | 18 /// |
18 /// You can request (and reserve) one or more events from the stream, | 19 /// You can request (and reserve) one or more events from the stream, |
19 /// and after all previous requests have been fulfilled, stream events | 20 /// and after all previous requests have been fulfilled, stream events |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
71 // again when new events are available. | 72 // again when new events are available. |
72 // The request can remove events that it uses, or keep them in the event | 73 // The request can remove events that it uses, or keep them in the event |
73 // queue until it has all that it needs. | 74 // queue until it has all that it needs. |
74 // | 75 // |
75 // This model is very flexible and easily extensible. | 76 // This model is very flexible and easily extensible. |
76 // It allows requests that don't consume events (like [hasNext]) or | 77 // It allows requests that don't consume events (like [hasNext]) or |
77 // potentially a request that takes either five or zero events, determined | 78 // potentially a request that takes either five or zero events, determined |
78 // by the content of the fifth event. | 79 // by the content of the fifth event. |
79 | 80 |
80 /// Source of events. | 81 /// Source of events. |
81 final Stream _sourceStream; | 82 final ForkableStream _sourceStream; |
82 | 83 |
83 /// Subscription on [_sourceStream] while listening for events. | 84 /// Subscription on [_sourceStream] while listening for events. |
84 /// | 85 /// |
85 /// Set to subscription when listening, and set to `null` when the | 86 /// Set to subscription when listening, and set to `null` when the |
86 /// subscription is done (and [_isDone] is set to true). | 87 /// subscription is done (and [_isDone] is set to true). |
87 StreamSubscription _subscription; | 88 StreamSubscription _subscription; |
88 | 89 |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | 90 /// Whether we have listened on [_sourceStream] and the subscription is done. |
90 bool _isDone = false; | 91 bool _isDone = false; |
91 | 92 |
92 /// Whether a closing operation has been performed on the stream queue. | 93 /// Whether a closing operation has been performed on the stream queue. |
93 /// | 94 /// |
94 /// Closing operations are [cancel] and [rest]. | 95 /// Closing operations are [cancel] and [rest]. |
95 bool _isClosed = false; | 96 bool _isClosed = false; |
96 | 97 |
97 /// Queue of events not used by a request yet. | 98 /// Queue of events not used by a request yet. |
98 final Queue<Result> _eventQueue = new Queue(); | 99 final Queue<Result> _eventQueue = new Queue(); |
99 | 100 |
100 /// Queue of pending requests. | 101 /// Queue of pending requests. |
101 /// | 102 /// |
102 /// Access through methods below to ensure consistency. | 103 /// Access through methods below to ensure consistency. |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | 104 final Queue<_EventRequest> _requestQueue = new Queue(); |
104 | 105 |
105 /// Create a `StreamQueue` of the events of [source]. | 106 /// Create a `StreamQueue` of the events of [source]. |
106 StreamQueue(Stream source) | 107 StreamQueue(Stream source) |
107 : _sourceStream = source; | 108 : _sourceStream = source is ForkableStream |
109 ? source | |
110 : new ForkableStream(source); | |
Lasse Reichstein Nielsen
2015/07/15 20:10:51
Would it be possible to make the fork method witho
nweiz
2015/07/15 22:19:43
I don't think this is possible without drastically
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I still think this could/should be handled at a di
nweiz
2015/07/17 20:30:16
In your example, just forking a stream causes it t
| |
108 | 111 |
109 /// Asks if the stream has any more events. | 112 /// Asks if the stream has any more events. |
110 /// | 113 /// |
111 /// Returns a future that completes with `true` if the stream has any | 114 /// Returns a future that completes with `true` if the stream has any |
112 /// more events, whether data or error. | 115 /// more events, whether data or error. |
113 /// If the stream closes without producing any more events, the returned | 116 /// If the stream closes without producing any more events, the returned |
114 /// future completes with `false`. | 117 /// future completes with `false`. |
115 /// | 118 /// |
116 /// Can be used before using [next] to avoid getting an error in the | 119 /// Can be used before using [next] to avoid getting an error in the |
117 /// future returned by `next` in the case where there are no more events. | 120 /// future returned by `next` in the case where there are no more events. |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
209 Future<List<T>> take(int count) { | 212 Future<List<T>> take(int count) { |
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 213 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
211 if (!_isClosed) { | 214 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 215 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 216 _addRequest(request); |
214 return request.future; | 217 return request.future; |
215 } | 218 } |
216 throw _failClosed(); | 219 throw _failClosed(); |
217 } | 220 } |
218 | 221 |
222 /// Creates a new stream queue in the same position as this one. | |
223 /// | |
224 /// The fork is subscribed to the same underlying stream as this queue, but | |
225 /// it's otherwise wholly independent. If requests are made on one, they don't | |
226 /// move the other forward; if one is closed, the other is still open. | |
227 /// | |
228 /// The underlying stream will only be paused when all forks have no | |
229 /// outstanding requests, and only canceled when all forks are canceled. | |
230 StreamQueue<T> fork() { | |
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I have another problem with fork: It may cause buf
nweiz
2015/07/17 20:30:16
I can't emphasize enough how important forking is
| |
231 if (_isClosed) throw _failClosed(); | |
232 | |
233 var request = new _ForkRequest<T>(this); | |
234 _addRequest(request); | |
235 return request.queue; | |
236 } | |
237 | |
219 /// Cancels the underlying stream subscription. | 238 /// Cancels the underlying stream subscription. |
220 /// | 239 /// |
221 /// If [immediate] is `false` (the default), the cancel operation waits until | 240 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// all previously requested events have been processed, then it cancels the | 241 /// all previously requested events have been processed, then it cancels the |
223 /// subscription providing the events. | 242 /// subscription providing the events. |
224 /// | 243 /// |
225 /// If [immediate] is `true`, the subscription is instead canceled | 244 /// If [immediate] is `true`, the subscription is instead canceled |
226 /// immediately. Any pending events are completed as though the underlying | 245 /// immediately. Any pending events are completed as though the underlying |
227 /// stream had closed. | 246 /// stream had closed. |
228 /// | 247 /// |
229 /// The returned future completes with the result of calling | 248 /// The returned future completes with the result of calling |
230 /// `cancel`. | 249 /// `cancel`. |
231 /// | 250 /// |
232 /// After calling `cancel`, no further events can be requested. | 251 /// After calling `cancel`, no further events can be requested. |
233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 252 /// None of [next], [rest], [skip], [take] or [cancel] may be |
234 /// called again. | 253 /// called again. |
235 Future cancel({bool immediate: false}) { | 254 Future cancel({bool immediate: false}) { |
236 if (_isClosed) throw _failClosed(); | 255 if (_isClosed) throw _failClosed(); |
237 _isClosed = true; | 256 _isClosed = true; |
238 | 257 |
258 if (_isDone) return new Future.value(); | |
259 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
260 | |
239 if (!immediate) { | 261 if (!immediate) { |
240 var request = new _CancelRequest(this); | 262 var request = new _CancelRequest(this); |
241 _addRequest(request); | 263 _addRequest(request); |
242 return request.future; | 264 return request.future; |
243 } | 265 } |
244 | 266 |
245 if (_isDone) return new Future.value(); | |
246 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
247 var future = _subscription.cancel(); | 267 var future = _subscription.cancel(); |
248 _onDone(); | 268 _onDone(); |
249 return future; | 269 return future; |
250 } | 270 } |
251 | 271 |
252 /// Returns an error for when a request is made after cancel. | 272 /// Returns an error for when a request is made after cancel. |
253 /// | 273 /// |
254 /// Returns a [StateError] with a message saying that either | 274 /// Returns a [StateError] with a message saying that either |
255 /// [cancel] or [rest] have already been called. | 275 /// [cancel] or [rest] have already been called. |
256 Error _failClosed() { | 276 Error _failClosed() { |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
326 /// | 346 /// |
327 /// Called after receiving an event. | 347 /// Called after receiving an event. |
328 void _checkQueues() { | 348 void _checkQueues() { |
329 while (_requestQueue.isNotEmpty) { | 349 while (_requestQueue.isNotEmpty) { |
330 if (_requestQueue.first.addEvents(_eventQueue)) { | 350 if (_requestQueue.first.addEvents(_eventQueue)) { |
331 _requestQueue.removeFirst(); | 351 _requestQueue.removeFirst(); |
332 } else { | 352 } else { |
333 return; | 353 return; |
334 } | 354 } |
335 } | 355 } |
356 | |
336 if (!_isDone) { | 357 if (!_isDone) { |
337 _subscription.pause(); | 358 _subscription.pause(); |
338 } | 359 } |
339 } | 360 } |
340 | 361 |
341 /// Extracts the subscription and makes this stream queue unusable. | 362 /// Extracts the subscription and makes this stream queue unusable. |
342 /// | 363 /// |
343 /// Can only be used by the very last request. | 364 /// Can only be used by the very last request. |
344 StreamSubscription _dispose() { | 365 StreamSubscription _dispose() { |
345 assert(_isClosed); | 366 assert(_isClosed); |
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
620 _completer.complete(true); | 641 _completer.complete(true); |
621 return true; | 642 return true; |
622 } | 643 } |
623 return false; | 644 return false; |
624 } | 645 } |
625 | 646 |
626 void close(_) { | 647 void close(_) { |
627 _completer.complete(false); | 648 _completer.complete(false); |
628 } | 649 } |
629 } | 650 } |
651 | |
652 /// Request for a [StreamQueue.fork] call. | |
653 class _ForkRequest<T> implements _EventRequest { | |
654 /// Completer for the stream used by the queue by the `fork` call. | |
655 StreamCompleter _completer; | |
656 | |
657 StreamQueue<T> queue; | |
658 | |
659 /// The [StreamQueue] object that has this request queued. | |
660 final StreamQueue _streamQueue; | |
661 | |
662 _ForkRequest(this._streamQueue) { | |
663 _completer = new StreamCompleter<T>(); | |
664 queue = new StreamQueue<T>(_completer.stream); | |
665 } | |
666 | |
667 bool addEvents(Queue<Result> events) { | |
668 _completeStream(events); | |
669 return true; | |
670 } | |
671 | |
672 void close(Queue<Result> events) { | |
673 _completeStream(events); | |
674 } | |
675 | |
676 void _completeStream(Queue<Result> events) { | |
677 if (events.isEmpty) { | |
678 if (_streamQueue._isDone) { | |
679 _completer.setEmpty(); | |
680 } else { | |
681 _completer.setSourceStream(_streamQueue._sourceStream.fork()); | |
682 } | |
683 } else { | |
684 // There are prefetched events which need to be added before the | |
685 // remaining stream. | |
686 var controller = new StreamController<T>(); | |
687 for (var event in events) { | |
688 event.addTo(controller); | |
689 } | |
690 | |
691 var fork = _streamQueue._sourceStream.fork(); | |
692 controller.addStream(fork, cancelOnError: false) | |
693 .whenComplete(controller.close); | |
694 _completer.setSourceStream(controller.stream); | |
695 } | |
696 } | |
697 } | |
OLD | NEW |