OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_events; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import "subscription_stream.dart"; | |
11 import "stream_completer.dart"; | |
12 import "../result.dart"; | |
13 | |
14 /// An asynchronous pull-based interface for accessing stream events. | |
15 /// | |
16 /// Wraps a stream and makes individual events available on request. | |
17 /// | |
18 /// You can request (and reserve) one or more events from the stream, | |
19 /// and after all previous requestes have been fulfilled, stream events | |
nweiz
2015/06/18 23:44:26
"requestes" -> "requests"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
20 /// go towards fulfilling your request. | |
21 /// | |
22 /// For example, if you ask for [next] two times, the returned futures | |
23 /// will be completed by the next two unreserved events from the stream. | |
nweiz
2015/06/18 23:44:25
"unreserved" -> "unrequested"? Just to keep the te
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
24 /// | |
25 /// The stream subscription is paused when there are no active | |
26 /// requests. | |
27 /// | |
28 /// Some streams, including broadcast streams, will buffer | |
29 /// events while paused, so waiting too long between requests may | |
30 /// cause memory bloat somewhere else. | |
31 /// | |
32 /// The individual requests are served in the order they are requested, | |
33 /// and the stream subscription is paused when there are no active requests. | |
nweiz
2015/06/18 23:44:27
Both of these are already mentioned above, so this
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
34 /// | |
35 /// This is similar to, but more convenient than, a [StreamIterator]. | |
36 /// A `StreamIterator` requires you to manually check when a new event is | |
37 /// available and you can only access the value of that event until you | |
38 /// check for the next one. A `StreamQueue` allows you to request, for example, | |
39 /// three events at a time, either individually, as a group using [take] | |
40 /// or [skip], or in any combination. | |
41 /// | |
42 /// You can also ask to have the [rest] of the stream provided as | |
43 /// a new stream. This allows, for example, taking the first event | |
44 /// out of a stream and continue using the rest of the stream as a stream. | |
nweiz
2015/06/18 23:44:27
"continue using" -> "continuing to use"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
45 /// | |
46 /// Example: | |
47 /// | |
48 /// var events = new StreamQueue<String>(someStreamOfLines); | |
49 /// var first = await events.next; | |
50 /// while (first.startsWith('#')) { | |
51 /// // Skip comments. | |
52 /// first = await events.next; | |
53 /// } | |
54 /// | |
55 /// if (first.startsWith(MAGIC_MARKER)) { | |
56 /// var headerCount = | |
57 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | |
58 /// handleMessage(headers: await events.take(headerCount), | |
59 /// body: events.rest); | |
60 /// return; | |
61 /// } | |
62 /// // Error handling. | |
63 /// | |
64 /// When you need no further events the `StreamQueue` should be closed | |
65 /// using [cancel]. This releases the underlying stream subscription. | |
66 /// | |
67 /// The underlying stream subscription is paused when there | |
68 /// are no requeusts. Some subscriptions, including those of broadcast streams, | |
69 /// will still buffer events while paused. Creating a `StreamQueue` from | |
70 /// such a stream and stopping to request events, will cause memory to fill up | |
nweiz
2015/06/18 23:44:27
"stopping" -> "ceasing", get rid of the comma
| |
71 /// unnecessarily. | |
nweiz
2015/06/18 23:44:26
This paragraph also seems redundant with the sixth
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
72 class StreamQueue<T> { | |
73 // This class maintains two queues: one of events and one of requests. | |
74 // The active request (the one in front of the queue) is called with | |
75 // the current event queue when it becomes active. | |
nweiz
2015/06/18 23:44:27
"it" -> "the request" (to clarify whether you're r
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
76 // If it returns true, it's done and will be removed from the request queue. | |
nweiz
2015/06/18 23:44:26
"it" -> "the request" again. The next "it" is fine
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
77 // If it returns false, it needs more events, and will be called again when | |
78 // new events are available. | |
79 // The request can remove events that it uses, or keep them in the event | |
80 // queue until it has all that it needs. | |
81 // | |
82 // This model is very flexible and easily extensible. | |
83 // It allows requests that don't consume events (like [hasNext]) or | |
84 // potentially a request that takes either five or zero events, determined | |
85 // by the content of the fifth event. | |
86 | |
87 /// Source of events. | |
88 final Stream _sourceStream; | |
89 | |
90 /// Number of events that may be prefetched when there is no request. | |
91 final int _prefetchCount; | |
92 | |
93 /// Subscription on [_sourceStream] while listening for events. | |
94 /// | |
95 /// Set to subscription when listening, and set to `null` when the | |
96 /// subscription is done (and [_isDone] is set to true). | |
97 StreamSubscription _subscription; | |
98 | |
99 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
100 bool _isDone = false; | |
101 | |
102 /// Whether a closing operation has been performed on the stream queue. | |
103 /// | |
104 /// Closing operations are [cancel] and [rest]. | |
105 bool _isClosed = false; | |
106 | |
107 /// Queue of events not used by a request yet. | |
108 final Queue<Result> _eventQueue = new Queue(); | |
109 | |
110 /// Queue of pending requests. | |
111 /// Access through methods below to ensure consistency. | |
nweiz
2015/06/18 23:44:26
Nit: add a newline above
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
112 final Queue<_EventRequest> _requestQueue = new Queue(); | |
113 | |
114 /// Create a `StreamQueue` of the events of source. | |
nweiz
2015/06/18 23:44:26
"source" -> "[source]"
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
115 /// | |
116 /// Allow prefetching [prefetch] events before pausing the source | |
117 /// stream even if there are no current requests for them. | |
118 /// The default is to pause immediately when there is no pending request. | |
119 /// Even if `prefetch` is greater than zero, the stream won't listened on | |
120 /// before the first request. | |
nweiz
2015/06/18 23:44:25
Consider allowing "prefetch: -1" to mean "never pa
Lasse Reichstein Nielsen
2015/06/30 10:34:14
I don't like -1 being magical, the alternative wou
| |
121 StreamQueue(Stream source, {int prefetch: 0}) | |
nweiz
2015/06/18 23:44:26
Consider making [prefetch] default to null and ass
Lasse Reichstein Nielsen
2015/06/30 10:34:12
True. I'll just remove it for now (effectively fix
| |
122 : _sourceStream = source, _prefetchCount = prefetch; | |
123 | |
124 /// Asks if the stream has any more events. | |
125 /// | |
126 /// Returns a future that completes with `true` if the stream has any | |
127 /// more events, whether data or error. | |
128 /// If the stream closes without producing any more events, the returned | |
129 /// future completes with `false`. | |
130 /// | |
131 /// Can be used before using [next] to avoid getting an error in the | |
132 /// future returned by `next` in the case where there are no more events. | |
133 Future<bool> get hasNext { | |
134 if (!_isClosed) { | |
135 _HasNextRequest hasNextRequest = new _HasNextRequest(); | |
nweiz
2015/06/18 23:44:26
Nit: "var" (also below)
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
136 _addRequest(hasNextRequest); | |
137 return hasNextRequest.future; | |
138 } | |
139 throw _failClosed(); | |
140 } | |
141 | |
142 /// Requests the next (yet unrequested) event from the stream. | |
143 /// | |
144 /// When the requested event arrives, the returned future is completed with | |
145 /// the event. | |
146 /// If the event is a data event, the returned future completes | |
147 /// with its value. | |
148 /// If the event is an error event, the returned future completes with | |
149 /// its error and stack trace. | |
150 /// If the stream closes before an event arrives, the returned future | |
151 /// completes with a [StateError]. | |
152 /// | |
153 /// It's possible to have several pending [next] calls (or other requests), | |
154 /// and they will be completed in the order they were requested, by the | |
155 /// first events that were not used by previous requeusts. | |
nweiz
2015/06/18 23:44:26
"used" -> "consumed" (slightly more correct for re
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
156 Future<T> get next { | |
157 if (!_isClosed) { | |
158 _NextRequest nextRequest = new _NextRequest<T>(); | |
159 _addRequest(nextRequest); | |
160 return nextRequest.future; | |
161 } | |
162 throw _failClosed(); | |
163 } | |
164 | |
165 /// Returns a stream of all the remaning events of the source stream. | |
166 /// | |
167 /// All requested [next], [skip] or [take] operations are completed | |
168 /// first, and then any remaining events are provided as events of | |
169 /// the returned stream. | |
170 /// | |
171 /// Using `rest` closes the stream events object. After getting the | |
nweiz
2015/06/18 23:44:26
"the stream events object" -> "this stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
172 /// `rest` the caller may no longer request other events, like | |
173 /// after calling [cancel]. | |
174 Stream<T> get rest { | |
175 if (_isClosed) { | |
176 throw _failClosed(); | |
177 } | |
178 var request = new _RestRequest<T>(this); | |
179 _isClosed = true; | |
180 _addRequest(request); | |
181 return request.stream; | |
182 } | |
183 | |
184 /// Skips the next [count] *data* events. | |
185 /// | |
186 /// The [count] must be non-negative. | |
187 /// | |
188 /// When successful, this is equivalent to using [take] | |
189 /// and ignoring the result. | |
190 /// | |
191 /// If an error occurs before `count` data events have been skipped, | |
192 /// the returned future completes with that error instead. | |
193 /// | |
194 /// If the stream closes before `count` data events, | |
195 /// the remaining unskipped event count is returned. | |
196 /// If the returned future completes with the integer `0`, | |
197 /// then all events were succssfully skipped. If the value | |
198 /// is greater than zero then the stream ended early. | |
199 Future<int> skip(int count) { | |
200 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
201 if (!_isClosed) { | |
202 var request = new _SkipRequest(count); | |
203 _addRequest(request); | |
204 return request.future; | |
205 } | |
206 throw _failClosed(); | |
207 } | |
208 | |
209 /// Requests the next [count] data events as a list. | |
210 /// | |
211 /// The [count] must be non-negative. | |
212 /// | |
213 /// Equivalent to calling [next] `count` times and | |
214 /// storing the data values in a list. | |
215 /// | |
216 /// If an error occurs before `count` data events has | |
217 /// been collected, the returned future completes with | |
218 /// that error instead. | |
219 /// | |
220 /// If the stream closes before `count` data events, | |
221 /// the returned future completes with the list | |
222 /// of data collected so far. That is, the returned | |
223 /// list may have fewer than [count] elements. | |
224 Future<List<T>> take(int count) { | |
225 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
226 if (!_isClosed) { | |
227 var request = new _TakeRequest<T>(count); | |
228 _addRequest(request); | |
229 return request.future; | |
230 } | |
231 throw _failClosed(); | |
232 } | |
233 | |
234 /// Cancels the underlying stream subscription. | |
235 /// | |
236 /// The cancel operation waits until all previously requested | |
237 /// events have been processed, then it cancels the subscription | |
238 /// providing the events. | |
239 /// | |
240 /// The returned future completes with the result of calling | |
241 /// `cancel`. | |
242 /// | |
243 /// After calling `cancel`, no further events can be requested. | |
244 /// None of [next], [rest], [skip], [take] or [cancel] may be | |
245 /// called again. | |
246 Future cancel() { | |
247 if (!_isClosed) { | |
248 _isClosed = true; | |
249 var request = new _CancelRequest(this); | |
250 _addRequest(request); | |
251 return request.future; | |
252 } | |
253 throw _failClosed(); | |
254 } | |
255 | |
256 /// Returns an error for when a request is made after cancel. | |
257 /// | |
258 /// Returns a [StateError] with a message saying that either | |
259 /// [cancel] or [rest] have already been called. | |
260 Error _failClosed() { | |
261 return new StateError("Already cancelled"); | |
262 } | |
263 | |
264 // Callbacks receiving the events of the source stream. | |
265 | |
266 void _onData(T data) { | |
267 _eventQueue.add(new Result.value(data)); | |
268 _checkQueues(); | |
269 } | |
270 | |
271 void _onError(error, StackTrace stack) { | |
272 _eventQueue.add(new Result.error(error, stack)); | |
273 _checkQueues(); | |
274 } | |
275 | |
276 void _onDone() { | |
277 _subscription = null; | |
278 _isDone = true; | |
279 _closeAllRequests(); | |
280 } | |
281 | |
282 // Request queue management. | |
283 | |
284 /// Add a new request to the queue. | |
nweiz
2015/06/18 23:44:26
"Add" -> "Adds"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
285 void _addRequest(_EventRequest request) { | |
286 if (_isDone) { | |
287 assert(_requestQueue.isEmpty); | |
288 if (!request.addEvents(_eventQueue)) { | |
289 request.close(_eventQueue); | |
290 } | |
291 return; | |
292 } | |
293 if (_requestQueue.isEmpty) { | |
294 if (request.addEvents(_eventQueue)) return; | |
295 _ensureListening(); | |
296 } | |
297 _requestQueue.add(request); | |
298 | |
nweiz
2015/06/18 23:44:26
Nit: extra newline
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
299 } | |
300 | |
301 /// Ensures that we are listening on events from [_sourceStream]. | |
302 /// | |
303 /// Resumes subscription on [_sourceStream], or create it if necessary. | |
nweiz
2015/06/18 23:44:26
"create" -> "creates"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
304 StreamSubscription _ensureListening() { | |
305 assert(!_isDone); | |
306 if (_subscription == null) { | |
307 _subscription = | |
308 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
309 } else { | |
310 _subscription.resume(); | |
311 } | |
312 } | |
313 | |
314 | |
315 /// Remove all requests and close them. | |
nweiz
2015/06/18 23:44:26
"Remove" -> "Removes", "close" -> "closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
316 /// | |
317 /// Used when the source stream is done. | |
318 /// After this, no further requests will be added to the queue, | |
319 /// requests are immediately served entirely by events already in the event | |
320 /// queue, if any. | |
321 void _closeAllRequests() { | |
322 assert(_isDone); | |
323 while (_requestQueue.isNotEmpty) { | |
324 var request = _requestQueue.removeFirst(); | |
325 if (!request.addEvents(_eventQueue)) { | |
nweiz
2015/06/18 23:44:26
Isn't this guaranteed to return false? [request.ad
Lasse Reichstein Nielsen
2015/06/30 10:34:12
It's only guaranteed to return false for the first
| |
326 request.close(_eventQueue); | |
327 } | |
328 } | |
329 } | |
330 | |
331 /// Matches events with requests. | |
332 /// | |
333 /// Called after receiving an event. | |
334 void _checkQueues() { | |
335 while (_requestQueue.isNotEmpty) { | |
336 if (_requestQueue.first.addEvents(_eventQueue)) { | |
337 _requestQueue.removeFirst(); | |
338 } else { | |
339 return; | |
340 } | |
341 } | |
342 if (!_isDone && _eventQueue.length >= _prefetchCount) { | |
343 _subscription.pause(); | |
344 } | |
345 } | |
346 | |
347 /// Extracts the subscription and makes the events object unusable. | |
nweiz
2015/06/18 23:44:26
"events object" -> "stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
348 /// | |
349 /// Can only be used by the very last request. | |
350 StreamSubscription _dispose() { | |
351 assert(_isClosed); | |
352 StreamSubscription subscription = _subscription; | |
nweiz
2015/06/18 23:44:25
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
353 _subscription = null; | |
354 _isDone = true; | |
355 return subscription; | |
356 } | |
357 } | |
358 | |
359 /// Request object that receives events when they arrive, until fulfilled. | |
360 /// | |
361 /// Each request that cannot be fulfilled immediately is represented by | |
362 /// an `_EventRequest` object in the request queue. | |
363 /// | |
364 /// Events from the source stream are sent to the first request in the | |
365 /// queue until it reports itself as [isComplete]. | |
366 /// | |
367 /// When the first request in the queue `isComplete`, either when becoming | |
368 /// the first request or after receiving an event, its [close] methods is | |
369 /// called. | |
370 /// | |
371 /// The [close] method is also called immediately when the source stream | |
372 /// is done. | |
373 abstract class _EventRequest implements EventSink { | |
374 /// Handle available events. | |
375 /// | |
376 /// The available events are provided as a queue. The `addEvents` function | |
377 /// should only access the queue from the start, e.g., using [removeFirst]. | |
nweiz
2015/06/18 23:44:27
"access the queue from the start" -> "remove event
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
378 /// | |
379 /// Returns `true` if if the request is completed, or `false` if it needs | |
nweiz
2015/06/18 23:44:27
"if if" -> "if"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
380 /// more events. | |
381 /// The call may keep events in the queue until the requeust is complete, | |
382 /// or it may remove them immediately. | |
383 /// | |
384 /// This method is called when a request reaches the front of the request | |
385 /// queue, and if it returns `false`, it's called again every time an event | |
nweiz
2015/06/18 23:44:25
"an event" -> "a new event"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
386 /// becomes available. | |
nweiz
2015/06/18 23:44:27
"and finally when the stream closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
387 bool addEvents(Queue<Result> events); | |
388 | |
389 /// Complete the request. | |
390 /// | |
391 /// This is called when the source stream is done before the request | |
392 /// had a chance to receive events. If there are any events available, | |
393 /// they are in the [events] queue. No further events will become available. | |
394 /// | |
395 /// The queue should only be accessed from the start, e.g., | |
396 /// using [removeFirst]. | |
397 /// | |
398 /// If the requests kept events in the queue after an [addEvents] call, | |
nweiz
2015/06/18 23:44:26
"requests" -> "request"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
399 /// it should remove them here. | |
400 void close(Queue<Result> events); | |
401 } | |
402 | |
403 /// Request for a [StreamQueue.next] call. | |
404 /// | |
405 /// Completes the returned future when receiving the first event, | |
406 /// and is then complete. | |
407 class _NextRequest<T> implements _EventRequest { | |
408 /// Completer for the future returned by [StreamQueue.next]. | |
409 /// | |
410 /// Set to `null` when it is completed, to mark it as already complete. | |
nweiz
2015/06/18 23:44:27
This is no longer accurate.
| |
411 final Completer _completer; | |
412 | |
413 _NextRequest() : _completer = new Completer<T>(); | |
nweiz
2015/06/18 23:44:26
Nit: assign _completer in the declaration
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
414 | |
415 Future<T> get future => _completer.future; | |
416 | |
417 bool addEvents(Queue<Result> events) { | |
418 if (events.isEmpty) return false; | |
419 events.removeFirst().complete(_completer); | |
420 return true; | |
421 } | |
422 | |
423 void close(Queue<Result> events) { | |
424 _completer.completeError(new StateError("no elements")); | |
nweiz
2015/06/18 23:44:27
"no" -> "No"
Also include a stack trace here so t
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
425 } | |
426 } | |
427 | |
428 /// Request for a [StreamQueue.skip] call. | |
429 class _SkipRequest implements _EventRequest { | |
430 /// Completer for the future returned by the skip call. | |
431 final Completer _completer = new Completer<int>(); | |
432 | |
433 /// Number of remaining events to skip. | |
434 /// | |
435 /// The request [isComplete] when the values reaches zero. | |
436 /// | |
437 /// Decremented when an event is seen. | |
438 /// Set to zero when an error is seen since errors abort the skip request. | |
439 int _eventsToSkip; | |
440 | |
441 _SkipRequest(this._eventsToSkip); | |
442 | |
443 /// The future completed when the correct number of events have been skipped. | |
444 Future get future => _completer.future; | |
445 | |
446 bool addEvents(Queue<Result> events) { | |
447 while (_eventsToSkip > 0) { | |
448 if (events.isEmpty) return false; | |
449 _eventsToSkip--; | |
450 var event = events.removeFirst(); | |
451 if (event.isError) { | |
452 event.complete(_completer); | |
453 return true; | |
454 } | |
455 } | |
456 _completer.complete(0); | |
457 return true; | |
458 } | |
459 | |
460 void close(Queue<Result> events) { | |
461 _completer.complete(_eventsToSkip); | |
462 } | |
463 } | |
464 | |
465 /// Request for a [StreamQueue.take] call. | |
466 class _TakeRequest<T> implements _EventRequest { | |
467 /// Completer for the future returned by the take call. | |
468 final Completer _completer; | |
469 | |
470 /// List collecting events until enough have been seen. | |
471 final List _list = <T>[]; | |
472 | |
473 /// Number of events to capture. | |
474 /// | |
475 /// The request [isComplete] when the length of [_list] reaches | |
476 /// this value. | |
477 final int _eventsToTake; | |
478 | |
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | |
480 | |
481 /// The future completed when the correct number of events have been captured. | |
482 Future get future => _completer.future; | |
483 | |
484 bool addEvents(Queue<Events> events) { | |
485 while (_list.length < _eventsToTake) { | |
486 if (events.isEmpty) return false; | |
487 var result = events.removeFirst(); | |
488 if (result.isError) { | |
489 result.complete(_completer); | |
490 return true; | |
491 } | |
492 _list.add(result.asValue.value); | |
493 } | |
494 _completer.complete(_list); | |
495 return true; | |
496 } | |
497 | |
498 void close(Queue<Events> events) { | |
499 _completer.complete(_list); | |
500 } | |
501 } | |
502 | |
503 /// Request for a [StreamQueue.cancel] call. | |
504 /// | |
505 /// The request is always complete, it just waits in the request queue | |
506 /// until all previous events are fulfilled, then it cancels the stream events | |
507 /// subscription. | |
508 class _CancelRequest implements _EventRequest { | |
509 /// Completer for the future returned by the `cancel` call. | |
510 final Completer _completer = new Completer(); | |
511 | |
512 /// The [StreamQueue] object that has this request queued. | |
513 /// | |
514 /// When the event is completed, it needs to cancel the active subscription | |
515 /// of the `StreamQueue` object, if any. | |
516 final StreamQueue _streamQueue; | |
517 | |
518 _CancelRequest(this._streamQueue); | |
519 | |
520 /// The future completed when the cancel request is completed. | |
521 Future get future => _completer.future; | |
522 | |
523 bool addEvents(Queue<Result> events) { | |
524 _shutdown(); | |
525 return true; | |
526 } | |
527 | |
528 void close(_) { | |
529 _shutdown(); | |
530 } | |
531 | |
532 void _shutdown() { | |
533 if (_streamQueue._subscription == null) { | |
534 _completer.complete(); | |
535 } else { | |
536 _completer.complete(_streamQueue._dispose().cancel()); | |
537 } | |
538 } | |
539 } | |
540 | |
541 /// Request for a [StreamQueue.rest] call. | |
542 /// | |
543 /// The request is always complete, it just waits in the request queue | |
544 /// until all previous events are fulfilled, then it takes over the | |
545 /// stream events subscription and creates a stream from it. | |
546 class _RestRequest<T> implements _EventRequest { | |
547 /// Completer for the stream returned by the `rest` call. | |
548 final StreamCompleter _completer; | |
549 | |
550 /// The [StreamQueue] object that has this request queued. | |
551 /// | |
552 /// When the event is completed, it needs to cancel the active subscription | |
553 /// of the `StreamQueue` object, if any. | |
554 final StreamQueue _streamQueue; | |
555 _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>(); | |
nweiz
2015/06/18 23:44:27
Nit: newline above.
Also move [_completer]'s init
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
556 | |
557 /// The future which will contain the remaining events of [_streamQueue]. | |
nweiz
2015/06/18 23:44:26
"future" -> "stream"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
558 Stream<T> get stream => _completer.stream; | |
559 | |
560 bool addEvents(Queue<Result> events) { | |
561 _completeStream(events); | |
562 return true; | |
563 } | |
564 | |
565 void close(Queue<Result> events) { | |
566 _completeStream(events); | |
567 } | |
568 | |
569 void _completeStream(Queue<Result> events) { | |
570 Stream getRestStream() { | |
nweiz
2015/06/18 23:44:26
Why is this a local method (as opposed to a privat
Lasse Reichstein Nielsen
2015/06/30 10:34:14
It's only used here, so I didn't see a need to giv
nweiz
2015/06/30 23:39:47
There are a lot of private methods at the top leve
Lasse Reichstein Nielsen
2015/07/01 08:24:56
Agree. Moved to private instance method.
| |
571 if (_streamQueue._isDone) { | |
572 return new Stream<T>.empty(); | |
nweiz
2015/06/18 23:44:25
If you're going to use this here, you'll need to u
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Ack. I SOOO wish 1.11 would be released soon. All
nweiz
2015/06/30 23:39:47
Yeah, this is one of the burdens of developing in
| |
573 } | |
574 if (_streamQueue._subscription == null) { | |
575 return _streamQueue._sourceStream; | |
576 } | |
577 StreamSubscription subscription = _streamQueue._dispose(); | |
578 subscription.resume(); | |
579 return new SubscriptionStream<T>(subscription); | |
580 } | |
581 if (events.isEmpty) { | |
582 if (_streamQueue._isDone) { | |
583 _completer.setEmpty(); | |
584 } else { | |
585 _completer.setSourceStream(getRestStream()); | |
586 } | |
587 } else { | |
588 // There are prefetched events which needs to be added before the | |
589 // remaining stream. | |
590 StreamController controller = new StreamController<T>(); | |
nweiz
2015/06/18 23:44:27
Nit: "var"
| |
591 for (var event in events) event.addTo(controller); | |
nweiz
2015/06/18 23:44:25
I like this style of loop, but unfortunately it's
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Do they want me to write
events.forEach((even
| |
592 controller.addStream(getRestStream(), cancelOnError: false) | |
593 .whenComplete(controller.close); | |
594 _completer.setSourceStream(controller.stream); | |
595 } | |
596 } | |
597 } | |
598 | |
599 /// Request for a [StreamQueue.hasNext] call. | |
600 /// | |
601 /// Completes the [future] with `true` if it sees any event, | |
602 /// but doesn't consume the event. | |
603 /// If the request is closed without seeing an event, then | |
604 /// the [future] is completed with `false`. | |
605 class _HasNextRequest<T> implements _EventRequest { | |
606 final Completer _completer = new Completer<bool>(); | |
607 | |
608 Future<bool> get future => _completer.future; | |
609 | |
610 bool addEvents(Queue<Result> events) { | |
611 if (events.isNotEmpty) { | |
612 _completer.complete(true); | |
613 return true; | |
614 } | |
615 return false; | |
616 } | |
617 | |
618 void close(_) { | |
619 _completer.complete(false); | |
620 } | |
621 } | |
OLD | NEW |