OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
53 /// var headerCount = | 53 /// var headerCount = |
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
55 /// handleMessage(headers: await events.take(headerCount), | 55 /// handleMessage(headers: await events.take(headerCount), |
56 /// body: events.rest); | 56 /// body: events.rest); |
57 /// return; | 57 /// return; |
58 /// } | 58 /// } |
59 /// // Error handling. | 59 /// // Error handling. |
60 /// | 60 /// |
61 /// When you need no further events the `StreamQueue` should be closed | 61 /// When you need no further events the `StreamQueue` should be closed |
62 /// using [cancel]. This releases the underlying stream subscription. | 62 /// using [cancel]. This releases the underlying stream subscription. |
63 class StreamQueue<T> { | 63 abstract class StreamQueue<T> { |
64 // This class maintains two queues: one of events and one of requests. | 64 // This class maintains two queues: one of events and one of requests. |
65 // The active request (the one in front of the queue) is called with | 65 // The active request (the one in front of the queue) is called with |
66 // the current event queue when it becomes active. | 66 // the current event queue when it becomes active, every time a |
67 // new event arrives, and when the event source closes. | |
67 // | 68 // |
68 // If the request returns true, it's complete and will be removed from the | 69 // If the request returns `true`, it's complete and will be removed from the |
69 // request queue. | 70 // request queue. |
70 // If the request returns false, it needs more events, and will be called | 71 // If the request returns `false`, it needs more events, and will be called |
71 // again when new events are available. | 72 // again when new events are available. It may trigger a call itself by |
73 // calling [_updateRequests]. | |
72 // The request can remove events that it uses, or keep them in the event | 74 // The request can remove events that it uses, or keep them in the event |
73 // queue until it has all that it needs. | 75 // queue until it has all that it needs. |
74 // | 76 // |
75 // This model is very flexible and easily extensible. | 77 // This model is very flexible and easily extensible. |
76 // It allows requests that don't consume events (like [hasNext]) or | 78 // It allows requests that don't consume events (like [hasNext]) or |
77 // potentially a request that takes either five or zero events, determined | 79 // potentially a request that takes either five or zero events, determined |
78 // by the content of the fifth event. | 80 // by the content of the fifth event. |
79 | 81 |
80 /// Source of events. | 82 /// Whether the event source is done. |
81 final Stream _sourceStream; | |
82 | |
83 /// Subscription on [_sourceStream] while listening for events. | |
84 /// | |
85 /// Set to subscription when listening, and set to `null` when the | |
86 /// subscription is done (and [_isDone] is set to true). | |
87 StreamSubscription _subscription; | |
88 | |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
90 bool _isDone = false; | 83 bool _isDone = false; |
91 | 84 |
92 /// Whether a closing operation has been performed on the stream queue. | 85 /// Whether a closing operation has been performed on the stream queue. |
93 /// | 86 /// |
94 /// Closing operations are [cancel] and [rest]. | 87 /// Closing operations are [cancel] and [rest]. |
95 bool _isClosed = false; | 88 bool _isClosed = false; |
96 | 89 |
97 /// Queue of events not used by a request yet. | 90 /// Queue of events not used by a request yet. |
98 final Queue<Result> _eventQueue = new Queue(); | 91 final Queue<Result> _eventQueue = new Queue(); |
99 | 92 |
100 /// Queue of pending requests. | 93 /// Queue of pending requests. |
101 /// | 94 /// |
102 /// Access through methods below to ensure consistency. | 95 /// Access through methods below to ensure consistency. |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | 96 final Queue<_EventRequest> _requestQueue = new Queue(); |
104 | 97 |
105 /// Create a `StreamQueue` of the events of [source]. | 98 /// Create a `StreamQueue` of the events of [source]. |
106 StreamQueue(Stream source) | 99 factory StreamQueue(Stream source) = _StreamQueue<T>; |
107 : _sourceStream = source; | 100 |
101 StreamQueue._(); | |
108 | 102 |
109 /// Asks if the stream has any more events. | 103 /// Asks if the stream has any more events. |
110 /// | 104 /// |
111 /// Returns a future that completes with `true` if the stream has any | 105 /// Returns a future that completes with `true` if the stream has any |
112 /// more events, whether data or error. | 106 /// more events, whether data or error. |
113 /// If the stream closes without producing any more events, the returned | 107 /// If the stream closes without producing any more events, the returned |
114 /// future completes with `false`. | 108 /// future completes with `false`. |
115 /// | 109 /// |
116 /// Can be used before using [next] to avoid getting an error in the | 110 /// Can be used before using [next] to avoid getting an error in the |
117 /// future returned by `next` in the case where there are no more events. | 111 /// future returned by `next` in the case where there are no more events. |
112 /// Another alternative is to use `take(1)` which returns either zero or | |
113 /// one events. | |
118 Future<bool> get hasNext { | 114 Future<bool> get hasNext { |
119 if (!_isClosed) { | 115 if (!_isClosed) { |
120 var hasNextRequest = new _HasNextRequest(); | 116 var hasNextRequest = new _HasNextRequest(); |
121 _addRequest(hasNextRequest); | 117 _addRequest(hasNextRequest); |
122 return hasNextRequest.future; | 118 return hasNextRequest.future; |
123 } | 119 } |
124 throw _failClosed(); | 120 throw _failClosed(); |
125 } | 121 } |
126 | 122 |
127 /// Requests the next (yet unrequested) event from the stream. | 123 /// Requests the next (yet unrequested) event from the stream. |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
209 Future<List<T>> take(int count) { | 205 Future<List<T>> take(int count) { |
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
211 if (!_isClosed) { | 207 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 208 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 209 _addRequest(request); |
214 return request.future; | 210 return request.future; |
215 } | 211 } |
216 throw _failClosed(); | 212 throw _failClosed(); |
217 } | 213 } |
218 | 214 |
219 /// Cancels the underlying stream subscription. | 215 /// Cancels the underlying event source. |
220 /// | 216 /// |
221 /// If [immediate] is `false` (the default), the cancel operation waits until | 217 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// all previously requested events have been processed, then it cancels the | 218 /// all previously requested events have been processed, then it cancels the |
223 /// subscription providing the events. | 219 /// subscription providing the events. |
224 /// | 220 /// |
225 /// If [immediate] is `true`, the subscription is instead canceled | 221 /// If [immediate] is `true`, the source is instead canceled |
226 /// immediately. Any pending events are completed as though the underlying | 222 /// immediately. Any pending events are completed as though the underlying |
227 /// stream had closed. | 223 /// stream had closed. |
228 /// | 224 /// |
229 /// The returned future completes with the result of calling | 225 /// The returned future completes with the result of calling |
230 /// `cancel`. | 226 /// `cancel`. |
231 /// | 227 /// |
232 /// After calling `cancel`, no further events can be requested. | 228 /// After calling `cancel`, no further events can be requested. |
233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 229 /// None of [next], [rest], [skip], [take] or [cancel] may be |
234 /// called again. | 230 /// called again. |
235 Future cancel({bool immediate: false}) { | 231 Future cancel({bool immediate: false}) { |
236 if (_isClosed) throw _failClosed(); | 232 if (_isClosed) throw _failClosed(); |
237 _isClosed = true; | 233 _isClosed = true; |
238 | 234 |
239 if (!immediate) { | 235 if (!immediate) { |
240 var request = new _CancelRequest(this); | 236 var request = new _CancelRequest(this); |
241 _addRequest(request); | 237 _addRequest(request); |
242 return request.future; | 238 return request.future; |
243 } | 239 } |
244 | 240 |
245 if (_isDone) return new Future.value(); | 241 if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
246 if (_subscription == null) _subscription = _sourceStream.listen(null); | 242 return _cancel(); |
247 var future = _subscription.cancel(); | |
248 _onDone(); | |
249 return future; | |
250 } | 243 } |
251 | 244 |
245 // ------------------------------------------------------------------ | |
246 // Methods that may be called from the request implementations to | |
247 // control the even stream. | |
248 | |
249 /// Matches events with requests. | |
250 /// | |
251 /// Called after receiving an event or when the event source closes. | |
252 /// | |
253 /// May be called by requests which have returned `false` (saying they | |
254 /// are not yet done) so they can be checked again before any new | |
255 /// events arrive. | |
256 /// Any request returing `false` from `update` when `isDone` is `true` | |
257 /// *must* call `_updateRequests` when they are ready to continue | |
258 /// (since no further events will trigger the call). | |
259 void _updateRequests() { | |
260 while (_requestQueue.isNotEmpty) { | |
261 if (_requestQueue.first.update(_eventQueue, _isDone)) { | |
262 _requestQueue.removeFirst(); | |
263 } else { | |
264 return; | |
265 } | |
266 } | |
267 | |
268 if (!_isDone) { | |
269 _pause(); | |
270 } | |
271 } | |
272 | |
273 /// Extracts a stream from the event source and makes this stream queue | |
274 /// unusable. | |
275 /// | |
276 /// Can only be used by the very last request (the stream queue must | |
277 /// be closed by that request). | |
278 /// Only used by [rest]. | |
279 Stream _extractStream(); | |
280 | |
281 /// Requests that the event source pauses events. | |
282 /// | |
283 /// This is called automatically when the request queue is empty. | |
284 /// | |
285 /// The event source is restarted by the next call to [_ensureListening]. | |
286 void _pause(); | |
287 | |
288 /// Ensures that we are listening on events from the event source. | |
289 /// | |
290 /// Starts listening for the first time or resumes after a [_pause]. | |
291 /// | |
292 /// Is called automatically if a request requires more events. | |
293 void _ensureListening(); | |
294 | |
295 /// Cancels the underlying event source. | |
296 Future _cancel(); | |
297 | |
298 // ------------------------------------------------------------------ | |
299 // Methods called by the event source to add events or say that it's | |
300 // done. | |
301 | |
302 /// Called when the event source adds a new data or error event. | |
303 /// Always calls [_updateRequests] after adding. | |
304 void _addResult(Result result) { | |
305 _eventQueue.add(result); | |
306 _updateRequests(); | |
307 } | |
308 | |
309 /// Called when the event source is done. | |
310 /// Always calls [_updateRequests] after adding. | |
311 void _close() { | |
312 _isDone = true; | |
313 _updateRequests(); | |
314 } | |
315 | |
316 // ------------------------------------------------------------------ | |
317 // Internal helper methods. | |
318 | |
252 /// Returns an error for when a request is made after cancel. | 319 /// Returns an error for when a request is made after cancel. |
253 /// | 320 /// |
254 /// Returns a [StateError] with a message saying that either | 321 /// Returns a [StateError] with a message saying that either |
255 /// [cancel] or [rest] have already been called. | 322 /// [cancel] or [rest] have already been called. |
256 Error _failClosed() { | 323 Error _failClosed() { |
257 return new StateError("Already cancelled"); | 324 return new StateError("Already cancelled"); |
258 } | 325 } |
259 | 326 |
260 // Callbacks receiving the events of the source stream. | |
261 | |
262 void _onData(T data) { | |
263 _eventQueue.add(new Result.value(data)); | |
264 _checkQueues(); | |
265 } | |
266 | |
267 void _onError(error, StackTrace stack) { | |
268 _eventQueue.add(new Result.error(error, stack)); | |
269 _checkQueues(); | |
270 } | |
271 | |
272 void _onDone() { | |
273 _subscription = null; | |
274 _isDone = true; | |
275 _closeAllRequests(); | |
276 } | |
277 | |
278 // Request queue management. | |
279 | |
280 /// Adds a new request to the queue. | 327 /// Adds a new request to the queue. |
328 /// | |
329 /// If the request queue is empty and the request can be completed | |
330 /// immediately, it skips the queue. | |
281 void _addRequest(_EventRequest request) { | 331 void _addRequest(_EventRequest request) { |
282 if (_isDone) { | |
283 assert(_requestQueue.isEmpty); | |
284 if (!request.addEvents(_eventQueue)) { | |
285 request.close(_eventQueue); | |
286 } | |
287 return; | |
288 } | |
289 if (_requestQueue.isEmpty) { | 332 if (_requestQueue.isEmpty) { |
290 if (request.addEvents(_eventQueue)) return; | 333 if (request.update(_eventQueue, _isDone)) return; |
291 _ensureListening(); | 334 _ensureListening(); |
292 } | 335 } |
293 _requestQueue.add(request); | 336 _requestQueue.add(request); |
294 } | 337 } |
338 } | |
295 | 339 |
296 /// Ensures that we are listening on events from [_sourceStream]. | 340 |
341 /// The default implementation of [StreamQueue]. | |
342 /// | |
343 /// This queue gets its events from a stream which is listened | |
344 /// to when a request needs events. | |
345 class _StreamQueue<T> extends StreamQueue<T> { | |
346 /// Source of events. | |
347 final Stream _sourceStream; | |
348 | |
349 /// Subscription on [_sourceStream] while listening for events. | |
297 /// | 350 /// |
298 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 351 /// Set to subscription when listening, and set to `null` when the |
352 /// subscription is done (and [_isDone] is set to true). | |
353 StreamSubscription _subscription; | |
354 | |
355 _StreamQueue(this._sourceStream) : super._(); | |
356 | |
357 Future _cancel() { | |
358 if (_isDone) return null; | |
359 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
360 var future = _subscription.cancel(); | |
361 _close(); | |
362 return future; | |
363 } | |
364 | |
299 void _ensureListening() { | 365 void _ensureListening() { |
300 assert(!_isDone); | 366 assert(!_isDone); |
301 if (_subscription == null) { | 367 if (_subscription == null) { |
302 _subscription = | 368 _subscription = |
303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 369 _sourceStream.listen( |
370 (data) { | |
371 _addResult(new Result.value(data)); | |
372 }, | |
373 onError: (error, StackTrace stackTrace) { | |
374 _addResult(new Result.error(error, stackTrace)); | |
375 }, | |
376 onDone: () { | |
377 _subscription = null; | |
378 this._close(); | |
379 }); | |
304 } else { | 380 } else { |
305 _subscription.resume(); | 381 _subscription.resume(); |
306 } | 382 } |
307 } | 383 } |
308 | 384 |
309 /// Removes all requests and closes them. | 385 void _pause() { |
310 /// | 386 _subscription.pause(); |
311 /// Used when the source stream is done. | |
312 /// After this, no further requests will be added to the queue, | |
313 /// requests are immediately served entirely by events already in the event | |
314 /// queue, if any. | |
315 void _closeAllRequests() { | |
316 assert(_isDone); | |
317 while (_requestQueue.isNotEmpty) { | |
318 var request = _requestQueue.removeFirst(); | |
319 if (!request.addEvents(_eventQueue)) { | |
320 request.close(_eventQueue); | |
321 } | |
322 } | |
323 } | 387 } |
324 | 388 |
325 /// Matches events with requests. | 389 Stream<T> _extractStream() { |
326 /// | 390 assert(_isClosed); |
327 /// Called after receiving an event. | 391 if (_isDone) { |
328 void _checkQueues() { | 392 return new Stream<T>.empty(); |
329 while (_requestQueue.isNotEmpty) { | |
330 if (_requestQueue.first.addEvents(_eventQueue)) { | |
331 _requestQueue.removeFirst(); | |
332 } else { | |
333 return; | |
334 } | |
335 } | 393 } |
336 if (!_isDone) { | 394 |
337 _subscription.pause(); | 395 if (_subscription == null) { |
396 return _sourceStream; | |
338 } | 397 } |
339 } | |
340 | 398 |
341 /// Extracts the subscription and makes this stream queue unusable. | |
342 /// | |
343 /// Can only be used by the very last request. | |
344 StreamSubscription _dispose() { | |
345 assert(_isClosed); | |
346 var subscription = _subscription; | 399 var subscription = _subscription; |
347 _subscription = null; | 400 _subscription = null; |
348 _isDone = true; | 401 _isDone = true; |
349 return subscription; | 402 |
403 var wasPaused = subscription.isPaused; | |
404 var result = new SubscriptionStream<T>(subscription); | |
405 // Resume after creating stream because that pauses the subscription too. | |
406 // This way there won't be a short resumption in the middle. | |
407 if (wasPaused) subscription.resume(); | |
408 return result; | |
350 } | 409 } |
351 } | 410 } |
352 | 411 |
412 | |
353 /// Request object that receives events when they arrive, until fulfilled. | 413 /// Request object that receives events when they arrive, until fulfilled. |
354 /// | 414 /// |
355 /// Each request that cannot be fulfilled immediately is represented by | 415 /// Each request that cannot be fulfilled immediately is represented by |
356 /// an `_EventRequest` object in the request queue. | 416 /// an `_EventRequest` object in the request queue. |
357 /// | 417 /// |
358 /// Events from the source stream are sent to the first request in the | 418 /// Events from the source stream are sent to the first request in the |
359 /// queue until it reports itself as [isComplete]. | 419 /// queue until it reports itself as [isComplete]. |
360 /// | 420 /// |
361 /// When the first request in the queue `isComplete`, either when becoming | 421 /// When the first request in the queue `isComplete`, either when becoming |
362 /// the first request or after receiving an event, its [close] methods is | 422 /// the first request or after receiving an event, its [close] methods is |
363 /// called. | 423 /// called. |
364 /// | 424 /// |
365 /// The [close] method is also called immediately when the source stream | 425 /// The [close] method is also called immediately when the source stream |
366 /// is done. | 426 /// is done. |
367 abstract class _EventRequest { | 427 abstract class _EventRequest { |
368 /// Handle available events. | 428 /// Handle available events. |
369 /// | 429 /// |
370 /// The available events are provided as a queue. The `addEvents` function | 430 /// The available events are provided as a queue. The `update` function |
371 /// should only remove events from the front of the event queue, e.g., | 431 /// should only remove events from the front of the event queue, e.g., |
372 /// using [removeFirst]. | 432 /// using [removeFirst]. |
373 /// | 433 /// |
374 /// Returns `true` if the request is completed, or `false` if it needs | 434 /// Returns `true` if the request is completed, or `false` if it needs |
375 /// more events. | 435 /// more events. |
376 /// The call may keep events in the queue until the requeust is complete, | 436 /// The call may keep events in the queue until the requeust is complete, |
377 /// or it may remove them immediately. | 437 /// or it may remove them immediately. |
378 /// | 438 /// |
379 /// If the method returns true, the request is considered fulfilled, and | 439 /// If the method returns true, the request is considered fulfilled, and |
380 /// will never be called again. | 440 /// will never be called again. |
381 /// | 441 /// |
382 /// This method is called when a request reaches the front of the request | 442 /// This method is called when a request reaches the front of the request |
383 /// queue, and if it returns `false`, it's called again every time a new event | 443 /// queue, and if it returns `false`, it's called again every time a new event |
384 /// becomes available, or when the stream closes. | 444 /// becomes available, or when the stream closes. |
385 bool addEvents(Queue<Result> events); | 445 /// If the function returns `false` when the stream has already closed |
386 | 446 /// ([isDone] is true), then the request must call [StreamQueue._updateRequest s] |
nweiz
2015/08/25 00:38:21
Long line
| |
387 /// Complete the request. | 447 /// itself when it's ready to continue. |
388 /// | 448 bool update(Queue<Result> events, bool isDone); |
389 /// This is called when the source stream is done before the request | |
390 /// had a chance to receive all its events. That is, after a call | |
391 /// to [addEvents] has returned `false`. | |
392 /// If there are any unused events available, they are in the [events] queue. | |
393 /// No further events will become available. | |
394 /// | |
395 /// The queue should only remove events from the front of the event queue, | |
396 /// e.g., using [removeFirst]. | |
397 /// | |
398 /// If the request kept events in the queue after an [addEvents] call, | |
399 /// this is the last chance to use them. | |
400 void close(Queue<Result> events); | |
401 } | 449 } |
402 | 450 |
403 /// Request for a [StreamQueue.next] call. | 451 /// Request for a [StreamQueue.next] call. |
404 /// | 452 /// |
405 /// Completes the returned future when receiving the first event, | 453 /// Completes the returned future when receiving the first event, |
406 /// and is then complete. | 454 /// and is then complete. |
407 class _NextRequest<T> implements _EventRequest { | 455 class _NextRequest<T> implements _EventRequest { |
408 /// Completer for the future returned by [StreamQueue.next]. | 456 /// Completer for the future returned by [StreamQueue.next]. |
409 final Completer _completer; | 457 final Completer _completer; |
410 | 458 |
411 _NextRequest() : _completer = new Completer<T>(); | 459 _NextRequest() : _completer = new Completer<T>(); |
412 | 460 |
413 Future<T> get future => _completer.future; | 461 Future<T> get future => _completer.future; |
414 | 462 |
415 bool addEvents(Queue<Result> events) { | 463 bool update(Queue<Result> events, bool isDone) { |
416 if (events.isEmpty) return false; | 464 if (events.isNotEmpty) { |
417 events.removeFirst().complete(_completer); | 465 events.removeFirst().complete(_completer); |
418 return true; | 466 return true; |
419 } | 467 } |
420 | 468 if (isDone) { |
421 void close(Queue<Result> events) { | 469 var errorFuture = |
422 var errorFuture = | 470 new Future.sync(() => throw new StateError("No elements")); |
423 new Future.sync(() => throw new StateError("No elements")); | 471 _completer.complete(errorFuture); |
424 _completer.complete(errorFuture); | 472 return true; |
473 } | |
474 return false; | |
425 } | 475 } |
426 } | 476 } |
427 | 477 |
428 /// Request for a [StreamQueue.skip] call. | 478 /// Request for a [StreamQueue.skip] call. |
429 class _SkipRequest implements _EventRequest { | 479 class _SkipRequest implements _EventRequest { |
430 /// Completer for the future returned by the skip call. | 480 /// Completer for the future returned by the skip call. |
431 final Completer _completer = new Completer<int>(); | 481 final Completer _completer = new Completer<int>(); |
432 | 482 |
433 /// Number of remaining events to skip. | 483 /// Number of remaining events to skip. |
434 /// | 484 /// |
435 /// The request [isComplete] when the values reaches zero. | 485 /// The request [isComplete] when the values reaches zero. |
436 /// | 486 /// |
437 /// Decremented when an event is seen. | 487 /// Decremented when an event is seen. |
438 /// Set to zero when an error is seen since errors abort the skip request. | 488 /// Set to zero when an error is seen since errors abort the skip request. |
439 int _eventsToSkip; | 489 int _eventsToSkip; |
440 | 490 |
441 _SkipRequest(this._eventsToSkip); | 491 _SkipRequest(this._eventsToSkip); |
442 | 492 |
443 /// The future completed when the correct number of events have been skipped. | 493 /// The future completed when the correct number of events have been skipped. |
444 Future get future => _completer.future; | 494 Future get future => _completer.future; |
445 | 495 |
446 bool addEvents(Queue<Result> events) { | 496 bool update(Queue<Result> events, bool isDone) { |
447 while (_eventsToSkip > 0) { | 497 while (_eventsToSkip > 0) { |
448 if (events.isEmpty) return false; | 498 if (events.isEmpty) { |
499 if (isDone) break; | |
500 return false; | |
501 } | |
449 _eventsToSkip--; | 502 _eventsToSkip--; |
503 | |
450 var event = events.removeFirst(); | 504 var event = events.removeFirst(); |
451 if (event.isError) { | 505 if (event.isError) { |
452 event.complete(_completer); | 506 event.complete(_completer); |
453 return true; | 507 return true; |
454 } | 508 } |
455 } | 509 } |
456 _completer.complete(0); | 510 _completer.complete(_eventsToSkip); |
457 return true; | 511 return true; |
458 } | 512 } |
459 | |
460 void close(Queue<Result> events) { | |
461 _completer.complete(_eventsToSkip); | |
462 } | |
463 } | 513 } |
464 | 514 |
465 /// Request for a [StreamQueue.take] call. | 515 /// Request for a [StreamQueue.take] call. |
466 class _TakeRequest<T> implements _EventRequest { | 516 class _TakeRequest<T> implements _EventRequest { |
467 /// Completer for the future returned by the take call. | 517 /// Completer for the future returned by the take call. |
468 final Completer _completer; | 518 final Completer _completer; |
469 | 519 |
470 /// List collecting events until enough have been seen. | 520 /// List collecting events until enough have been seen. |
471 final List _list = <T>[]; | 521 final List _list = <T>[]; |
472 | 522 |
473 /// Number of events to capture. | 523 /// Number of events to capture. |
474 /// | 524 /// |
475 /// The request [isComplete] when the length of [_list] reaches | 525 /// The request [isComplete] when the length of [_list] reaches |
476 /// this value. | 526 /// this value. |
477 final int _eventsToTake; | 527 final int _eventsToTake; |
478 | 528 |
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
480 | 530 |
481 /// The future completed when the correct number of events have been captured. | 531 /// The future completed when the correct number of events have been captured. |
482 Future get future => _completer.future; | 532 Future get future => _completer.future; |
483 | 533 |
484 bool addEvents(Queue<Result> events) { | 534 bool update(Queue<Result> events, bool isDone) { |
485 while (_list.length < _eventsToTake) { | 535 while (_list.length < _eventsToTake) { |
486 if (events.isEmpty) return false; | 536 if (events.isEmpty) { |
537 if (isDone) break; | |
538 return false; | |
539 } | |
540 | |
487 var result = events.removeFirst(); | 541 var result = events.removeFirst(); |
488 if (result.isError) { | 542 if (result.isError) { |
489 result.complete(_completer); | 543 result.complete(_completer); |
490 return true; | 544 return true; |
491 } | 545 } |
492 _list.add(result.asValue.value); | 546 _list.add(result.asValue.value); |
493 } | 547 } |
494 _completer.complete(_list); | 548 _completer.complete(_list); |
495 return true; | 549 return true; |
496 } | 550 } |
497 | |
498 void close(Queue<Result> events) { | |
499 _completer.complete(_list); | |
500 } | |
501 } | 551 } |
502 | 552 |
503 /// Request for a [StreamQueue.cancel] call. | 553 /// Request for a [StreamQueue.cancel] call. |
504 /// | 554 /// |
505 /// The request needs no events, it just waits in the request queue | 555 /// The request needs no events, it just waits in the request queue |
506 /// until all previous events are fulfilled, then it cancels the stream queue | 556 /// until all previous events are fulfilled, then it cancels the stream queue |
507 /// source subscription. | 557 /// source subscription. |
508 class _CancelRequest implements _EventRequest { | 558 class _CancelRequest implements _EventRequest { |
509 /// Completer for the future returned by the `cancel` call. | 559 /// Completer for the future returned by the `cancel` call. |
510 final Completer _completer = new Completer(); | 560 final Completer _completer = new Completer(); |
511 | 561 |
512 /// The [StreamQueue] object that has this request queued. | 562 /// The [StreamQueue] object that has this request queued. |
513 /// | 563 /// |
514 /// When the event is completed, it needs to cancel the active subscription | 564 /// When the event is completed, it needs to cancel the active subscription |
515 /// of the `StreamQueue` object, if any. | 565 /// of the `StreamQueue` object, if any. |
516 final StreamQueue _streamQueue; | 566 final StreamQueue _streamQueue; |
517 | 567 |
518 _CancelRequest(this._streamQueue); | 568 _CancelRequest(this._streamQueue); |
519 | 569 |
520 /// The future completed when the cancel request is completed. | 570 /// The future completed when the cancel request is completed. |
521 Future get future => _completer.future; | 571 Future get future => _completer.future; |
522 | 572 |
523 bool addEvents(Queue<Result> events) { | 573 bool update(Queue<Result> events, bool isDone) { |
524 _shutdown(); | 574 _completer.complete(_streamQueue._cancel()); |
525 return true; | 575 return true; |
526 } | 576 } |
527 | |
528 void close(_) { | |
529 _shutdown(); | |
530 } | |
531 | |
532 void _shutdown() { | |
533 if (_streamQueue._subscription == null) { | |
534 _completer.complete(); | |
535 } else { | |
536 _completer.complete(_streamQueue._dispose().cancel()); | |
537 } | |
538 } | |
539 } | 577 } |
540 | 578 |
541 /// Request for a [StreamQueue.rest] call. | 579 /// Request for a [StreamQueue.rest] call. |
542 /// | 580 /// |
543 /// The request is always complete, it just waits in the request queue | 581 /// The request is always complete, it just waits in the request queue |
544 /// until all previous events are fulfilled, then it takes over the | 582 /// until all previous events are fulfilled, then it takes over the |
545 /// stream events subscription and creates a stream from it. | 583 /// stream events subscription and creates a stream from it. |
546 class _RestRequest<T> implements _EventRequest { | 584 class _RestRequest<T> implements _EventRequest { |
547 /// Completer for the stream returned by the `rest` call. | 585 /// Completer for the stream returned by the `rest` call. |
548 final StreamCompleter _completer = new StreamCompleter<T>(); | 586 final StreamCompleter _completer = new StreamCompleter<T>(); |
549 | 587 |
550 /// The [StreamQueue] object that has this request queued. | 588 /// The [StreamQueue] object that has this request queued. |
551 /// | 589 /// |
552 /// When the event is completed, it needs to cancel the active subscription | 590 /// When the event is completed, it needs to cancel the active subscription |
553 /// of the `StreamQueue` object, if any. | 591 /// of the `StreamQueue` object, if any. |
554 final StreamQueue _streamQueue; | 592 final StreamQueue _streamQueue; |
555 | 593 |
556 _RestRequest(this._streamQueue); | 594 _RestRequest(this._streamQueue); |
557 | 595 |
558 /// The stream which will contain the remaining events of [_streamQueue]. | 596 /// The stream which will contain the remaining events of [_streamQueue]. |
559 Stream<T> get stream => _completer.stream; | 597 Stream<T> get stream => _completer.stream; |
560 | 598 |
561 bool addEvents(Queue<Result> events) { | 599 bool update(Queue<Result> events, bool isDone) { |
562 _completeStream(events); | |
563 return true; | |
564 } | |
565 | |
566 void close(Queue<Result> events) { | |
567 _completeStream(events); | |
568 } | |
569 | |
570 void _completeStream(Queue<Result> events) { | |
571 if (events.isEmpty) { | 600 if (events.isEmpty) { |
572 if (_streamQueue._isDone) { | 601 if (_streamQueue._isDone) { |
573 _completer.setEmpty(); | 602 _completer.setEmpty(); |
574 } else { | 603 } else { |
575 _completer.setSourceStream(_getRestStream()); | 604 _completer.setSourceStream(_streamQueue._extractStream()); |
576 } | 605 } |
577 } else { | 606 } else { |
578 // There are prefetched events which needs to be added before the | 607 // There are prefetched events which needs to be added before the |
579 // remaining stream. | 608 // remaining stream. |
580 var controller = new StreamController<T>(); | 609 var controller = new StreamController<T>(); |
581 for (var event in events) { | 610 for (var event in events) { |
582 event.addTo(controller); | 611 event.addTo(controller); |
583 } | 612 } |
584 controller.addStream(_getRestStream(), cancelOnError: false) | 613 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
585 .whenComplete(controller.close); | 614 .whenComplete(controller.close); |
586 _completer.setSourceStream(controller.stream); | 615 _completer.setSourceStream(controller.stream); |
587 } | 616 } |
588 } | 617 return true; |
589 | |
590 /// Create a stream from the rest of [_streamQueue]'s subscription. | |
591 Stream _getRestStream() { | |
592 if (_streamQueue._isDone) { | |
593 var controller = new StreamController<T>()..close(); | |
594 return controller.stream; | |
595 // TODO(lrn). Use the following when 1.11 is released. | |
596 // return new Stream<T>.empty(); | |
597 } | |
598 if (_streamQueue._subscription == null) { | |
599 return _streamQueue._sourceStream; | |
600 } | |
601 var subscription = _streamQueue._dispose(); | |
602 subscription.resume(); | |
603 return new SubscriptionStream<T>(subscription); | |
604 } | 618 } |
605 } | 619 } |
606 | 620 |
607 /// Request for a [StreamQueue.hasNext] call. | 621 /// Request for a [StreamQueue.hasNext] call. |
608 /// | 622 /// |
609 /// Completes the [future] with `true` if it sees any event, | 623 /// Completes the [future] with `true` if it sees any event, |
610 /// but doesn't consume the event. | 624 /// but doesn't consume the event. |
611 /// If the request is closed without seeing an event, then | 625 /// If the request is closed without seeing an event, then |
612 /// the [future] is completed with `false`. | 626 /// the [future] is completed with `false`. |
613 class _HasNextRequest<T> implements _EventRequest { | 627 class _HasNextRequest<T> implements _EventRequest { |
614 final Completer _completer = new Completer<bool>(); | 628 final Completer _completer = new Completer<bool>(); |
615 | 629 |
616 Future<bool> get future => _completer.future; | 630 Future<bool> get future => _completer.future; |
617 | 631 |
618 bool addEvents(Queue<Result> events) { | 632 bool update(Queue<Result> events, bool isDone) { |
619 if (events.isNotEmpty) { | 633 if (events.isNotEmpty) { |
620 _completer.complete(true); | 634 _completer.complete(true); |
621 return true; | 635 return true; |
622 } | 636 } |
637 if (isDone) { | |
638 _completer.complete(false); | |
639 return true; | |
640 } | |
623 return false; | 641 return false; |
624 } | 642 } |
625 | |
626 void close(_) { | |
627 _completer.complete(false); | |
628 } | |
629 } | 643 } |
OLD | NEW |