Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "forkable_stream.dart";
10 import "subscription_stream.dart"; 11 import "subscription_stream.dart";
11 import "stream_completer.dart"; 12 import "stream_completer.dart";
12 import "../result.dart"; 13 import "../result.dart";
13 14
14 /// An asynchronous pull-based interface for accessing stream events. 15 /// An asynchronous pull-based interface for accessing stream events.
15 /// 16 ///
16 /// Wraps a stream and makes individual events available on request. 17 /// Wraps a stream and makes individual events available on request.
17 /// 18 ///
18 /// You can request (and reserve) one or more events from the stream, 19 /// You can request (and reserve) one or more events from the stream,
19 /// and after all previous requests have been fulfilled, stream events 20 /// and after all previous requests have been fulfilled, stream events
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 // again when new events are available. 72 // again when new events are available.
72 // The request can remove events that it uses, or keep them in the event 73 // The request can remove events that it uses, or keep them in the event
73 // queue until it has all that it needs. 74 // queue until it has all that it needs.
74 // 75 //
75 // This model is very flexible and easily extensible. 76 // This model is very flexible and easily extensible.
76 // It allows requests that don't consume events (like [hasNext]) or 77 // It allows requests that don't consume events (like [hasNext]) or
77 // potentially a request that takes either five or zero events, determined 78 // potentially a request that takes either five or zero events, determined
78 // by the content of the fifth event. 79 // by the content of the fifth event.
79 80
80 /// Source of events. 81 /// Source of events.
81 final Stream _sourceStream; 82 final ForkableStream _sourceStream;
82 83
83 /// Subscription on [_sourceStream] while listening for events. 84 /// Subscription on [_sourceStream] while listening for events.
84 /// 85 ///
85 /// Set to subscription when listening, and set to `null` when the 86 /// Set to subscription when listening, and set to `null` when the
86 /// subscription is done (and [_isDone] is set to true). 87 /// subscription is done (and [_isDone] is set to true).
87 StreamSubscription _subscription; 88 StreamSubscription _subscription;
88 89
89 /// Whether we have listened on [_sourceStream] and the subscription is done. 90 /// Whether we have listened on [_sourceStream] and the subscription is done.
90 bool _isDone = false; 91 bool _isDone = false;
91 92
92 /// Whether a closing operation has been performed on the stream queue. 93 /// Whether a closing operation has been performed on the stream queue.
93 /// 94 ///
94 /// Closing operations are [cancel] and [rest]. 95 /// Closing operations are [cancel] and [rest].
95 bool _isClosed = false; 96 bool _isClosed = false;
96 97
97 /// Queue of events not used by a request yet. 98 /// Queue of events not used by a request yet.
98 final Queue<Result> _eventQueue = new Queue(); 99 final Queue<Result> _eventQueue = new Queue();
99 100
100 /// Queue of pending requests. 101 /// Queue of pending requests.
101 /// 102 ///
102 /// Access through methods below to ensure consistency. 103 /// Access through methods below to ensure consistency.
103 final Queue<_EventRequest> _requestQueue = new Queue(); 104 final Queue<_EventRequest> _requestQueue = new Queue();
104 105
105 /// Create a `StreamQueue` of the events of [source]. 106 /// Create a `StreamQueue` of the events of [source].
106 StreamQueue(Stream source) 107 StreamQueue(Stream source)
107 : _sourceStream = source; 108 : _sourceStream = source is ForkableStream
109 ? source
110 : new ForkableStream(source);
Lasse Reichstein Nielsen 2015/07/15 20:10:51 Would it be possible to make the fork method witho
nweiz 2015/07/15 22:19:43 I don't think this is possible without drastically
Lasse Reichstein Nielsen 2015/07/16 14:01:23 I still think this could/should be handled at a di
nweiz 2015/07/17 20:30:16 In your example, just forking a stream causes it t
108 111
109 /// Asks if the stream has any more events. 112 /// Asks if the stream has any more events.
110 /// 113 ///
111 /// Returns a future that completes with `true` if the stream has any 114 /// Returns a future that completes with `true` if the stream has any
112 /// more events, whether data or error. 115 /// more events, whether data or error.
113 /// If the stream closes without producing any more events, the returned 116 /// If the stream closes without producing any more events, the returned
114 /// future completes with `false`. 117 /// future completes with `false`.
115 /// 118 ///
116 /// Can be used before using [next] to avoid getting an error in the 119 /// Can be used before using [next] to avoid getting an error in the
117 /// future returned by `next` in the case where there are no more events. 120 /// future returned by `next` in the case where there are no more events.
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
209 Future<List<T>> take(int count) { 212 Future<List<T>> take(int count) {
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 213 if (count < 0) throw new RangeError.range(count, 0, null, "count");
211 if (!_isClosed) { 214 if (!_isClosed) {
212 var request = new _TakeRequest<T>(count); 215 var request = new _TakeRequest<T>(count);
213 _addRequest(request); 216 _addRequest(request);
214 return request.future; 217 return request.future;
215 } 218 }
216 throw _failClosed(); 219 throw _failClosed();
217 } 220 }
218 221
222 /// Creates a new stream queue in the same position as this one.
223 ///
224 /// The fork is subscribed to the same underlying stream as this queue, but
225 /// it's otherwise wholly independent. If requests are made on one, they don't
226 /// move the other forward; if one is closed, the other is still open.
227 ///
228 /// The underlying stream will only be paused when all forks have no
229 /// outstanding requests, and only canceled when all forks are canceled.
230 StreamQueue<T> fork() {
Lasse Reichstein Nielsen 2015/07/16 14:01:23 I have another problem with fork: It may cause buf
nweiz 2015/07/17 20:30:16 I can't emphasize enough how important forking is
231 if (_isClosed) throw _failClosed();
232
233 var request = new _ForkRequest<T>(this);
234 _addRequest(request);
235 return request.queue;
236 }
237
219 /// Cancels the underlying stream subscription. 238 /// Cancels the underlying stream subscription.
220 /// 239 ///
221 /// If [immediate] is `false` (the default), the cancel operation waits until 240 /// If [immediate] is `false` (the default), the cancel operation waits until
222 /// all previously requested events have been processed, then it cancels the 241 /// all previously requested events have been processed, then it cancels the
223 /// subscription providing the events. 242 /// subscription providing the events.
224 /// 243 ///
225 /// If [immediate] is `true`, the subscription is instead canceled 244 /// If [immediate] is `true`, the subscription is instead canceled
226 /// immediately. Any pending events are completed as though the underlying 245 /// immediately. Any pending events are completed as though the underlying
227 /// stream had closed. 246 /// stream had closed.
228 /// 247 ///
229 /// The returned future completes with the result of calling 248 /// The returned future completes with the result of calling
230 /// `cancel`. 249 /// `cancel`.
231 /// 250 ///
232 /// After calling `cancel`, no further events can be requested. 251 /// After calling `cancel`, no further events can be requested.
233 /// None of [next], [rest], [skip], [take] or [cancel] may be 252 /// None of [next], [rest], [skip], [take] or [cancel] may be
234 /// called again. 253 /// called again.
235 Future cancel({bool immediate: false}) { 254 Future cancel({bool immediate: false}) {
236 if (_isClosed) throw _failClosed(); 255 if (_isClosed) throw _failClosed();
237 _isClosed = true; 256 _isClosed = true;
238 257
258 if (_isDone) return new Future.value();
259 if (_subscription == null) _subscription = _sourceStream.listen(null);
260
239 if (!immediate) { 261 if (!immediate) {
240 var request = new _CancelRequest(this); 262 var request = new _CancelRequest(this);
241 _addRequest(request); 263 _addRequest(request);
242 return request.future; 264 return request.future;
243 } 265 }
244 266
245 if (_isDone) return new Future.value();
246 if (_subscription == null) _subscription = _sourceStream.listen(null);
247 var future = _subscription.cancel(); 267 var future = _subscription.cancel();
248 _onDone(); 268 _onDone();
249 return future; 269 return future;
250 } 270 }
251 271
252 /// Returns an error for when a request is made after cancel. 272 /// Returns an error for when a request is made after cancel.
253 /// 273 ///
254 /// Returns a [StateError] with a message saying that either 274 /// Returns a [StateError] with a message saying that either
255 /// [cancel] or [rest] have already been called. 275 /// [cancel] or [rest] have already been called.
256 Error _failClosed() { 276 Error _failClosed() {
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 /// 346 ///
327 /// Called after receiving an event. 347 /// Called after receiving an event.
328 void _checkQueues() { 348 void _checkQueues() {
329 while (_requestQueue.isNotEmpty) { 349 while (_requestQueue.isNotEmpty) {
330 if (_requestQueue.first.addEvents(_eventQueue)) { 350 if (_requestQueue.first.addEvents(_eventQueue)) {
331 _requestQueue.removeFirst(); 351 _requestQueue.removeFirst();
332 } else { 352 } else {
333 return; 353 return;
334 } 354 }
335 } 355 }
356
336 if (!_isDone) { 357 if (!_isDone) {
337 _subscription.pause(); 358 _subscription.pause();
338 } 359 }
339 } 360 }
340 361
341 /// Extracts the subscription and makes this stream queue unusable. 362 /// Extracts the subscription and makes this stream queue unusable.
342 /// 363 ///
343 /// Can only be used by the very last request. 364 /// Can only be used by the very last request.
344 StreamSubscription _dispose() { 365 StreamSubscription _dispose() {
345 assert(_isClosed); 366 assert(_isClosed);
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 _completer.complete(true); 641 _completer.complete(true);
621 return true; 642 return true;
622 } 643 }
623 return false; 644 return false;
624 } 645 }
625 646
626 void close(_) { 647 void close(_) {
627 _completer.complete(false); 648 _completer.complete(false);
628 } 649 }
629 } 650 }
651
652 /// Request for a [StreamQueue.fork] call.
653 class _ForkRequest<T> implements _EventRequest {
654 /// Completer for the stream used by the queue by the `fork` call.
655 StreamCompleter _completer;
656
657 StreamQueue<T> queue;
658
659 /// The [StreamQueue] object that has this request queued.
660 final StreamQueue _streamQueue;
661
662 _ForkRequest(this._streamQueue) {
663 _completer = new StreamCompleter<T>();
664 queue = new StreamQueue<T>(_completer.stream);
665 }
666
667 bool addEvents(Queue<Result> events) {
668 _completeStream(events);
669 return true;
670 }
671
672 void close(Queue<Result> events) {
673 _completeStream(events);
674 }
675
676 void _completeStream(Queue<Result> events) {
677 if (events.isEmpty) {
678 if (_streamQueue._isDone) {
679 _completer.setEmpty();
680 } else {
681 _completer.setSourceStream(_streamQueue._sourceStream.fork());
682 }
683 } else {
684 // There are prefetched events which need to be added before the
685 // remaining stream.
686 var controller = new StreamController<T>();
687 for (var event in events) {
688 event.addTo(controller);
689 }
690
691 var fork = _streamQueue._sourceStream.fork();
692 controller.addStream(fork, cancelOnError: false)
693 .whenComplete(controller.close);
694 _completer.setSourceStream(controller.stream);
695 }
696 }
697 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698