OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.streams.stream_events; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import "subscription_stream.dart"; | |
11 import "stream_completer.dart"; | |
12 | |
13 /// An asynchronous pull-based interface for accessing stream events. | |
14 /// | |
15 /// Wraps a stream and makes individual events available on request. | |
16 class StreamEvents<T> { | |
ahe
2015/06/08 16:55:16
It took me quite some time to realize this is the
Lasse Reichstein Nielsen
2015/06/09 07:33:53
This is a little hard to name well.
PullStream wo
| |
17 /// In the initial state, the stream has not been listened to yet. | |
18 /// It will be listened to when the first event is requested. | |
19 /// The `stateData` field holds the stream and the request queue is empty. | |
20 static const int _INITIAL = 0; | |
21 | |
22 /// Listening on the stream. | |
23 /// If the request queue is empty and the subscription isn't done, | |
24 /// the subscription is paused. | |
25 /// The `stateData` field holds the subscription. | |
26 static const int _LISTENING = 1; | |
27 | |
28 /// The stream has completed. | |
29 /// The `stateData` field is `null` and the request queue is empty. | |
30 static const int _DONE = 2; | |
31 | |
32 /// Flag set when [close] is called. | |
33 /// The `StreamEvents` is closed and no further events can be requested. | |
34 /// While set, the last elmement of the request queue is an | |
35 /// [_EventCloseAction]. | |
36 static const int _CLOSED = 8; | |
37 | |
38 /// Flag set when [rest] is called. | |
39 /// Only used for error reporting, otherwise equivalent to [_CLOSED]. | |
40 static const int _CLOSED_REST = 12; | |
41 | |
42 /// Current state. | |
43 /// | |
44 /// Use getters below to check if the state is [_isListening] or [_isDone], | |
45 /// and whether the stream events object [_isClosed]. | |
46 int _state = _INITIAL; | |
47 | |
48 /// Value depending on state. Use getters below to get the value and assert | |
49 /// the expected state. | |
50 var _stateData; | |
51 | |
52 /// Queue of pending requests while state is [_LISTENING]. | |
53 /// Access through methods below to ensure consistency. | |
54 Queue<_EventAction> _requestQueue = new Queue(); | |
55 | |
56 StreamEvents(Stream source) : _stateData = source; | |
57 | |
58 bool get _isListening => (_state & _LISTENING) != 0; | |
59 bool get _isClosed => (_state & _CLOSED) != 0; | |
60 bool get _isDone => (_state & _DONE) != 0; | |
61 | |
62 /// Whether the underlying stream is spent. | |
63 /// | |
64 /// This may return true before all events have been delivered. | |
65 /// Requesting a new event when [isDone] returns true, | |
66 /// for example using [next], will always fail. | |
67 bool get isDone => _isDone; | |
68 | |
69 /// Return the stream subscription while state is listening. | |
70 StreamSubscription get _subscription { | |
71 assert(_isListening); | |
72 return _stateData; | |
73 } | |
74 | |
75 /// Return the source stream while state is initial. | |
76 Stream get _sourceStream { | |
77 assert(!_isListening); | |
78 assert(!_isDone); | |
79 return _stateData; | |
80 } | |
81 | |
82 // Set the subscription and transition to listening state. | |
83 void set _subscription(StreamSubscription subscription) { | |
84 assert(!_isListening); | |
85 assert(!_isDone); | |
86 _stateData = subscription; | |
87 _state |= _LISTENING; | |
88 } | |
89 | |
90 void _setDone() { | |
91 assert(!_isDone); | |
92 _state = (_state & _CLOSED_REST) | _DONE; | |
93 _stateData = null; | |
94 } | |
95 | |
96 /// Request the next (yet unrequested) event from the stream. | |
97 /// | |
98 /// When the requested event arrives, the returned future is completed with | |
99 /// the event. This is independent of whether the event is a data event or | |
100 /// an error event. | |
101 /// | |
102 /// If the stream closed before an event arrives, the future is completed | |
103 /// with a [StateError]. | |
104 Future<T> get next { | |
105 if (!_isClosed) { | |
106 Completer completer = new Completer<T>(); | |
107 _addAction(new _NextAction(completer)); | |
108 return completer.future; | |
109 } | |
110 throw _failClosed(); | |
111 } | |
112 | |
113 /// Request a stream of all the remaning events of the source stream. | |
114 /// | |
115 /// All requested [next], [skip] or [take] operations are completed | |
116 /// first, and then any remaining events are provided as events of | |
117 /// the returned stream. | |
118 /// | |
119 /// Using `rest` closes the stream events object. After getting the | |
120 /// `rest` it is no longer allowed to request other events, like | |
121 /// after calling [close]. | |
122 Stream<T> get rest { | |
123 if (!_isClosed) { | |
124 _state |= _CLOSED_REST; | |
125 if (_isListening) { | |
126 // We have an active subscription that we want to take over. | |
127 var delayStream = new StreamCompleter<T>(); | |
128 _addAction(new _RestAction<T>(delayStream)); | |
129 return delayStream.stream; | |
130 } | |
131 assert(_requestQueue.isEmpty); | |
132 if (isDone) { | |
133 // TODO(lrn): Add Stream.empty() constructor. | |
134 return new Stream<T>.fromIterable(const []); | |
135 } | |
136 // We have never listened the source stream, so just return that directly. | |
137 Stream result = _sourceStream; | |
138 _setDone(); | |
139 return result; | |
140 } | |
141 throw _failClosed(); | |
142 } | |
143 | |
144 /// Requests to skip the next [count] *data* events. | |
145 /// | |
146 /// The [count] value must be greater than zero. | |
147 /// | |
148 /// When successful, this is equivalent to using [take] | |
149 /// and ignoring the result. | |
150 /// | |
151 /// If an error occurs before `count` data events has | |
152 /// been skipped, the returned future completes with | |
153 /// that error instead. | |
154 /// | |
155 /// If the stream closes before `count` data events, | |
156 /// the remaining unskipped event count is returned. | |
157 /// If the returned future completes with the integer `0`, | |
158 /// then all events were succssfully skipped. If the value | |
159 /// is greater than zero then the stream ended early. | |
160 Future<int> skip(int count) { | |
161 if (count <= 0) throw new RangeError.range(count, 0, null, "count"); | |
162 if (!_isClosed) { | |
163 Completer completer = new Completer<int>(); | |
164 _addAction(new _SkipAction(completer, count)); | |
165 return completer.future; | |
166 } | |
167 throw _failClosed(); | |
168 } | |
169 | |
170 /// Requests the next [count] data events as a list. | |
171 /// | |
172 /// The [count] value must be greater than zero. | |
173 /// | |
174 /// Equivalent to calling [next] `count` times and | |
175 /// storing the data values in a list. | |
176 /// | |
177 /// If an error occurs before `count` data events has | |
178 /// been collected, the returned future completes with | |
179 /// that error instead. | |
180 /// | |
181 /// If the stream closes before `count` data events, | |
182 /// the returned future completes with the list | |
183 /// of data collected so far. That is, the returned | |
184 /// list may have fewer than [count] elements. | |
185 Future<List<T>> take(int count) { | |
186 if (count <= 0) throw new RangeError.range(count, 0, null, "count"); | |
187 if (!_isClosed) { | |
188 Completer completer = new Completer<List<T>>(); | |
189 _addAction(new _TakeAction(completer, count)); | |
190 return completer.future; | |
191 } | |
192 throw _failClosed(); | |
193 } | |
194 | |
195 /// Release the underlying stream subscription. | |
196 /// | |
197 /// The close operation waits until all previously requested | |
198 /// events have been processed, then it cancels the subscription | |
199 /// providing the events. | |
200 /// | |
201 /// The returned future completes with the result of calling | |
202 /// `cancel`. | |
203 /// | |
204 /// After calling `close`, no further events can be requested. | |
205 /// None of [next], [rest], [skip], [take] or [close] may be | |
206 /// called again. | |
207 Future close() { | |
208 if (!_isClosed) { | |
209 _state |= _CLOSED; | |
210 if (!_isListening) { | |
211 assert(_requestQueue.isEmpty); | |
212 if (!_isDone) _setDone(); | |
213 return new Future.value(); | |
214 } | |
215 Completer completer = new Completer(); | |
216 _addAction(new _CloseAction(completer)); | |
217 return completer.future; | |
218 } | |
219 throw _failClosed(); | |
220 } | |
221 | |
222 /// Reused error message. | |
223 Error _failClosed() { | |
224 String cause = | |
225 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; | |
226 return new StateError("Already closed by a call to $cause"); | |
227 } | |
228 | |
229 /// Called when requesting an event when the requeust queue is empty. | |
230 /// The underlying subscription is paused in that case, except the very | |
231 /// first time when the subscription needs to be created. | |
232 void _listen() { | |
233 if (_isListening) { | |
234 _subscription.resume(); | |
235 } else if (!_isDone) { | |
236 _subscription = | |
237 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
238 } | |
239 } | |
240 | |
241 /// Pauses the underlying subscription. | |
242 /// Called when the request queue is empty and the subscription isn't closed. | |
243 void _pause() { | |
244 assert(_isListening); | |
245 _subscription.pause(); | |
246 } | |
247 | |
248 // Callbacks receiving the events of the source stream. | |
249 void _onData(T data) { | |
250 assert(_requestQueue.isNotEmpty); | |
251 _EventAction action = _nextAction; | |
252 if (action.data(data)) { | |
253 _completeAction(); | |
254 } | |
255 } | |
256 | |
257 void _onError(error, StackTrace stack) { | |
258 assert(_requestQueue.isNotEmpty); | |
259 _EventAction action = _nextAction; | |
260 action.error(error, stack); | |
261 _completeAction(); | |
262 } | |
263 | |
264 void _onDone() { | |
265 _setDone(); | |
266 _flushQueue(); | |
267 } | |
268 | |
269 // Request queue management. | |
270 | |
271 /// Get the next action in the queue, but don't remove it. | |
272 _EventAction get _nextAction { | |
273 assert(_requestQueue.isNotEmpty); | |
274 return _requestQueue.first; | |
275 } | |
276 | |
277 /// Add a new request to the queue. | |
278 void _addAction(_EventAction action) { | |
279 if (_isDone) { | |
280 action.done(); | |
281 return; | |
282 } | |
283 if (_requestQueue.isEmpty) { | |
284 _listen(); | |
285 } | |
286 _requestQueue.add(action); | |
287 _checkClose(); | |
288 } | |
289 | |
290 /// Remove all requests and call their `done` event. | |
291 /// | |
292 /// Used when the source stream dries up. | |
293 void _flushQueue() { | |
294 while (_requestQueue.isNotEmpty) { | |
295 _requestQueue.removeFirst().done(); | |
296 } | |
297 } | |
298 | |
299 /// Remove a completed action from the queue. | |
300 /// | |
301 /// An action is complete when its `data` or `error` handler | |
302 /// returns true. For actions that use multiple | |
303 void _completeAction() { | |
304 _requestQueue.removeFirst(); | |
305 if (_requestQueue.isEmpty) { | |
306 _pause(); | |
307 } else { | |
308 _checkClose(); | |
309 } | |
310 } | |
311 | |
312 /// Check whether the only remaining action in the queue is a close action. | |
313 /// | |
314 /// If so, pass the subscription to the action and let it take over. | |
315 void _checkClose() { | |
316 // Close-actions are executed immediately when they become the | |
317 // next (and last) event in the queue. | |
318 // When _isClosed and the queue is not empty, the last element | |
319 // of the queue is the close action. | |
320 if (_isClosed && _requestQueue.length == 1) { | |
321 _EventCloseAction action = _requestQueue.removeFirst(); | |
322 StreamSubscription subscription = _subscription; | |
323 _setDone(); | |
324 action.close(subscription); | |
325 } | |
326 } | |
327 } | |
328 | |
329 | |
330 /// Action to take when a requested event arrives. | |
331 abstract class _EventAction { | |
332 bool data(data); | |
333 void error(error, StackTrace stack); | |
334 void done(); | |
335 } | |
336 | |
337 /// Action to take when closing the [StreamEvents] class. | |
338 abstract class _EventCloseAction implements _EventAction { | |
339 void close(StreamSubscription subscription); | |
340 } | |
341 | |
342 | |
343 /// Action completing a [StreamEvents.next] request. | |
344 class _NextAction implements _EventAction { | |
345 Completer completer; | |
346 _NextAction(this.completer); | |
347 | |
348 bool data(data) { | |
349 completer.complete(data); | |
350 return true; | |
351 } | |
352 | |
353 void error(error, StackTrace stack) { | |
354 completer.completeError(error, stack); | |
355 } | |
356 | |
357 void done() { | |
358 completer.completeError(new StateError("no elements")); | |
359 } | |
360 } | |
361 | |
362 /// Action completing a [StreamEvents.skip] request. | |
363 class _SkipAction implements _EventAction { | |
364 Completer completer; | |
365 int count; | |
366 _SkipAction(this.completer, this.count); | |
367 | |
368 bool data(data) { | |
369 count--; | |
370 if (count > 0) return false; | |
371 completer.complete(count); | |
372 return true; | |
373 } | |
374 | |
375 void error(error, StackTrace stack) { | |
376 completer.completeError(error, stack); | |
377 } | |
378 | |
379 void done() { | |
380 completer.complete(count); | |
381 } | |
382 } | |
383 | |
384 /// Action completing a [StreamEvents.take] request. | |
385 class _TakeAction<T> implements _EventAction { | |
386 final Completer completer; | |
387 final int count; | |
388 List list = <T>[]; | |
389 _TakeAction(this.completer, this.count); | |
390 | |
391 bool data(data) { | |
392 list.add(data); | |
393 if (list.length < count) return false; | |
394 completer.complete(list); | |
395 return true; | |
396 } | |
397 | |
398 void error(error, StackTrace stack) { | |
399 completer.completeError(error, stack); | |
400 } | |
401 | |
402 void done() { | |
403 completer.complete(list); | |
404 } | |
405 } | |
406 | |
407 /// Action completing a [StreamEvents.close] request. | |
408 class _CloseAction implements _EventCloseAction { | |
409 final Completer completer; | |
410 | |
411 _CloseAction(this.completer); | |
412 | |
413 bool data(data) { | |
414 throw new UnsupportedError("event"); | |
415 } | |
416 | |
417 void error(e, StackTrace stack) { | |
418 throw new UnsupportedError("event"); | |
419 } | |
420 | |
421 void done() { | |
422 completer.complete(); | |
423 } | |
424 | |
425 void close(StreamSubscription subscription) { | |
426 completer.complete(subscription.cancel()); | |
427 } | |
428 } | |
429 | |
430 /// Action completing a [StreamEvents.rest] request. | |
431 class _RestAction<T> implements _EventCloseAction { | |
432 final StreamCompleter delayStream; | |
433 _RestAction(this.delayStream); | |
434 | |
435 bool data(data) { | |
436 throw new UnsupportedError("event"); | |
437 } | |
438 | |
439 void error(e, StackTrace stack) { | |
440 throw new UnsupportedError("event"); | |
441 } | |
442 | |
443 void done() { | |
444 delayStream.setEmpty(); | |
445 } | |
446 | |
447 void close(StreamSubscription subscription) { | |
448 delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); | |
449 } | |
450 } | |
OLD | NEW |