OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
53 /// var headerCount = | 53 /// var headerCount = |
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
55 /// handleMessage(headers: await events.take(headerCount), | 55 /// handleMessage(headers: await events.take(headerCount), |
56 /// body: events.rest); | 56 /// body: events.rest); |
57 /// return; | 57 /// return; |
58 /// } | 58 /// } |
59 /// // Error handling. | 59 /// // Error handling. |
60 /// | 60 /// |
61 /// When you need no further events the `StreamQueue` should be closed | 61 /// When you need no further events the `StreamQueue` should be closed |
62 /// using [cancel]. This releases the underlying stream subscription. | 62 /// using [cancel]. This releases the underlying stream subscription. |
63 class StreamQueue<T> { | 63 abstract class StreamQueue<T> { |
64 // This class maintains two queues: one of events and one of requests. | 64 // This class maintains two queues: one of events and one of requests. |
65 // The active request (the one in front of the queue) is called with | 65 // The active request (the one in front of the queue) is called with |
66 // the current event queue when it becomes active. | 66 // the current event queue when it becomes active. |
67 // | 67 // |
68 // If the request returns true, it's complete and will be removed from the | 68 // If the request returns true, it's complete and will be removed from the |
69 // request queue. | 69 // request queue. |
70 // If the request returns false, it needs more events, and will be called | 70 // If the request returns false, it needs more events, and will be called |
71 // again when new events are available. | 71 // again when new events are available. |
72 // The request can remove events that it uses, or keep them in the event | 72 // The request can remove events that it uses, or keep them in the event |
73 // queue until it has all that it needs. | 73 // queue until it has all that it needs. |
74 // | 74 // |
75 // This model is very flexible and easily extensible. | 75 // This model is very flexible and easily extensible. |
76 // It allows requests that don't consume events (like [hasNext]) or | 76 // It allows requests that don't consume events (like [hasNext]) or |
77 // potentially a request that takes either five or zero events, determined | 77 // potentially a request that takes either five or zero events, determined |
78 // by the content of the fifth event. | 78 // by the content of the fifth event. |
79 | 79 |
80 /// Source of events. | 80 /// Whether the event source done. |
nweiz
2015/08/21 19:43:57
"is done"
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
I mean: Is done! :)
| |
81 final Stream _sourceStream; | |
82 | |
83 /// Subscription on [_sourceStream] while listening for events. | |
84 /// | |
85 /// Set to subscription when listening, and set to `null` when the | |
86 /// subscription is done (and [_isDone] is set to true). | |
87 StreamSubscription _subscription; | |
88 | |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
90 bool _isDone = false; | 81 bool _isDone = false; |
91 | 82 |
92 /// Whether a closing operation has been performed on the stream queue. | 83 /// Whether a closing operation has been performed on the stream queue. |
93 /// | 84 /// |
94 /// Closing operations are [cancel] and [rest]. | 85 /// Closing operations are [cancel] and [rest]. |
95 bool _isClosed = false; | 86 bool _isClosed = false; |
96 | 87 |
97 /// Queue of events not used by a request yet. | 88 /// Queue of events not used by a request yet. |
98 final Queue<Result> _eventQueue = new Queue(); | 89 final Queue<Result> _eventQueue = new Queue(); |
99 | 90 |
100 /// Queue of pending requests. | 91 /// Queue of pending requests. |
101 /// | 92 /// |
102 /// Access through methods below to ensure consistency. | 93 /// Access through methods below to ensure consistency. |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | 94 final Queue<_EventRequest> _requestQueue = new Queue(); |
104 | 95 |
105 /// Create a `StreamQueue` of the events of [source]. | 96 /// Create a `StreamQueue` of the events of [source]. |
106 StreamQueue(Stream source) | 97 factory StreamQueue(Stream source) = _StreamQueue<T>; |
107 : _sourceStream = source; | 98 |
99 StreamQueue._(); | |
108 | 100 |
109 /// Asks if the stream has any more events. | 101 /// Asks if the stream has any more events. |
110 /// | 102 /// |
111 /// Returns a future that completes with `true` if the stream has any | 103 /// Returns a future that completes with `true` if the stream has any |
112 /// more events, whether data or error. | 104 /// more events, whether data or error. |
113 /// If the stream closes without producing any more events, the returned | 105 /// If the stream closes without producing any more events, the returned |
114 /// future completes with `false`. | 106 /// future completes with `false`. |
115 /// | 107 /// |
116 /// Can be used before using [next] to avoid getting an error in the | 108 /// Can be used before using [next] to avoid getting an error in the |
117 /// future returned by `next` in the case where there are no more events. | 109 /// future returned by `next` in the case where there are no more events. |
110 /// Another alternative is to use `take(1)` which returns either zero or | |
111 /// one events. | |
118 Future<bool> get hasNext { | 112 Future<bool> get hasNext { |
119 if (!_isClosed) { | 113 if (!_isClosed) { |
120 var hasNextRequest = new _HasNextRequest(); | 114 var hasNextRequest = new _HasNextRequest(); |
121 _addRequest(hasNextRequest); | 115 _addRequest(hasNextRequest); |
122 return hasNextRequest.future; | 116 return hasNextRequest.future; |
123 } | 117 } |
124 throw _failClosed(); | 118 throw _failClosed(); |
125 } | 119 } |
126 | 120 |
127 /// Requests the next (yet unrequested) event from the stream. | 121 /// Requests the next (yet unrequested) event from the stream. |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
209 Future<List<T>> take(int count) { | 203 Future<List<T>> take(int count) { |
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 204 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
211 if (!_isClosed) { | 205 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 206 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 207 _addRequest(request); |
214 return request.future; | 208 return request.future; |
215 } | 209 } |
216 throw _failClosed(); | 210 throw _failClosed(); |
217 } | 211 } |
218 | 212 |
219 /// Cancels the underlying stream subscription. | 213 /// Cancels the underlying event source. |
220 /// | 214 /// |
221 /// If [immediate] is `false` (the default), the cancel operation waits until | 215 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// all previously requested events have been processed, then it cancels the | 216 /// all previously requested events have been processed, then it cancels the |
223 /// subscription providing the events. | 217 /// subscription providing the events. |
224 /// | 218 /// |
225 /// If [immediate] is `true`, the subscription is instead canceled | 219 /// If [immediate] is `true`, the source is instead canceled |
226 /// immediately. Any pending events are completed as though the underlying | 220 /// immediately. Any pending events are completed as though the underlying |
227 /// stream had closed. | 221 /// stream had closed. |
228 /// | 222 /// |
229 /// The returned future completes with the result of calling | 223 /// The returned future completes with the result of calling |
230 /// `cancel`. | 224 /// `cancel`. |
231 /// | 225 /// |
232 /// After calling `cancel`, no further events can be requested. | 226 /// After calling `cancel`, no further events can be requested. |
233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 227 /// None of [next], [rest], [skip], [take] or [cancel] may be |
234 /// called again. | 228 /// called again. |
235 Future cancel({bool immediate: false}) { | 229 Future cancel({bool immediate: false}) { |
236 if (_isClosed) throw _failClosed(); | 230 if (_isClosed) throw _failClosed(); |
237 _isClosed = true; | 231 _isClosed = true; |
238 | 232 |
239 if (!immediate) { | 233 if (!immediate) { |
240 var request = new _CancelRequest(this); | 234 var request = new _CancelRequest(this); |
241 _addRequest(request); | 235 _addRequest(request); |
242 return request.future; | 236 return request.future; |
243 } | 237 } |
244 | 238 |
245 if (_isDone) return new Future.value(); | 239 if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
246 if (_subscription == null) _subscription = _sourceStream.listen(null); | 240 return _cancel(); |
247 var future = _subscription.cancel(); | |
248 _onDone(); | |
249 return future; | |
250 } | 241 } |
251 | 242 |
243 /// Cancels the underlying event source. | |
244 Future _cancel(); | |
nweiz
2015/08/21 19:43:57
It might be clearer to explicitly divide these pri
Lasse Reichstein Nielsen
2015/08/24 08:31:34
Good idea.
| |
245 | |
252 /// Returns an error for when a request is made after cancel. | 246 /// Returns an error for when a request is made after cancel. |
253 /// | 247 /// |
254 /// Returns a [StateError] with a message saying that either | 248 /// Returns a [StateError] with a message saying that either |
255 /// [cancel] or [rest] have already been called. | 249 /// [cancel] or [rest] have already been called. |
256 Error _failClosed() { | 250 Error _failClosed() { |
257 return new StateError("Already cancelled"); | 251 return new StateError("Already cancelled"); |
258 } | 252 } |
259 | 253 |
260 // Callbacks receiving the events of the source stream. | 254 // Callbacks receiving the events of the source stream. |
261 | 255 void _addResult(Result result) { |
nweiz
2015/08/21 19:43:57
Nit: keep this empty line.
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I've documented the function instead.
| |
262 void _onData(T data) { | 256 _eventQueue.add(result); |
263 _eventQueue.add(new Result.value(data)); | |
264 _checkQueues(); | |
265 } | |
266 | |
267 void _onError(error, StackTrace stack) { | |
268 _eventQueue.add(new Result.error(error, stack)); | |
269 _checkQueues(); | 257 _checkQueues(); |
270 } | 258 } |
271 | 259 |
272 void _onDone() { | 260 void _onDone() { |
nweiz
2015/08/21 19:43:57
Document this.
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
273 _subscription = null; | |
274 _isDone = true; | 261 _isDone = true; |
275 _closeAllRequests(); | 262 _checkQueues(); |
276 } | 263 } |
277 | 264 |
278 // Request queue management. | 265 // Request queue management. |
279 | 266 |
280 /// Adds a new request to the queue. | 267 /// Adds a new request to the queue. |
281 void _addRequest(_EventRequest request) { | 268 void _addRequest(_EventRequest request) { |
282 if (_isDone) { | |
283 assert(_requestQueue.isEmpty); | |
284 if (!request.addEvents(_eventQueue)) { | |
285 request.close(_eventQueue); | |
286 } | |
287 return; | |
288 } | |
289 if (_requestQueue.isEmpty) { | 269 if (_requestQueue.isEmpty) { |
290 if (request.addEvents(_eventQueue)) return; | 270 if (request.addEvents(_eventQueue, _isDone)) return; |
291 _ensureListening(); | 271 _ensureListening(); |
292 } | 272 } |
293 _requestQueue.add(request); | 273 _requestQueue.add(request); |
294 } | 274 } |
295 | 275 |
296 /// Ensures that we are listening on events from [_sourceStream]. | 276 /// Ensures that we are listening on events from the event source. |
297 /// | 277 /// |
298 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 278 /// Starts listening for the first time or resumes after a [_pause]. |
279 void _ensureListening(); | |
280 | |
281 /// Matches events with requests. | |
282 /// | |
283 /// Called after receiving an event or when the event source closes. | |
284 /// May be called by requests which have stalled after the event queue | |
nweiz
2015/08/21 19:43:57
"stalled" is confusing for me here—it's not a term
Lasse Reichstein Nielsen
2015/08/24 08:31:34
Done.
| |
285 /// is empty or when the event source is done. | |
286 void _checkQueues() { | |
287 while (_requestQueue.isNotEmpty) { | |
288 if (_requestQueue.first.addEvents(_eventQueue, _isDone)) { | |
289 _requestQueue.removeFirst(); | |
290 } else { | |
291 return; | |
292 } | |
293 } | |
294 if (!_isDone) { | |
nweiz
2015/08/21 19:43:57
Nit: I find it tends to look better when block-lev
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I've added some extra lines (not all between block
| |
295 _pause(); | |
296 } | |
297 } | |
298 | |
299 /// Requests that the event source pauses events. | |
300 /// | |
301 /// The event source is restarted by the next call to [_ensureListening]. | |
302 void _pause(); | |
303 | |
304 /// Extracts a stream from the event source and makes this stream queue | |
305 /// unusable. | |
306 /// | |
307 /// Can only be used by the very last request. | |
308 /// Only used by [rest]. | |
309 Stream _extractStream(); | |
310 } | |
311 | |
312 | |
313 /// The default implementation of [StreamQueue]. | |
314 /// | |
315 /// This queue gets its events from a stream which is listened | |
316 /// to when a request needs events. | |
317 class _StreamQueue<T> extends StreamQueue<T> { | |
318 | |
nweiz
2015/08/21 19:43:57
Nit: extra newline.
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
319 /// Source of events. | |
320 final Stream _sourceStream; | |
321 | |
322 /// Subscription on [_sourceStream] while listening for events. | |
323 /// | |
324 /// Set to subscription when listening, and set to `null` when the | |
325 /// subscription is done (and [_isDone] is set to true). | |
326 StreamSubscription _subscription; | |
327 | |
328 _StreamQueue(this._sourceStream) : super._(); | |
329 | |
330 Future _cancel() { | |
331 if (_isDone) return null; | |
332 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
333 var future = _subscription.cancel(); | |
334 _onDone(); | |
335 return future; | |
336 } | |
337 | |
338 void _onData(T data) { | |
nweiz
2015/08/21 19:43:57
For what it's worth, these methods are so small I'
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
339 _addResult(new Result.value(data)); | |
340 } | |
341 | |
342 void _onError(error, StackTrace stack) { | |
343 _addResult(new Result.error(error, stack)); | |
344 } | |
345 | |
346 void _onDone() { | |
347 _subscription = null; | |
348 super._onDone(); | |
349 } | |
350 | |
299 void _ensureListening() { | 351 void _ensureListening() { |
300 assert(!_isDone); | 352 assert(!_isDone); |
301 if (_subscription == null) { | 353 if (_subscription == null) { |
302 _subscription = | 354 _subscription = |
303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 355 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
304 } else { | 356 } else { |
305 _subscription.resume(); | 357 _subscription.resume(); |
306 } | 358 } |
307 } | 359 } |
308 | 360 |
309 /// Removes all requests and closes them. | 361 void _pause() { |
310 /// | 362 _subscription.pause(); |
311 /// Used when the source stream is done. | |
312 /// After this, no further requests will be added to the queue, | |
313 /// requests are immediately served entirely by events already in the event | |
314 /// queue, if any. | |
315 void _closeAllRequests() { | |
316 assert(_isDone); | |
317 while (_requestQueue.isNotEmpty) { | |
318 var request = _requestQueue.removeFirst(); | |
319 if (!request.addEvents(_eventQueue)) { | |
320 request.close(_eventQueue); | |
321 } | |
322 } | |
323 } | 363 } |
324 | 364 |
325 /// Matches events with requests. | 365 Stream<T> _extractStream() { |
326 /// | 366 assert(_isClosed); |
327 /// Called after receiving an event. | 367 if (_isDone) { |
328 void _checkQueues() { | 368 return new Stream<T>.empty(); |
329 while (_requestQueue.isNotEmpty) { | |
330 if (_requestQueue.first.addEvents(_eventQueue)) { | |
331 _requestQueue.removeFirst(); | |
332 } else { | |
333 return; | |
334 } | |
335 } | 369 } |
336 if (!_isDone) { | 370 if (_subscription == null) { |
337 _subscription.pause(); | 371 return _sourceStream; |
nweiz
2015/08/21 19:43:57
Nit: consider making this a single-line if.
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I prefer to not hide significant control flow (ret
| |
338 } | 372 } |
339 } | |
340 | |
341 /// Extracts the subscription and makes this stream queue unusable. | |
342 /// | |
343 /// Can only be used by the very last request. | |
344 StreamSubscription _dispose() { | |
345 assert(_isClosed); | |
346 var subscription = _subscription; | 373 var subscription = _subscription; |
347 _subscription = null; | 374 _subscription = null; |
348 _isDone = true; | 375 _isDone = true; |
349 return subscription; | 376 if (!subscription.isPaused) subscription.pause(); |
nweiz
2015/08/21 19:43:57
Why pause and re-resume it?
Lasse Reichstein Nielsen
2015/08/24 08:31:34
We need to resume it if it was paused, because the
| |
377 var result = new SubscriptionStream<T>(subscription); | |
378 subscription.resume(); | |
379 return result; | |
350 } | 380 } |
351 } | 381 } |
352 | 382 |
383 | |
384 | |
nweiz
2015/08/21 19:43:57
Nit: three newlines here seems excessive. It's als
Lasse Reichstein Nielsen
2015/08/24 08:31:33
True, should only be two.
| |
353 /// Request object that receives events when they arrive, until fulfilled. | 385 /// Request object that receives events when they arrive, until fulfilled. |
354 /// | 386 /// |
355 /// Each request that cannot be fulfilled immediately is represented by | 387 /// Each request that cannot be fulfilled immediately is represented by |
356 /// an `_EventRequest` object in the request queue. | 388 /// an `_EventRequest` object in the request queue. |
357 /// | 389 /// |
358 /// Events from the source stream are sent to the first request in the | 390 /// Events from the source stream are sent to the first request in the |
359 /// queue until it reports itself as [isComplete]. | 391 /// queue until it reports itself as [isComplete]. |
360 /// | 392 /// |
361 /// When the first request in the queue `isComplete`, either when becoming | 393 /// When the first request in the queue `isComplete`, either when becoming |
362 /// the first request or after receiving an event, its [close] methods is | 394 /// the first request or after receiving an event, its [close] methods is |
(...skipping 12 matching lines...) Expand all Loading... | |
375 /// more events. | 407 /// more events. |
376 /// The call may keep events in the queue until the requeust is complete, | 408 /// The call may keep events in the queue until the requeust is complete, |
377 /// or it may remove them immediately. | 409 /// or it may remove them immediately. |
378 /// | 410 /// |
379 /// If the method returns true, the request is considered fulfilled, and | 411 /// If the method returns true, the request is considered fulfilled, and |
380 /// will never be called again. | 412 /// will never be called again. |
381 /// | 413 /// |
382 /// This method is called when a request reaches the front of the request | 414 /// This method is called when a request reaches the front of the request |
383 /// queue, and if it returns `false`, it's called again every time a new event | 415 /// queue, and if it returns `false`, it's called again every time a new event |
384 /// becomes available, or when the stream closes. | 416 /// becomes available, or when the stream closes. |
385 bool addEvents(Queue<Result> events); | 417 /// If the function returns `false` when the stream has already closed |
nweiz
2015/08/21 19:43:57
"the function" -> "this method" or "it"
What are
Lasse Reichstein Nielsen
2015/08/23 11:30:00
That's exactly for "lookahead". The look-ahead may
nweiz
2015/08/25 00:38:21
SGTM
| |
386 | 418 /// ([isDone] is true), then the request must call [StreamQueue._checkQueues] |
387 /// Complete the request. | 419 /// itself when it's ready to continue. |
388 /// | 420 bool addEvents(Queue<Result> events, bool isDone); |
389 /// This is called when the source stream is done before the request | |
390 /// had a chance to receive all its events. That is, after a call | |
391 /// to [addEvents] has returned `false`. | |
392 /// If there are any unused events available, they are in the [events] queue. | |
393 /// No further events will become available. | |
394 /// | |
395 /// The queue should only remove events from the front of the event queue, | |
396 /// e.g., using [removeFirst]. | |
397 /// | |
398 /// If the request kept events in the queue after an [addEvents] call, | |
399 /// this is the last chance to use them. | |
400 void close(Queue<Result> events); | |
401 } | 421 } |
402 | 422 |
403 /// Request for a [StreamQueue.next] call. | 423 /// Request for a [StreamQueue.next] call. |
404 /// | 424 /// |
405 /// Completes the returned future when receiving the first event, | 425 /// Completes the returned future when receiving the first event, |
406 /// and is then complete. | 426 /// and is then complete. |
407 class _NextRequest<T> implements _EventRequest { | 427 class _NextRequest<T> implements _EventRequest { |
408 /// Completer for the future returned by [StreamQueue.next]. | 428 /// Completer for the future returned by [StreamQueue.next]. |
409 final Completer _completer; | 429 final Completer _completer; |
410 | 430 |
411 _NextRequest() : _completer = new Completer<T>(); | 431 _NextRequest() : _completer = new Completer<T>(); |
412 | 432 |
413 Future<T> get future => _completer.future; | 433 Future<T> get future => _completer.future; |
414 | 434 |
415 bool addEvents(Queue<Result> events) { | 435 bool addEvents(Queue<Result> events, bool isDone) { |
416 if (events.isEmpty) return false; | 436 if (events.isNotEmpty) { |
417 events.removeFirst().complete(_completer); | 437 events.removeFirst().complete(_completer); |
418 return true; | 438 return true; |
419 } | 439 } |
420 | 440 if (isDone) { |
421 void close(Queue<Result> events) { | 441 var errorFuture = |
422 var errorFuture = | 442 new Future.sync(() => throw new StateError("No elements")); |
423 new Future.sync(() => throw new StateError("No elements")); | 443 _completer.complete(errorFuture); |
424 _completer.complete(errorFuture); | 444 return true; |
445 } | |
446 return false; | |
425 } | 447 } |
426 } | 448 } |
427 | 449 |
428 /// Request for a [StreamQueue.skip] call. | 450 /// Request for a [StreamQueue.skip] call. |
429 class _SkipRequest implements _EventRequest { | 451 class _SkipRequest implements _EventRequest { |
430 /// Completer for the future returned by the skip call. | 452 /// Completer for the future returned by the skip call. |
431 final Completer _completer = new Completer<int>(); | 453 final Completer _completer = new Completer<int>(); |
432 | 454 |
433 /// Number of remaining events to skip. | 455 /// Number of remaining events to skip. |
434 /// | 456 /// |
435 /// The request [isComplete] when the values reaches zero. | 457 /// The request [isComplete] when the values reaches zero. |
436 /// | 458 /// |
437 /// Decremented when an event is seen. | 459 /// Decremented when an event is seen. |
438 /// Set to zero when an error is seen since errors abort the skip request. | 460 /// Set to zero when an error is seen since errors abort the skip request. |
439 int _eventsToSkip; | 461 int _eventsToSkip; |
440 | 462 |
441 _SkipRequest(this._eventsToSkip); | 463 _SkipRequest(this._eventsToSkip); |
442 | 464 |
443 /// The future completed when the correct number of events have been skipped. | 465 /// The future completed when the correct number of events have been skipped. |
444 Future get future => _completer.future; | 466 Future get future => _completer.future; |
445 | 467 |
446 bool addEvents(Queue<Result> events) { | 468 bool addEvents(Queue<Result> events, bool isDone) { |
447 while (_eventsToSkip > 0) { | 469 while (_eventsToSkip > 0) { |
448 if (events.isEmpty) return false; | 470 if (events.isEmpty) { |
471 if (isDone) break; | |
472 return false; | |
473 } | |
449 _eventsToSkip--; | 474 _eventsToSkip--; |
450 var event = events.removeFirst(); | 475 var event = events.removeFirst(); |
451 if (event.isError) { | 476 if (event.isError) { |
452 event.complete(_completer); | 477 event.complete(_completer); |
453 return true; | 478 return true; |
454 } | 479 } |
455 } | 480 } |
456 _completer.complete(0); | 481 _completer.complete(_eventsToSkip); |
457 return true; | 482 return true; |
458 } | 483 } |
459 | |
460 void close(Queue<Result> events) { | |
461 _completer.complete(_eventsToSkip); | |
462 } | |
463 } | 484 } |
464 | 485 |
465 /// Request for a [StreamQueue.take] call. | 486 /// Request for a [StreamQueue.take] call. |
466 class _TakeRequest<T> implements _EventRequest { | 487 class _TakeRequest<T> implements _EventRequest { |
467 /// Completer for the future returned by the take call. | 488 /// Completer for the future returned by the take call. |
468 final Completer _completer; | 489 final Completer _completer; |
469 | 490 |
470 /// List collecting events until enough have been seen. | 491 /// List collecting events until enough have been seen. |
471 final List _list = <T>[]; | 492 final List _list = <T>[]; |
472 | 493 |
473 /// Number of events to capture. | 494 /// Number of events to capture. |
474 /// | 495 /// |
475 /// The request [isComplete] when the length of [_list] reaches | 496 /// The request [isComplete] when the length of [_list] reaches |
476 /// this value. | 497 /// this value. |
477 final int _eventsToTake; | 498 final int _eventsToTake; |
478 | 499 |
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 500 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
480 | 501 |
481 /// The future completed when the correct number of events have been captured. | 502 /// The future completed when the correct number of events have been captured. |
482 Future get future => _completer.future; | 503 Future get future => _completer.future; |
483 | 504 |
484 bool addEvents(Queue<Result> events) { | 505 bool addEvents(Queue<Result> events, bool isDone) { |
485 while (_list.length < _eventsToTake) { | 506 while (_list.length < _eventsToTake) { |
486 if (events.isEmpty) return false; | 507 if (events.isEmpty) { |
508 if (isDone) break; | |
509 return false; | |
510 } | |
487 var result = events.removeFirst(); | 511 var result = events.removeFirst(); |
488 if (result.isError) { | 512 if (result.isError) { |
489 result.complete(_completer); | 513 result.complete(_completer); |
490 return true; | 514 return true; |
491 } | 515 } |
492 _list.add(result.asValue.value); | 516 _list.add(result.asValue.value); |
493 } | 517 } |
494 _completer.complete(_list); | 518 _completer.complete(_list); |
495 return true; | 519 return true; |
496 } | 520 } |
497 | |
498 void close(Queue<Result> events) { | |
499 _completer.complete(_list); | |
500 } | |
501 } | 521 } |
502 | 522 |
503 /// Request for a [StreamQueue.cancel] call. | 523 /// Request for a [StreamQueue.cancel] call. |
504 /// | 524 /// |
505 /// The request needs no events, it just waits in the request queue | 525 /// The request needs no events, it just waits in the request queue |
506 /// until all previous events are fulfilled, then it cancels the stream queue | 526 /// until all previous events are fulfilled, then it cancels the stream queue |
507 /// source subscription. | 527 /// source subscription. |
508 class _CancelRequest implements _EventRequest { | 528 class _CancelRequest implements _EventRequest { |
509 /// Completer for the future returned by the `cancel` call. | 529 /// Completer for the future returned by the `cancel` call. |
510 final Completer _completer = new Completer(); | 530 final Completer _completer = new Completer(); |
511 | 531 |
512 /// The [StreamQueue] object that has this request queued. | 532 /// The [StreamQueue] object that has this request queued. |
513 /// | 533 /// |
514 /// When the event is completed, it needs to cancel the active subscription | 534 /// When the event is completed, it needs to cancel the active subscription |
515 /// of the `StreamQueue` object, if any. | 535 /// of the `StreamQueue` object, if any. |
516 final StreamQueue _streamQueue; | 536 final StreamQueue _streamQueue; |
517 | 537 |
518 _CancelRequest(this._streamQueue); | 538 _CancelRequest(this._streamQueue); |
519 | 539 |
520 /// The future completed when the cancel request is completed. | 540 /// The future completed when the cancel request is completed. |
521 Future get future => _completer.future; | 541 Future get future => _completer.future; |
522 | 542 |
523 bool addEvents(Queue<Result> events) { | 543 bool addEvents(Queue<Result> events, bool isDone) { |
524 _shutdown(); | 544 _completer.complete(_streamQueue._cancel()); |
525 return true; | 545 return true; |
526 } | 546 } |
527 | |
528 void close(_) { | |
529 _shutdown(); | |
530 } | |
531 | |
532 void _shutdown() { | |
533 if (_streamQueue._subscription == null) { | |
534 _completer.complete(); | |
535 } else { | |
536 _completer.complete(_streamQueue._dispose().cancel()); | |
537 } | |
538 } | |
539 } | 547 } |
540 | 548 |
541 /// Request for a [StreamQueue.rest] call. | 549 /// Request for a [StreamQueue.rest] call. |
542 /// | 550 /// |
543 /// The request is always complete, it just waits in the request queue | 551 /// The request is always complete, it just waits in the request queue |
544 /// until all previous events are fulfilled, then it takes over the | 552 /// until all previous events are fulfilled, then it takes over the |
545 /// stream events subscription and creates a stream from it. | 553 /// stream events subscription and creates a stream from it. |
546 class _RestRequest<T> implements _EventRequest { | 554 class _RestRequest<T> implements _EventRequest { |
547 /// Completer for the stream returned by the `rest` call. | 555 /// Completer for the stream returned by the `rest` call. |
548 final StreamCompleter _completer = new StreamCompleter<T>(); | 556 final StreamCompleter _completer = new StreamCompleter<T>(); |
549 | 557 |
550 /// The [StreamQueue] object that has this request queued. | 558 /// The [StreamQueue] object that has this request queued. |
551 /// | 559 /// |
552 /// When the event is completed, it needs to cancel the active subscription | 560 /// When the event is completed, it needs to cancel the active subscription |
553 /// of the `StreamQueue` object, if any. | 561 /// of the `StreamQueue` object, if any. |
554 final StreamQueue _streamQueue; | 562 final StreamQueue _streamQueue; |
555 | 563 |
556 _RestRequest(this._streamQueue); | 564 _RestRequest(this._streamQueue); |
557 | 565 |
558 /// The stream which will contain the remaining events of [_streamQueue]. | 566 /// The stream which will contain the remaining events of [_streamQueue]. |
559 Stream<T> get stream => _completer.stream; | 567 Stream<T> get stream => _completer.stream; |
560 | 568 |
561 bool addEvents(Queue<Result> events) { | 569 bool addEvents(Queue<Result> events, bool isDone) { |
562 _completeStream(events); | |
563 return true; | |
564 } | |
565 | |
566 void close(Queue<Result> events) { | |
567 _completeStream(events); | |
568 } | |
569 | |
570 void _completeStream(Queue<Result> events) { | |
571 if (events.isEmpty) { | 570 if (events.isEmpty) { |
572 if (_streamQueue._isDone) { | 571 if (_streamQueue._isDone) { |
573 _completer.setEmpty(); | 572 _completer.setEmpty(); |
574 } else { | 573 } else { |
575 _completer.setSourceStream(_getRestStream()); | 574 _completer.setSourceStream(_streamQueue._extractStream()); |
576 } | 575 } |
577 } else { | 576 } else { |
578 // There are prefetched events which needs to be added before the | 577 // There are prefetched events which needs to be added before the |
579 // remaining stream. | 578 // remaining stream. |
580 var controller = new StreamController<T>(); | 579 var controller = new StreamController<T>(); |
581 for (var event in events) { | 580 for (var event in events) { |
582 event.addTo(controller); | 581 event.addTo(controller); |
583 } | 582 } |
584 controller.addStream(_getRestStream(), cancelOnError: false) | 583 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
585 .whenComplete(controller.close); | 584 .whenComplete(controller.close); |
586 _completer.setSourceStream(controller.stream); | 585 _completer.setSourceStream(controller.stream); |
587 } | 586 } |
588 } | 587 return true; |
589 | |
590 /// Create a stream from the rest of [_streamQueue]'s subscription. | |
591 Stream _getRestStream() { | |
592 if (_streamQueue._isDone) { | |
593 var controller = new StreamController<T>()..close(); | |
594 return controller.stream; | |
595 // TODO(lrn). Use the following when 1.11 is released. | |
596 // return new Stream<T>.empty(); | |
597 } | |
598 if (_streamQueue._subscription == null) { | |
599 return _streamQueue._sourceStream; | |
600 } | |
601 var subscription = _streamQueue._dispose(); | |
602 subscription.resume(); | |
603 return new SubscriptionStream<T>(subscription); | |
604 } | 588 } |
605 } | 589 } |
606 | 590 |
607 /// Request for a [StreamQueue.hasNext] call. | 591 /// Request for a [StreamQueue.hasNext] call. |
608 /// | 592 /// |
609 /// Completes the [future] with `true` if it sees any event, | 593 /// Completes the [future] with `true` if it sees any event, |
610 /// but doesn't consume the event. | 594 /// but doesn't consume the event. |
611 /// If the request is closed without seeing an event, then | 595 /// If the request is closed without seeing an event, then |
612 /// the [future] is completed with `false`. | 596 /// the [future] is completed with `false`. |
613 class _HasNextRequest<T> implements _EventRequest { | 597 class _HasNextRequest<T> implements _EventRequest { |
614 final Completer _completer = new Completer<bool>(); | 598 final Completer _completer = new Completer<bool>(); |
615 | 599 |
616 Future<bool> get future => _completer.future; | 600 Future<bool> get future => _completer.future; |
617 | 601 |
618 bool addEvents(Queue<Result> events) { | 602 bool addEvents(Queue<Result> events, bool isDone) { |
619 if (events.isNotEmpty) { | 603 if (events.isNotEmpty) { |
620 _completer.complete(true); | 604 _completer.complete(true); |
621 return true; | 605 return true; |
622 } | 606 } |
607 if (isDone) { | |
608 _completer.complete(false); | |
609 return true; | |
610 } | |
623 return false; | 611 return false; |
624 } | 612 } |
625 | |
626 void close(_) { | |
627 _completer.complete(false); | |
628 } | |
629 } | 613 } |
OLD | NEW |