Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(325)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1238503004: Alternative implementation of StreamQueue.fork which has no impact if it Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | lib/src/subscription_fork.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "forkable_stream.dart"; 10 import "subscription_fork.dart";
11 import "subscription_stream.dart"; 11 import "subscription_stream.dart";
12 import "stream_completer.dart"; 12 import "stream_completer.dart";
13 import "../result.dart"; 13 import "../result.dart";
14 14
15 /// An asynchronous pull-based interface for accessing stream events. 15 /// An asynchronous pull-based interface for accessing stream events.
16 /// 16 ///
17 /// Wraps a stream and makes individual events available on request. 17 /// Wraps a stream and makes individual events available on request.
18 /// 18 ///
19 /// You can request (and reserve) one or more events from the stream, 19 /// You can request (and reserve) one or more events from the stream,
20 /// and after all previous requests have been fulfilled, stream events 20 /// and after all previous requests have been fulfilled, stream events
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 // again when new events are available. 72 // again when new events are available.
73 // The request can remove events that it uses, or keep them in the event 73 // The request can remove events that it uses, or keep them in the event
74 // queue until it has all that it needs. 74 // queue until it has all that it needs.
75 // 75 //
76 // This model is very flexible and easily extensible. 76 // This model is very flexible and easily extensible.
77 // It allows requests that don't consume events (like [hasNext]) or 77 // It allows requests that don't consume events (like [hasNext]) or
78 // potentially a request that takes either five or zero events, determined 78 // potentially a request that takes either five or zero events, determined
79 // by the content of the fifth event. 79 // by the content of the fifth event.
80 80
81 /// Source of events. 81 /// Source of events.
82 final ForkableStream _sourceStream; 82 final Stream _sourceStream;
83 83
84 /// Subscription on [_sourceStream] while listening for events. 84 /// Subscription on [_sourceStream] while listening for events.
85 /// 85 ///
86 /// Set to subscription when listening, and set to `null` when the 86 /// Set to subscription when listening, and set to `null` when the
87 /// subscription is done (and [_isDone] is set to true). 87 /// subscription is done (and [_isDone] is set to true).
88 StreamSubscription _subscription; 88 StreamSubscription _subscription;
89 89
90 /// Whether we have listened on [_sourceStream] and the subscription is done. 90 /// Whether we have listened on [_sourceStream] and the subscription is done.
91 bool _isDone = false; 91 bool _isDone = false;
92 92
93 /// Whether a closing operation has been performed on the stream queue. 93 /// Whether a closing operation has been performed on the stream queue.
94 /// 94 ///
95 /// Closing operations are [cancel] and [rest]. 95 /// Closing operations are [cancel] and [rest].
96 bool _isClosed = false; 96 bool _isClosed = false;
97 97
98 /// Queue of events not used by a request yet. 98 /// Queue of events not used by a request yet.
99 final Queue<Result> _eventQueue = new Queue(); 99 final Queue<Result> _eventQueue = new Queue();
100 100
101 /// Queue of pending requests. 101 /// Queue of pending requests.
102 /// 102 ///
103 /// Access through methods below to ensure consistency. 103 /// Access through methods below to ensure consistency.
104 final Queue<_EventRequest> _requestQueue = new Queue(); 104 final Queue<_EventRequest> _requestQueue = new Queue();
105 105
106 /// Create a `StreamQueue` of the events of [source]. 106 /// Create a `StreamQueue` of the events of [source].
107 StreamQueue(Stream source) 107 StreamQueue(Stream source)
108 : _sourceStream = source is ForkableStream 108 : _sourceStream = source;
109 ? source
110 : new ForkableStream(source);
111 109
112 /// Asks if the stream has any more events. 110 /// Asks if the stream has any more events.
113 /// 111 ///
114 /// Returns a future that completes with `true` if the stream has any 112 /// Returns a future that completes with `true` if the stream has any
115 /// more events, whether data or error. 113 /// more events, whether data or error.
116 /// If the stream closes without producing any more events, the returned 114 /// If the stream closes without producing any more events, the returned
117 /// future completes with `false`. 115 /// future completes with `false`.
118 /// 116 ///
119 /// Can be used before using [next] to avoid getting an error in the 117 /// Can be used before using [next] to avoid getting an error in the
120 /// future returned by `next` in the case where there are no more events. 118 /// future returned by `next` in the case where there are no more events.
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
223 /// 221 ///
224 /// The fork is subscribed to the same underlying stream as this queue, but 222 /// The fork is subscribed to the same underlying stream as this queue, but
225 /// it's otherwise wholly independent. If requests are made on one, they don't 223 /// it's otherwise wholly independent. If requests are made on one, they don't
226 /// move the other forward; if one is closed, the other is still open. 224 /// move the other forward; if one is closed, the other is still open.
227 /// 225 ///
228 /// The underlying stream will only be paused when all forks have no 226 /// The underlying stream will only be paused when all forks have no
229 /// outstanding requests, and only canceled when all forks are canceled. 227 /// outstanding requests, and only canceled when all forks are canceled.
230 StreamQueue<T> fork() { 228 StreamQueue<T> fork() {
231 if (_isClosed) throw _failClosed(); 229 if (_isClosed) throw _failClosed();
232 230
231 _ensureListening();
232 if (_subscription is! ForkableSubscription) {
233 _subscription = new ForkableSubscription<T>(_subscription);
234 _subscription.onData(_onData);
235 _subscription.onError(_onError);
236 _subscription.onDone(_onDone);
237 }
233 var request = new _ForkRequest<T>(this); 238 var request = new _ForkRequest<T>(this);
234 _addRequest(request); 239 _addRequest(request);
235 return request.queue; 240 return request.queue;
236 } 241 }
237 242
238 /// Cancels the underlying stream subscription. 243 /// Cancels the underlying stream subscription.
239 /// 244 ///
240 /// If [immediate] is `false` (the default), the cancel operation waits until 245 /// If [immediate] is `false` (the default), the cancel operation waits until
241 /// all previously requested events have been processed, then it cancels the 246 /// all previously requested events have been processed, then it cancels the
242 /// subscription providing the events. 247 /// subscription providing the events.
(...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after
672 677
673 void close(Queue<Result> events) { 678 void close(Queue<Result> events) {
674 _completeStream(events); 679 _completeStream(events);
675 } 680 }
676 681
677 void _completeStream(Queue<Result> events) { 682 void _completeStream(Queue<Result> events) {
678 if (events.isEmpty) { 683 if (events.isEmpty) {
679 if (_streamQueue._isDone) { 684 if (_streamQueue._isDone) {
680 _completer.setEmpty(); 685 _completer.setEmpty();
681 } else { 686 } else {
682 _completer.setSourceStream(_streamQueue._sourceStream.fork()); 687 assert(_streamQueue._subscription is ForkableSubscription);
688 var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork() );
689 _completer.setSourceStream(stream);
683 } 690 }
684 } else { 691 } else {
685 // There are prefetched events which need to be added before the 692 // There are prefetched events which need to be added before the
686 // remaining stream. 693 // remaining stream.
687 var controller = new StreamController<T>(); 694 var controller = new StreamController<T>();
688 for (var event in events) { 695 for (var event in events) {
689 event.addTo(controller); 696 event.addTo(controller);
690 } 697 }
691 698
692 var fork = _streamQueue._sourceStream.fork(); 699 var fork = new SubscriptionStream(_streamQueue._subscription.fork());
693 controller.addStream(fork, cancelOnError: false) 700 controller.addStream(fork, cancelOnError: false)
694 .whenComplete(controller.close); 701 .whenComplete(controller.close);
695 _completer.setSourceStream(controller.stream); 702 _completer.setSourceStream(controller.stream);
696 } 703 }
697 } 704 }
698 } 705 }
OLDNEW
« no previous file with comments | « no previous file | lib/src/subscription_fork.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698