Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "forkable_stream.dart"; | |
| 10 import "subscription_stream.dart"; | 11 import "subscription_stream.dart"; |
| 11 import "stream_completer.dart"; | 12 import "stream_completer.dart"; |
| 12 import "../result.dart"; | 13 import "../result.dart"; |
| 13 | 14 |
| 14 /// An asynchronous pull-based interface for accessing stream events. | 15 /// An asynchronous pull-based interface for accessing stream events. |
| 15 /// | 16 /// |
| 16 /// Wraps a stream and makes individual events available on request. | 17 /// Wraps a stream and makes individual events available on request. |
| 17 /// | 18 /// |
| 18 /// You can request (and reserve) one or more events from the stream, | 19 /// You can request (and reserve) one or more events from the stream, |
| 19 /// and after all previous requests have been fulfilled, stream events | 20 /// and after all previous requests have been fulfilled, stream events |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 71 // again when new events are available. | 72 // again when new events are available. |
| 72 // The request can remove events that it uses, or keep them in the event | 73 // The request can remove events that it uses, or keep them in the event |
| 73 // queue until it has all that it needs. | 74 // queue until it has all that it needs. |
| 74 // | 75 // |
| 75 // This model is very flexible and easily extensible. | 76 // This model is very flexible and easily extensible. |
| 76 // It allows requests that don't consume events (like [hasNext]) or | 77 // It allows requests that don't consume events (like [hasNext]) or |
| 77 // potentially a request that takes either five or zero events, determined | 78 // potentially a request that takes either five or zero events, determined |
| 78 // by the content of the fifth event. | 79 // by the content of the fifth event. |
| 79 | 80 |
| 80 /// Source of events. | 81 /// Source of events. |
| 81 final Stream _sourceStream; | 82 final ForkableStream _sourceStream; |
| 82 | 83 |
| 83 /// Subscription on [_sourceStream] while listening for events. | 84 /// Subscription on [_sourceStream] while listening for events. |
| 84 /// | 85 /// |
| 85 /// Set to subscription when listening, and set to `null` when the | 86 /// Set to subscription when listening, and set to `null` when the |
| 86 /// subscription is done (and [_isDone] is set to true). | 87 /// subscription is done (and [_isDone] is set to true). |
| 87 StreamSubscription _subscription; | 88 StreamSubscription _subscription; |
| 88 | 89 |
| 89 /// Whether we have listened on [_sourceStream] and the subscription is done. | 90 /// Whether we have listened on [_sourceStream] and the subscription is done. |
| 90 bool _isDone = false; | 91 bool _isDone = false; |
| 91 | 92 |
| 92 /// Whether a closing operation has been performed on the stream queue. | 93 /// Whether a closing operation has been performed on the stream queue. |
| 93 /// | 94 /// |
| 94 /// Closing operations are [cancel] and [rest]. | 95 /// Closing operations are [cancel] and [rest]. |
| 95 bool _isClosed = false; | 96 bool _isClosed = false; |
| 96 | 97 |
| 97 /// Queue of events not used by a request yet. | 98 /// Queue of events not used by a request yet. |
| 98 final Queue<Result> _eventQueue = new Queue(); | 99 final Queue<Result> _eventQueue = new Queue(); |
| 99 | 100 |
| 100 /// Queue of pending requests. | 101 /// Queue of pending requests. |
| 101 /// | 102 /// |
| 102 /// Access through methods below to ensure consistency. | 103 /// Access through methods below to ensure consistency. |
| 103 final Queue<_EventRequest> _requestQueue = new Queue(); | 104 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 104 | 105 |
| 105 /// Create a `StreamQueue` of the events of [source]. | 106 /// Create a `StreamQueue` of the events of [source]. |
| 106 StreamQueue(Stream source) | 107 StreamQueue(Stream source) |
| 107 : _sourceStream = source; | 108 : _sourceStream = source is ForkableStream |
| 109 ? source | |
| 110 : new ForkableStream(source); | |
|
Lasse Reichstein Nielsen
2015/07/15 20:10:51
Would it be possible to make the fork method witho
nweiz
2015/07/15 22:19:43
I don't think this is possible without drastically
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I still think this could/should be handled at a di
nweiz
2015/07/17 20:30:16
In your example, just forking a stream causes it t
| |
| 108 | 111 |
| 109 /// Asks if the stream has any more events. | 112 /// Asks if the stream has any more events. |
| 110 /// | 113 /// |
| 111 /// Returns a future that completes with `true` if the stream has any | 114 /// Returns a future that completes with `true` if the stream has any |
| 112 /// more events, whether data or error. | 115 /// more events, whether data or error. |
| 113 /// If the stream closes without producing any more events, the returned | 116 /// If the stream closes without producing any more events, the returned |
| 114 /// future completes with `false`. | 117 /// future completes with `false`. |
| 115 /// | 118 /// |
| 116 /// Can be used before using [next] to avoid getting an error in the | 119 /// Can be used before using [next] to avoid getting an error in the |
| 117 /// future returned by `next` in the case where there are no more events. | 120 /// future returned by `next` in the case where there are no more events. |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 209 Future<List<T>> take(int count) { | 212 Future<List<T>> take(int count) { |
| 210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 213 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 211 if (!_isClosed) { | 214 if (!_isClosed) { |
| 212 var request = new _TakeRequest<T>(count); | 215 var request = new _TakeRequest<T>(count); |
| 213 _addRequest(request); | 216 _addRequest(request); |
| 214 return request.future; | 217 return request.future; |
| 215 } | 218 } |
| 216 throw _failClosed(); | 219 throw _failClosed(); |
| 217 } | 220 } |
| 218 | 221 |
| 222 /// Creates a new stream queue in the same position as this one. | |
| 223 /// | |
| 224 /// The fork is subscribed to the same underlying stream as this queue, but | |
| 225 /// it's otherwise wholly independent. If requests are made on one, they don't | |
| 226 /// move the other forward; if one is closed, the other is still open. | |
| 227 /// | |
| 228 /// The underlying stream will only be paused when all forks have no | |
| 229 /// outstanding requests, and only canceled when all forks are canceled. | |
| 230 StreamQueue<T> fork() { | |
|
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I have another problem with fork: It may cause buf
nweiz
2015/07/17 20:30:16
I can't emphasize enough how important forking is
| |
| 231 if (_isClosed) throw _failClosed(); | |
| 232 | |
| 233 var request = new _ForkRequest<T>(this); | |
| 234 _addRequest(request); | |
| 235 return request.queue; | |
| 236 } | |
| 237 | |
| 219 /// Cancels the underlying stream subscription. | 238 /// Cancels the underlying stream subscription. |
| 220 /// | 239 /// |
| 221 /// If [immediate] is `false` (the default), the cancel operation waits until | 240 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 222 /// all previously requested events have been processed, then it cancels the | 241 /// all previously requested events have been processed, then it cancels the |
| 223 /// subscription providing the events. | 242 /// subscription providing the events. |
| 224 /// | 243 /// |
| 225 /// If [immediate] is `true`, the subscription is instead canceled | 244 /// If [immediate] is `true`, the subscription is instead canceled |
| 226 /// immediately. Any pending events are completed as though the underlying | 245 /// immediately. Any pending events are completed as though the underlying |
| 227 /// stream had closed. | 246 /// stream had closed. |
| 228 /// | 247 /// |
| 229 /// The returned future completes with the result of calling | 248 /// The returned future completes with the result of calling |
| 230 /// `cancel`. | 249 /// `cancel`. |
| 231 /// | 250 /// |
| 232 /// After calling `cancel`, no further events can be requested. | 251 /// After calling `cancel`, no further events can be requested. |
| 233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 252 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 234 /// called again. | 253 /// called again. |
| 235 Future cancel({bool immediate: false}) { | 254 Future cancel({bool immediate: false}) { |
| 236 if (_isClosed) throw _failClosed(); | 255 if (_isClosed) throw _failClosed(); |
| 237 _isClosed = true; | 256 _isClosed = true; |
| 238 | 257 |
| 258 if (_isDone) return new Future.value(); | |
| 259 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
| 260 | |
| 239 if (!immediate) { | 261 if (!immediate) { |
| 240 var request = new _CancelRequest(this); | 262 var request = new _CancelRequest(this); |
| 241 _addRequest(request); | 263 _addRequest(request); |
| 242 return request.future; | 264 return request.future; |
| 243 } | 265 } |
| 244 | 266 |
| 245 if (_isDone) return new Future.value(); | |
| 246 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
| 247 var future = _subscription.cancel(); | 267 var future = _subscription.cancel(); |
| 248 _onDone(); | 268 _onDone(); |
| 249 return future; | 269 return future; |
| 250 } | 270 } |
| 251 | 271 |
| 252 /// Returns an error for when a request is made after cancel. | 272 /// Returns an error for when a request is made after cancel. |
| 253 /// | 273 /// |
| 254 /// Returns a [StateError] with a message saying that either | 274 /// Returns a [StateError] with a message saying that either |
| 255 /// [cancel] or [rest] have already been called. | 275 /// [cancel] or [rest] have already been called. |
| 256 Error _failClosed() { | 276 Error _failClosed() { |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 326 /// | 346 /// |
| 327 /// Called after receiving an event. | 347 /// Called after receiving an event. |
| 328 void _checkQueues() { | 348 void _checkQueues() { |
| 329 while (_requestQueue.isNotEmpty) { | 349 while (_requestQueue.isNotEmpty) { |
| 330 if (_requestQueue.first.addEvents(_eventQueue)) { | 350 if (_requestQueue.first.addEvents(_eventQueue)) { |
| 331 _requestQueue.removeFirst(); | 351 _requestQueue.removeFirst(); |
| 332 } else { | 352 } else { |
| 333 return; | 353 return; |
| 334 } | 354 } |
| 335 } | 355 } |
| 356 | |
| 336 if (!_isDone) { | 357 if (!_isDone) { |
| 337 _subscription.pause(); | 358 _subscription.pause(); |
| 338 } | 359 } |
| 339 } | 360 } |
| 340 | 361 |
| 341 /// Extracts the subscription and makes this stream queue unusable. | 362 /// Extracts the subscription and makes this stream queue unusable. |
| 342 /// | 363 /// |
| 343 /// Can only be used by the very last request. | 364 /// Can only be used by the very last request. |
| 344 StreamSubscription _dispose() { | 365 StreamSubscription _dispose() { |
| 345 assert(_isClosed); | 366 assert(_isClosed); |
| (...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 620 _completer.complete(true); | 641 _completer.complete(true); |
| 621 return true; | 642 return true; |
| 622 } | 643 } |
| 623 return false; | 644 return false; |
| 624 } | 645 } |
| 625 | 646 |
| 626 void close(_) { | 647 void close(_) { |
| 627 _completer.complete(false); | 648 _completer.complete(false); |
| 628 } | 649 } |
| 629 } | 650 } |
| 651 | |
| 652 /// Request for a [StreamQueue.fork] call. | |
| 653 class _ForkRequest<T> implements _EventRequest { | |
| 654 /// Completer for the stream used by the queue by the `fork` call. | |
| 655 StreamCompleter _completer; | |
| 656 | |
| 657 StreamQueue<T> queue; | |
| 658 | |
| 659 /// The [StreamQueue] object that has this request queued. | |
| 660 final StreamQueue _streamQueue; | |
| 661 | |
| 662 _ForkRequest(this._streamQueue) { | |
| 663 _completer = new StreamCompleter<T>(); | |
| 664 queue = new StreamQueue<T>(_completer.stream); | |
| 665 } | |
| 666 | |
| 667 bool addEvents(Queue<Result> events) { | |
| 668 _completeStream(events); | |
| 669 return true; | |
| 670 } | |
| 671 | |
| 672 void close(Queue<Result> events) { | |
| 673 _completeStream(events); | |
| 674 } | |
| 675 | |
| 676 void _completeStream(Queue<Result> events) { | |
| 677 if (events.isEmpty) { | |
| 678 if (_streamQueue._isDone) { | |
| 679 _completer.setEmpty(); | |
| 680 } else { | |
| 681 _completer.setSourceStream(_streamQueue._sourceStream.fork()); | |
| 682 } | |
| 683 } else { | |
| 684 // There are prefetched events which need to be added before the | |
| 685 // remaining stream. | |
| 686 var controller = new StreamController<T>(); | |
| 687 for (var event in events) { | |
| 688 event.addTo(controller); | |
| 689 } | |
| 690 | |
| 691 var fork = _streamQueue._sourceStream.fork(); | |
| 692 controller.addStream(fork, cancelOnError: false) | |
| 693 .whenComplete(controller.close); | |
| 694 _completer.setSourceStream(controller.stream); | |
| 695 } | |
| 696 } | |
| 697 } | |
| OLD | NEW |