Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Side by Side Diff: lib/src/stream_events.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: more docs and tests. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.streams.stream_events;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import "subscription_stream.dart";
11 import "stream_completer.dart";
12
13 /// An asynchronous pull-based interface for accessing stream events.
14 ///
15 /// Wraps a stream and makes individual events available on request.
16 ///
17 /// The individual requests are served in the order they are requested.
nweiz 2015/06/12 01:24:25 Explain how this is different from a [StreamIterat
Lasse Reichstein Nielsen 2015/06/17 11:08:29 Done.
18 ///
19 /// Example:
20 ///
21 /// var events = new StreamEvents<String>(someStreamOfLines);
22 /// var first = await events.next;
23 /// while (first.startsWith('#')) {
Søren Gjesse 2015/06/11 07:57:06 Maybe add an isDone check in this sample.
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Hmm. Not good for the flow, but I guess I can try
24 /// // Skip comments.
25 /// first = await events.next;
26 /// }
27 ///
28 /// if (first.startsWith(MAGIC_MARKER)) {
29 /// var headerCount =
30 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
31 /// handleMessage(headers: await events.take(headerCount),
32 /// body: events.rest);
33 /// return;
34 /// }
35 /// // Error handling.
36 ///
37 class StreamEvents<T> {
nweiz 2015/06/12 01:24:26 Plural class names always seem weird to me. What d
Lasse Reichstein Nielsen 2015/06/12 13:04:22 It's not a stream, so "PullStream" won't work. The
nweiz 2015/06/16 01:05:23 Bob suggested "StreamPump", using the water analog
Lasse Reichstein Nielsen 2015/06/16 13:05:45 I'm not a great fan of analogies in nameing - it o
nweiz 2015/06/16 22:34:11 All names in programming are analogies at some lev
Lasse Reichstein Nielsen 2015/06/17 11:08:29 I'm trying out StreamQueue now. It is slightly irk
38 /// In the initial state, the stream has not been listened to yet.
nweiz 2015/06/12 01:24:25 Nit: The first paragraph of a doc comment should b
39 /// It will be listened to when the first event is requested.
40 /// The `stateData` field holds the stream and the request queue is empty.
41 static const int _INITIAL = 0;
nweiz 2015/06/12 01:24:25 Nit: since these fields aren't right up next to on
Lasse Reichstein Nielsen 2015/06/12 13:04:22 In my editor it still works very well to show the
42
43 /// Listening on the stream.
44 /// If the request queue is empty and the subscription isn't done,
45 /// the subscription is paused.
46 /// The `stateData` field holds the subscription.
47 static const int _LISTENING = 1;
48
49 /// The stream has completed.
50 /// The `stateData` field is `null` and the request queue is empty.
51 static const int _DONE = 2;
52
Søren Gjesse 2015/06/11 07:57:06 Maybe put an additional comment that 4 is reserved
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Will do.
53 /// Flag set when [close] is called.
54 /// The `StreamEvents` is closed and no further events can be requested.
55 /// The last request in the queue, if any,
56 /// is completed and is witing to clean up.
nweiz 2015/06/12 01:24:24 "witing" -> "waiting"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
57 static const int _CLOSED = 8;
58
59 /// Flag set when [rest] is called.
60 /// Only used for error reporting, otherwise equivalent to [_CLOSED].
61 static const int _CLOSED_REST = 12;
62
63 /// Current state.
64 ///
65 /// Use getters below to check if the state is [_isListening] or [_isDone],
66 /// and whether the stream events object [_isClosed].
67 int _state = _INITIAL;
68
69 /// Value depending on state. Use getters below to get the value and assert
70 /// the expected state.
71 var _stateData;
72
73 /// Queue of pending requests while state is [_LISTENING].
74 /// Access through methods below to ensure consistency.
75 Queue<_EventRequest> _requestQueue = new Queue();
nweiz 2015/06/12 01:24:25 Make this final. Personally, I prefer to omit the
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Made final. I prefer type annotations on all field
76
77 StreamEvents(Stream source) : _stateData = source;
78
79 bool get _isListening => (_state & _LISTENING) != 0;
80 bool get _isClosed => (_state & _CLOSED) != 0;
81 bool get _isDone => (_state & _DONE) != 0;
82
83 /// Whether the underlying stream is spent.
nweiz 2015/06/12 01:24:24 "spent" -> "done" Just to use consistent terminol
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
84 ///
85 /// This may return true before all events have been delivered.
86 /// Requesting a new event when [isDone] returns true,
87 /// for example using [next], will always fail.
88 bool get isDone => _isDone;
nweiz 2015/06/12 01:24:25 What's the use case for exposing this to the user?
Lasse Reichstein Nielsen 2015/06/12 13:04:23 It might be a little speculative because it doesn'
nweiz 2015/06/16 01:05:23 The version of this I wrote for ScheduledTest has
Lasse Reichstein Nielsen 2015/06/16 13:05:45 It doesn't solve the problem of what to do with th
nweiz 2015/06/16 22:34:11 It's unlikely that someone will call [hasNext] fol
89
90 /// Return the stream subscription while state is listening.
nweiz 2015/06/12 01:24:25 Nit: use present tense (e.g. "Returns" rather than
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
91 StreamSubscription get _subscription {
92 assert(_isListening);
93 return _stateData;
94 }
95
96 /// Return the source stream while state is initial.
97 Stream get _sourceStream {
98 assert(!_isListening);
99 assert(!_isDone);
100 return _stateData;
101 }
102
103 // Set the subscription and transition to listening state.
104 void set _subscription(StreamSubscription subscription) {
105 assert(!_isListening);
106 assert(!_isDone);
107 _stateData = subscription;
108 _state |= _LISTENING;
109 }
110
111 void _setDone() {
112 assert(!_isDone);
113 _state = (_state & _CLOSED_REST) | _DONE;
114 _stateData = null;
115 }
116
117 /// Request the next (yet unrequested) event from the stream.
nweiz 2015/06/12 01:24:26 Clarify in this documentation that it's valid to h
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
118 ///
119 /// When the requested event arrives, the returned future is completed with
120 /// the event. This is independent of whether the event is a data event or
121 /// an error event.
nweiz 2015/06/12 01:24:26 I'd write this as: "If the event is a value event,
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
122 ///
123 /// If the stream closed before an event arrives, the future is completed
nweiz 2015/06/12 01:24:24 "closed" -> "closes", "is completed" -> "completes
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
124 /// with a [StateError].
125 Future<T> get next {
126 if (!_isClosed) {
127 Completer completer = new Completer<T>();
128 _addRequest(new _NextRequest(completer));
129 return completer.future;
130 }
131 throw _failClosed();
132 }
133
134 /// Request a stream of all the remaning events of the source stream.
nweiz 2015/06/12 01:24:25 "Request" -> "Returns"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
135 ///
136 /// All requested [next], [skip] or [take] operations are completed
137 /// first, and then any remaining events are provided as events of
138 /// the returned stream.
139 ///
140 /// Using `rest` closes the stream events object. After getting the
141 /// `rest` it is no longer allowed to request other events, like
nweiz 2015/06/12 01:24:25 "it is no longer allowed to" -> "the caller may no
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
142 /// after calling [close].
143 Stream<T> get rest {
144 if (!_isClosed) {
nweiz 2015/06/12 01:24:25 Nit: Short-circuit if it's closed to save on inden
Lasse Reichstein Nielsen 2015/06/12 13:04:22 ACK. This is not going to be in any inner loops, s
145 _state |= _CLOSED_REST;
146 if (_isListening) {
147 // We have an active subscription that we want to take over.
148 var delayStream = new StreamCompleter<T>();
149 _addRequest(new _RestRequest<T>(delayStream, this));
150 return delayStream.stream;
151 }
152 assert(_requestQueue.isEmpty);
153 if (isDone) {
154 // TODO(lrn): Add Stream.empty() constructor.
155 return new Stream<T>.fromIterable(const []);
156 }
157 // We have never listened the source stream, so just return that directly.
nweiz 2015/06/12 01:24:26 "listened to"
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
158 Stream result = _sourceStream;
159 _setDone();
160 return result;
161 }
162 throw _failClosed();
163 }
164
165 /// Requests to skip the next [count] *data* events.
nweiz 2015/06/12 01:24:24 "Requests to skip" -> "Skips"
166 ///
167 /// The [count] value must be non-negative.
168 ///
169 /// When successful, this is equivalent to using [take]
170 /// and ignoring the result.
171 ///
172 /// If an error occurs before `count` data events has
nweiz 2015/06/12 01:24:25 "has" -> "have"
173 /// been skipped, the returned future completes with
174 /// that error instead.
175 ///
176 /// If the stream closes before `count` data events,
177 /// the remaining unskipped event count is returned.
178 /// If the returned future completes with the integer `0`,
179 /// then all events were succssfully skipped. If the value
180 /// is greater than zero then the stream ended early.
181 Future<int> skip(int count) {
182 if (count < 0) throw new RangeError.range(count, 0, null, "count");
183 if (!_isClosed) {
184 Completer completer = new Completer<int>();
185 _addRequest(new _SkipRequest(completer, count));
186 return completer.future;
187 }
188 throw _failClosed();
189 }
190
191 /// Requests the next [count] data events as a list.
192 ///
193 /// The [count] value must be non-negative.
nweiz 2015/06/12 01:24:25 "The [count] value" -> "[count]"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 -> "The [count]". Otherwise it would start the sen
194 ///
195 /// Equivalent to calling [next] `count` times and
196 /// storing the data values in a list.
197 ///
198 /// If an error occurs before `count` data events has
199 /// been collected, the returned future completes with
200 /// that error instead.
201 ///
202 /// If the stream closes before `count` data events,
203 /// the returned future completes with the list
204 /// of data collected so far. That is, the returned
205 /// list may have fewer than [count] elements.
206 Future<List<T>> take(int count) {
207 if (count < 0) throw new RangeError.range(count, 0, null, "count");
208 if (!_isClosed) {
209 Completer completer = new Completer<List<T>>();
210 _addRequest(new _TakeRequest(completer, count));
211 return completer.future;
212 }
213 throw _failClosed();
214 }
215
216 /// Release the underlying stream subscription.
nweiz 2015/06/12 01:24:26 "Release" -> "Cancels"
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
217 ///
218 /// The close operation waits until all previously requested
219 /// events have been processed, then it cancels the subscription
220 /// providing the events.
221 ///
222 /// The returned future completes with the result of calling
223 /// `cancel`.
224 ///
225 /// After calling `close`, no further events can be requested.
226 /// None of [next], [rest], [skip], [take] or [close] may be
227 /// called again.
228 Future close() {
nweiz 2015/06/12 01:24:24 Consider calling this [cancel], to match [StreamSu
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Good idea!
229 if (!_isClosed) {
230 _state |= _CLOSED;
231 if (!_isListening) {
232 assert(_requestQueue.isEmpty);
nweiz 2015/06/12 01:24:24 Add a comment explaining why the request queue wil
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
233 if (!_isDone) _setDone();
234 return new Future.value();
235 }
236 Completer completer = new Completer();
237 _addRequest(new _CloseRequest(completer, this));
238 return completer.future;
239 }
240 throw _failClosed();
nweiz 2015/06/12 01:24:26 Everything in the core libraries that's closable a
Lasse Reichstein Nielsen 2015/06/12 13:04:22 True. One reason why I didn't allow that was that
241 }
242
243 /// Reused error message.
nweiz 2015/06/12 01:24:24 Expand on this.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
244 Error _failClosed() {
245 String cause =
246 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close";
247 return new StateError("Already closed by a call to $cause");
248 }
249
250 /// Called when requesting an event when the requeust queue is empty.
251 /// The underlying subscription is paused in that case, except the very
252 /// first time when the subscription needs to be created.
253 void _listen() {
nweiz 2015/06/12 01:24:24 Methods like this and [_pause] that are only calle
Lasse Reichstein Nielsen 2015/06/12 13:04:23 Done.
254 if (_isListening) {
255 _subscription.resume();
256 } else if (!_isDone) {
257 _subscription =
258 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
259 }
260 }
261
262 /// Pauses the underlying subscription.
263 /// Called when the request queue is empty and the subscription isn't closed.
264 void _pause() {
265 assert(_isListening);
266 _subscription.pause();
267 }
268
269 // Callbacks receiving the events of the source stream.
nweiz 2015/06/12 01:24:25 Nit: If this is referring to a group of methods, a
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
270 void _onData(T data) {
271 assert(_requestQueue.isNotEmpty);
272 _EventRequest action = _nextAction;
273 action.add(data);
274 _checkCompleted();
275 }
276
277 void _onError(error, StackTrace stack) {
278 assert(_requestQueue.isNotEmpty);
279 _EventRequest action = _nextAction;
280 action.addError(error, stack);
281 _checkCompleted();
282 }
283
284 void _onDone() {
285 _setDone();
286 _closeAllRequests();
287 }
288
289 // Request queue management.
290
291 /// Get the next action in the queue, but don't remove it.
292 _EventRequest get _nextAction {
293 assert(_requestQueue.isNotEmpty);
nweiz 2015/06/12 01:24:24 This assertion seems redundant, since the line bel
Lasse Reichstein Nielsen 2015/06/12 13:04:22 True.
294 return _requestQueue.first;
295 }
296
297 /// Add a new request to the queue.
298 void _addRequest(_EventRequest action) {
299 if (_isDone) {
300 action.close();
301 return;
302 }
303 if (_requestQueue.isEmpty) {
304 if (action.isComplete) {
nweiz 2015/06/12 01:24:24 Explain under what circumstances an action that wa
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
305 // Skip listening and complete this immediately.
306 action.close();
307 return;
308 }
309 _listen();
310 }
311 _requestQueue.add(action);
312 }
313
314 /// Remove all requests and call their `done` event.
315 ///
316 /// Used when the source stream dries up.
317 void _closeAllRequests() {
318 assert(_isDone);
319 while (_requestQueue.isNotEmpty) {
320 _requestQueue.removeFirst().close();
321 }
322 }
323
324 /// Check whether the next actions in the queue are complete.
325 ///
326 /// If so, remove them and call their `complete` method.
327 void _checkCompleted() {
328 // Close-actions are executed immediately when they become the
329 // next (and last) event in the queue.
330 // When _isClosed and the queue is not empty, the last element
331 // of the queue is the close action.
332 while (_requestQueue.isNotEmpty) {
333 if (!_requestQueue.first.isComplete) {
334 return;
335 }
336 _requestQueue.removeFirst().close();
337 }
338 assert(_requestQueue.isEmpty);
339 if (!_isDone) _pause();
340 }
341
342 /// Extracts the subscription and makes the events object unusable.
343 ///
344 /// Can only be used by the very last request.
345 StreamSubscription _dispose() {
346 assert(_isClosed);
347 assert(_isListening);
348 assert(_requestQueue.isEmpty);
349 StreamSubscription subscription = _subscription;
350 _setDone();
351 return subscription;
352 }
353 }
354
nweiz 2015/06/12 01:24:25 Nit: extra newline.
355
356 /// Action to take when a requested event arrives.
357 abstract class _EventRequest implements EventSink {
nweiz 2015/06/12 01:24:25 It's kind of weird that the documentation for thes
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Agree, I switched to "request" halfway through, an
358 bool get isComplete;
Søren Gjesse 2015/06/11 07:57:06 Add close method here as well I think the 'close'
nweiz 2015/06/12 01:24:25 This should also have [add] and [addError] abstrac
Lasse Reichstein Nielsen 2015/06/12 13:04:22 It's inherited from EventSink. I picked EventSink
359 }
360
361 /// Action completing a [StreamEvents.next] request.
362 class _NextRequest implements _EventRequest {
363 Completer completer;
nweiz 2015/06/12 01:24:25 Even though it doesn't do anything at the language
364 _NextRequest(this.completer);
nweiz 2015/06/12 01:24:24 Rather than passing in a completer, consider makin
Lasse Reichstein Nielsen 2015/06/12 13:04:23 Good idea. Will do. The type parameter needs to go
365
366 void add(data) {
367 completer.complete(data);
368 completer = null;
nweiz 2015/06/12 01:24:24 Does nulling this out buy you anything? It seems l
Lasse Reichstein Nielsen 2015/06/12 13:04:23 We have an isCompleted getter? Whoa! I'm too used
369 }
370
371 void addError(error, [StackTrace stack]) {
372 completer.completeError(error, stack);
373 completer = null;
374 }
375
376 void close() {
377 if (!isComplete) {
378 completer.completeError(new StateError("no elements"));
nweiz 2015/06/12 01:24:25 If you do continue to check against null, null out
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Technically not necessary since "close" is always
379 }
380 }
381
382 bool get isComplete => completer == null;
383 }
384
385 /// Action completing a [StreamEvents.skip] request.
386 class _SkipRequest implements _EventRequest {
387 final Completer completer;
388 int count;
nweiz 2015/06/12 01:24:24 Document this. Also consider calling it something
Lasse Reichstein Nielsen 2015/06/15 15:46:23 Done.
389 _SkipRequest(this.completer, this.count);
390
391 void add(data) {
392 count--;
393 }
394
395 void addError(error, [StackTrace stack]) {
396 completer.completeError(error, stack);
397 count = 0;
398 }
399
400 void close() {
401 if (!completer.isCompleted) {
402 completer.complete(count);
403 count = 0;
404 }
405 }
406
407 bool get isComplete {
408 return count == 0;
409 }
410 }
411
412 /// Action completing a [StreamEvents.take] request.
413 class _TakeRequest<T> implements _EventRequest {
414 final Completer completer;
415 final List list = <T>[];
416 int count;
417 _TakeRequest(this.completer, this.count);
418
419 void add(data) {
420 list.add(data);
421 count--;
422 }
423
424 void addError(error, [StackTrace stack]) {
425 completer.completeError(error, stack);
426 count = 0;
427 }
428
429 void close() {
430 if (!completer.isCompleted) {
431 completer.complete(list);
432 }
433 }
434
435 bool get isComplete => count == 0;
436 }
437
438 /// Action completing a [StreamEvents.close] request.
439 class _CloseRequest implements _EventRequest {
440 final Completer completer;
441 StreamEvents events;
442
443 _CloseRequest(this.completer, this.events);
444
445 void add(data) {
446 throw new UnsupportedError("event");
447 }
448
449 void addError(error, [StackTrace stack]) {
450 throw new UnsupportedError("event");
nweiz 2015/06/12 01:24:24 Shouldn't this be "error"?
Lasse Reichstein Nielsen 2015/06/12 13:04:22 No, we don't support receiving events, that's the
451 }
452
453 void close() {
454 if (events._isListening) {
455 completer.complete(events._dispose().cancel());
456 } else {
457 completer.complete();
458 }
459 }
460
461 bool get isComplete => true;
462 }
463
464 /// Action completing a [StreamEvents.rest] request.
465 class _RestRequest<T> implements _EventRequest {
466 final StreamCompleter delayStream;
467 final StreamEvents events;
468 _RestRequest(this.delayStream, this.events);
469
470 void add(data) {
471 throw new UnsupportedError("event");
472 }
473
474 void addError(error, [StackTrace stack]) {
475 throw new UnsupportedError("event");
nweiz 2015/06/12 01:24:26 Ditto.
Lasse Reichstein Nielsen 2015/06/15 15:46:23 Done.
476 }
477
478 void close() {
479 if (events._isListening) {
480 StreamSubscription subscription = events._dispose();
481 delayStream.setSourceStream(new SubscriptionStream<T>(subscription));
482 if (subscription.isPaused) subscription.resume();
483 } else {
484 assert(events._isDone);
485 delayStream.setEmpty();
486 }
487 }
488
489 bool get isComplete => true;
490 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698