OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
53 /// var headerCount = | 53 /// var headerCount = |
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
55 /// handleMessage(headers: await events.take(headerCount), | 55 /// handleMessage(headers: await events.take(headerCount), |
56 /// body: events.rest); | 56 /// body: events.rest); |
57 /// return; | 57 /// return; |
58 /// } | 58 /// } |
59 /// // Error handling. | 59 /// // Error handling. |
60 /// | 60 /// |
61 /// When you need no further events the `StreamQueue` should be closed | 61 /// When you need no further events the `StreamQueue` should be closed |
62 /// using [cancel]. This releases the underlying stream subscription. | 62 /// using [cancel]. This releases the underlying stream subscription. |
63 class StreamQueue<T> { | 63 abstract class StreamQueue<T> { |
64 // This class maintains two queues: one of events and one of requests. | 64 // This class maintains two queues: one of events and one of requests. |
65 // The active request (the one in front of the queue) is called with | 65 // The active request (the one in front of the queue) is called with |
66 // the current event queue when it becomes active. | 66 // the current event queue when it becomes active, every time a |
| 67 // new event arrives, and when the event source closes. |
67 // | 68 // |
68 // If the request returns true, it's complete and will be removed from the | 69 // If the request returns `true`, it's complete and will be removed from the |
69 // request queue. | 70 // request queue. |
70 // If the request returns false, it needs more events, and will be called | 71 // If the request returns `false`, it needs more events, and will be called |
71 // again when new events are available. | 72 // again when new events are available. It may trigger a call itself by |
| 73 // calling [_updateRequests]. |
72 // The request can remove events that it uses, or keep them in the event | 74 // The request can remove events that it uses, or keep them in the event |
73 // queue until it has all that it needs. | 75 // queue until it has all that it needs. |
74 // | 76 // |
75 // This model is very flexible and easily extensible. | 77 // This model is very flexible and easily extensible. |
76 // It allows requests that don't consume events (like [hasNext]) or | 78 // It allows requests that don't consume events (like [hasNext]) or |
77 // potentially a request that takes either five or zero events, determined | 79 // potentially a request that takes either five or zero events, determined |
78 // by the content of the fifth event. | 80 // by the content of the fifth event. |
79 | 81 |
80 /// Source of events. | 82 /// Whether the event source is done. |
81 final Stream _sourceStream; | |
82 | |
83 /// Subscription on [_sourceStream] while listening for events. | |
84 /// | |
85 /// Set to subscription when listening, and set to `null` when the | |
86 /// subscription is done (and [_isDone] is set to true). | |
87 StreamSubscription _subscription; | |
88 | |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
90 bool _isDone = false; | 83 bool _isDone = false; |
91 | 84 |
92 /// Whether a closing operation has been performed on the stream queue. | 85 /// Whether a closing operation has been performed on the stream queue. |
93 /// | 86 /// |
94 /// Closing operations are [cancel] and [rest]. | 87 /// Closing operations are [cancel] and [rest]. |
95 bool _isClosed = false; | 88 bool _isClosed = false; |
96 | 89 |
97 /// Queue of events not used by a request yet. | 90 /// Queue of events not used by a request yet. |
98 final Queue<Result> _eventQueue = new Queue(); | 91 final Queue<Result> _eventQueue = new Queue(); |
99 | 92 |
100 /// Queue of pending requests. | 93 /// Queue of pending requests. |
101 /// | 94 /// |
102 /// Access through methods below to ensure consistency. | 95 /// Access through methods below to ensure consistency. |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | 96 final Queue<_EventRequest> _requestQueue = new Queue(); |
104 | 97 |
105 /// Create a `StreamQueue` of the events of [source]. | 98 /// Create a `StreamQueue` of the events of [source]. |
106 StreamQueue(Stream source) | 99 factory StreamQueue(Stream source) = _StreamQueue<T>; |
107 : _sourceStream = source; | 100 |
| 101 StreamQueue._(); |
108 | 102 |
109 /// Asks if the stream has any more events. | 103 /// Asks if the stream has any more events. |
110 /// | 104 /// |
111 /// Returns a future that completes with `true` if the stream has any | 105 /// Returns a future that completes with `true` if the stream has any |
112 /// more events, whether data or error. | 106 /// more events, whether data or error. |
113 /// If the stream closes without producing any more events, the returned | 107 /// If the stream closes without producing any more events, the returned |
114 /// future completes with `false`. | 108 /// future completes with `false`. |
115 /// | 109 /// |
116 /// Can be used before using [next] to avoid getting an error in the | 110 /// Can be used before using [next] to avoid getting an error in the |
117 /// future returned by `next` in the case where there are no more events. | 111 /// future returned by `next` in the case where there are no more events. |
| 112 /// Another alternative is to use `take(1)` which returns either zero or |
| 113 /// one events. |
118 Future<bool> get hasNext { | 114 Future<bool> get hasNext { |
119 if (!_isClosed) { | 115 if (!_isClosed) { |
120 var hasNextRequest = new _HasNextRequest(); | 116 var hasNextRequest = new _HasNextRequest(); |
121 _addRequest(hasNextRequest); | 117 _addRequest(hasNextRequest); |
122 return hasNextRequest.future; | 118 return hasNextRequest.future; |
123 } | 119 } |
124 throw _failClosed(); | 120 throw _failClosed(); |
125 } | 121 } |
126 | 122 |
127 /// Requests the next (yet unrequested) event from the stream. | 123 /// Requests the next (yet unrequested) event from the stream. |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
209 Future<List<T>> take(int count) { | 205 Future<List<T>> take(int count) { |
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
211 if (!_isClosed) { | 207 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 208 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 209 _addRequest(request); |
214 return request.future; | 210 return request.future; |
215 } | 211 } |
216 throw _failClosed(); | 212 throw _failClosed(); |
217 } | 213 } |
218 | 214 |
219 /// Cancels the underlying stream subscription. | 215 /// Cancels the underlying event source. |
220 /// | 216 /// |
221 /// If [immediate] is `false` (the default), the cancel operation waits until | 217 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// all previously requested events have been processed, then it cancels the | 218 /// all previously requested events have been processed, then it cancels the |
223 /// subscription providing the events. | 219 /// subscription providing the events. |
224 /// | 220 /// |
225 /// If [immediate] is `true`, the subscription is instead canceled | 221 /// If [immediate] is `true`, the source is instead canceled |
226 /// immediately. Any pending events complete with a 'closed'-event, as though | 222 /// immediately. Any pending events are completed as though the underlying |
227 /// the stream had closed by itself. | 223 /// stream had closed. |
228 /// | 224 /// |
229 /// The returned future completes with the result of calling | 225 /// The returned future completes with the result of calling |
230 /// `cancel`. | 226 /// `cancel`. |
231 /// | 227 /// |
232 /// After calling `cancel`, no further events can be requested. | 228 /// After calling `cancel`, no further events can be requested. |
233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 229 /// None of [next], [rest], [skip], [take] or [cancel] may be |
234 /// called again. | 230 /// called again. |
235 Future cancel({bool immediate: false}) { | 231 Future cancel({bool immediate: false}) { |
236 if (_isClosed) throw _failClosed(); | 232 if (_isClosed) throw _failClosed(); |
237 _isClosed = true; | 233 _isClosed = true; |
238 | 234 |
239 if (!immediate) { | 235 if (!immediate) { |
240 var request = new _CancelRequest(this); | 236 var request = new _CancelRequest(this); |
241 _addRequest(request); | 237 _addRequest(request); |
242 return request.future; | 238 return request.future; |
243 } | 239 } |
244 | 240 |
245 if (_isDone) return new Future.value(); | 241 if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
246 if (_subscription == null) _subscription = _sourceStream.listen(null); | 242 return _cancel(); |
247 var future = _subscription.cancel(); | |
248 _onDone(); | |
249 return future; | |
250 } | 243 } |
251 | 244 |
| 245 // ------------------------------------------------------------------ |
| 246 // Methods that may be called from the request implementations to |
| 247 // control the even stream. |
| 248 |
| 249 /// Matches events with requests. |
| 250 /// |
| 251 /// Called after receiving an event or when the event source closes. |
| 252 /// |
| 253 /// May be called by requests which have returned `false` (saying they |
| 254 /// are not yet done) so they can be checked again before any new |
| 255 /// events arrive. |
| 256 /// Any request returing `false` from `update` when `isDone` is `true` |
| 257 /// *must* call `_updateRequests` when they are ready to continue |
| 258 /// (since no further events will trigger the call). |
| 259 void _updateRequests() { |
| 260 while (_requestQueue.isNotEmpty) { |
| 261 if (_requestQueue.first.update(_eventQueue, _isDone)) { |
| 262 _requestQueue.removeFirst(); |
| 263 } else { |
| 264 return; |
| 265 } |
| 266 } |
| 267 |
| 268 if (!_isDone) { |
| 269 _pause(); |
| 270 } |
| 271 } |
| 272 |
| 273 /// Extracts a stream from the event source and makes this stream queue |
| 274 /// unusable. |
| 275 /// |
| 276 /// Can only be used by the very last request (the stream queue must |
| 277 /// be closed by that request). |
| 278 /// Only used by [rest]. |
| 279 Stream _extractStream(); |
| 280 |
| 281 /// Requests that the event source pauses events. |
| 282 /// |
| 283 /// This is called automatically when the request queue is empty. |
| 284 /// |
| 285 /// The event source is restarted by the next call to [_ensureListening]. |
| 286 void _pause(); |
| 287 |
| 288 /// Ensures that we are listening on events from the event source. |
| 289 /// |
| 290 /// Starts listening for the first time or resumes after a [_pause]. |
| 291 /// |
| 292 /// Is called automatically if a request requires more events. |
| 293 void _ensureListening(); |
| 294 |
| 295 /// Cancels the underlying event source. |
| 296 Future _cancel(); |
| 297 |
| 298 // ------------------------------------------------------------------ |
| 299 // Methods called by the event source to add events or say that it's |
| 300 // done. |
| 301 |
| 302 /// Called when the event source adds a new data or error event. |
| 303 /// Always calls [_updateRequests] after adding. |
| 304 void _addResult(Result result) { |
| 305 _eventQueue.add(result); |
| 306 _updateRequests(); |
| 307 } |
| 308 |
| 309 /// Called when the event source is done. |
| 310 /// Always calls [_updateRequests] after adding. |
| 311 void _close() { |
| 312 _isDone = true; |
| 313 _updateRequests(); |
| 314 } |
| 315 |
| 316 // ------------------------------------------------------------------ |
| 317 // Internal helper methods. |
| 318 |
252 /// Returns an error for when a request is made after cancel. | 319 /// Returns an error for when a request is made after cancel. |
253 /// | 320 /// |
254 /// Returns a [StateError] with a message saying that either | 321 /// Returns a [StateError] with a message saying that either |
255 /// [cancel] or [rest] have already been called. | 322 /// [cancel] or [rest] have already been called. |
256 Error _failClosed() { | 323 Error _failClosed() { |
257 return new StateError("Already cancelled"); | 324 return new StateError("Already cancelled"); |
258 } | 325 } |
259 | 326 |
260 // Callbacks receiving the events of the source stream. | |
261 | |
262 void _onData(T data) { | |
263 _eventQueue.add(new Result.value(data)); | |
264 _checkQueues(); | |
265 } | |
266 | |
267 void _onError(error, StackTrace stack) { | |
268 _eventQueue.add(new Result.error(error, stack)); | |
269 _checkQueues(); | |
270 } | |
271 | |
272 void _onDone() { | |
273 _subscription = null; | |
274 _isDone = true; | |
275 _closeAllRequests(); | |
276 } | |
277 | |
278 // Request queue management. | |
279 | |
280 /// Adds a new request to the queue. | 327 /// Adds a new request to the queue. |
| 328 /// |
| 329 /// If the request queue is empty and the request can be completed |
| 330 /// immediately, it skips the queue. |
281 void _addRequest(_EventRequest request) { | 331 void _addRequest(_EventRequest request) { |
282 if (_isDone) { | |
283 assert(_requestQueue.isEmpty); | |
284 if (!request.addEvents(_eventQueue)) { | |
285 request.close(_eventQueue); | |
286 } | |
287 return; | |
288 } | |
289 if (_requestQueue.isEmpty) { | 332 if (_requestQueue.isEmpty) { |
290 if (request.addEvents(_eventQueue)) return; | 333 if (request.update(_eventQueue, _isDone)) return; |
291 _ensureListening(); | 334 _ensureListening(); |
292 } | 335 } |
293 _requestQueue.add(request); | 336 _requestQueue.add(request); |
294 } | 337 } |
| 338 } |
295 | 339 |
296 /// Ensures that we are listening on events from [_sourceStream]. | 340 |
| 341 /// The default implementation of [StreamQueue]. |
| 342 /// |
| 343 /// This queue gets its events from a stream which is listened |
| 344 /// to when a request needs events. |
| 345 class _StreamQueue<T> extends StreamQueue<T> { |
| 346 /// Source of events. |
| 347 final Stream _sourceStream; |
| 348 |
| 349 /// Subscription on [_sourceStream] while listening for events. |
297 /// | 350 /// |
298 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 351 /// Set to subscription when listening, and set to `null` when the |
| 352 /// subscription is done (and [_isDone] is set to true). |
| 353 StreamSubscription _subscription; |
| 354 |
| 355 _StreamQueue(this._sourceStream) : super._(); |
| 356 |
| 357 Future _cancel() { |
| 358 if (_isDone) return null; |
| 359 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 360 var future = _subscription.cancel(); |
| 361 _close(); |
| 362 return future; |
| 363 } |
| 364 |
299 void _ensureListening() { | 365 void _ensureListening() { |
300 assert(!_isDone); | 366 assert(!_isDone); |
301 if (_subscription == null) { | 367 if (_subscription == null) { |
302 _subscription = | 368 _subscription = |
303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 369 _sourceStream.listen( |
| 370 (data) { |
| 371 _addResult(new Result.value(data)); |
| 372 }, |
| 373 onError: (error, StackTrace stackTrace) { |
| 374 _addResult(new Result.error(error, stackTrace)); |
| 375 }, |
| 376 onDone: () { |
| 377 _subscription = null; |
| 378 this._close(); |
| 379 }); |
304 } else { | 380 } else { |
305 _subscription.resume(); | 381 _subscription.resume(); |
306 } | 382 } |
307 } | 383 } |
308 | 384 |
309 /// Removes all requests and closes them. | 385 void _pause() { |
310 /// | 386 _subscription.pause(); |
311 /// Used when the source stream is done. | |
312 /// After this, no further requests will be added to the queue, | |
313 /// requests are immediately served entirely by events already in the event | |
314 /// queue, if any. | |
315 void _closeAllRequests() { | |
316 assert(_isDone); | |
317 while (_requestQueue.isNotEmpty) { | |
318 var request = _requestQueue.removeFirst(); | |
319 if (!request.addEvents(_eventQueue)) { | |
320 request.close(_eventQueue); | |
321 } | |
322 } | |
323 } | 387 } |
324 | 388 |
325 /// Matches events with requests. | 389 Stream<T> _extractStream() { |
326 /// | 390 assert(_isClosed); |
327 /// Called after receiving an event. | 391 if (_isDone) { |
328 void _checkQueues() { | 392 return new Stream<T>.empty(); |
329 while (_requestQueue.isNotEmpty) { | |
330 if (_requestQueue.first.addEvents(_eventQueue)) { | |
331 _requestQueue.removeFirst(); | |
332 } else { | |
333 return; | |
334 } | |
335 } | 393 } |
336 if (!_isDone) { | 394 |
337 _subscription.pause(); | 395 if (_subscription == null) { |
| 396 return _sourceStream; |
338 } | 397 } |
339 } | |
340 | 398 |
341 /// Extracts the subscription and makes this stream queue unusable. | |
342 /// | |
343 /// Can only be used by the very last request. | |
344 StreamSubscription _dispose() { | |
345 assert(_isClosed); | |
346 var subscription = _subscription; | 399 var subscription = _subscription; |
347 _subscription = null; | 400 _subscription = null; |
348 _isDone = true; | 401 _isDone = true; |
349 return subscription; | 402 |
| 403 var wasPaused = subscription.isPaused; |
| 404 var result = new SubscriptionStream<T>(subscription); |
| 405 // Resume after creating stream because that pauses the subscription too. |
| 406 // This way there won't be a short resumption in the middle. |
| 407 if (wasPaused) subscription.resume(); |
| 408 return result; |
350 } | 409 } |
351 } | 410 } |
352 | 411 |
| 412 |
353 /// Request object that receives events when they arrive, until fulfilled. | 413 /// Request object that receives events when they arrive, until fulfilled. |
354 /// | 414 /// |
355 /// Each request that cannot be fulfilled immediately is represented by | 415 /// Each request that cannot be fulfilled immediately is represented by |
356 /// an `_EventRequest` object in the request queue. | 416 /// an `_EventRequest` object in the request queue. |
357 /// | 417 /// |
358 /// Events from the source stream are sent to the first request in the | 418 /// Events from the source stream are sent to the first request in the |
359 /// queue until it reports itself as [isComplete]. | 419 /// queue until it reports itself as [isComplete]. |
360 /// | 420 /// |
361 /// When the first request in the queue `isComplete`, either when becoming | 421 /// When the first request in the queue `isComplete`, either when becoming |
362 /// the first request or after receiving an event, its [close] methods is | 422 /// the first request or after receiving an event, its [close] methods is |
363 /// called. | 423 /// called. |
364 /// | 424 /// |
365 /// The [close] method is also called immediately when the source stream | 425 /// The [close] method is also called immediately when the source stream |
366 /// is done. | 426 /// is done. |
367 abstract class _EventRequest { | 427 abstract class _EventRequest { |
368 /// Handle available events. | 428 /// Handle available events. |
369 /// | 429 /// |
370 /// The available events are provided as a queue. The `addEvents` function | 430 /// The available events are provided as a queue. The `update` function |
371 /// should only remove events from the front of the event queue, e.g., | 431 /// should only remove events from the front of the event queue, e.g., |
372 /// using [removeFirst]. | 432 /// using [removeFirst]. |
373 /// | 433 /// |
374 /// Returns `true` if the request is completed, or `false` if it needs | 434 /// Returns `true` if the request is completed, or `false` if it needs |
375 /// more events. | 435 /// more events. |
376 /// The call may keep events in the queue until the requeust is complete, | 436 /// The call may keep events in the queue until the requeust is complete, |
377 /// or it may remove them immediately. | 437 /// or it may remove them immediately. |
378 /// | 438 /// |
379 /// If the method returns true, the request is considered fulfilled, and | 439 /// If the method returns true, the request is considered fulfilled, and |
380 /// will never be called again. | 440 /// will never be called again. |
381 /// | 441 /// |
382 /// This method is called when a request reaches the front of the request | 442 /// This method is called when a request reaches the front of the request |
383 /// queue, and if it returns `false`, it's called again every time a new event | 443 /// queue, and if it returns `false`, it's called again every time a new event |
384 /// becomes available, or when the stream closes. | 444 /// becomes available, or when the stream closes. |
385 bool addEvents(Queue<Result> events); | 445 /// If the function returns `false` when the stream has already closed |
386 | 446 /// ([isDone] is true), then the request must call |
387 /// Complete the request. | 447 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
388 /// | 448 bool update(Queue<Result> events, bool isDone); |
389 /// This is called when the source stream is done before the request | |
390 /// had a chance to receive all its events. That is, after a call | |
391 /// to [addEvents] has returned `false`. | |
392 /// If there are any unused events available, they are in the [events] queue. | |
393 /// No further events will become available. | |
394 /// | |
395 /// The queue should only remove events from the front of the event queue, | |
396 /// e.g., using [removeFirst]. | |
397 /// | |
398 /// If the request kept events in the queue after an [addEvents] call, | |
399 /// this is the last chance to use them. | |
400 void close(Queue<Result> events); | |
401 } | 449 } |
402 | 450 |
403 /// Request for a [StreamQueue.next] call. | 451 /// Request for a [StreamQueue.next] call. |
404 /// | 452 /// |
405 /// Completes the returned future when receiving the first event, | 453 /// Completes the returned future when receiving the first event, |
406 /// and is then complete. | 454 /// and is then complete. |
407 class _NextRequest<T> implements _EventRequest { | 455 class _NextRequest<T> implements _EventRequest { |
408 /// Completer for the future returned by [StreamQueue.next]. | 456 /// Completer for the future returned by [StreamQueue.next]. |
409 final Completer _completer; | 457 final Completer _completer; |
410 | 458 |
411 _NextRequest() : _completer = new Completer<T>(); | 459 _NextRequest() : _completer = new Completer<T>(); |
412 | 460 |
413 Future<T> get future => _completer.future; | 461 Future<T> get future => _completer.future; |
414 | 462 |
415 bool addEvents(Queue<Result> events) { | 463 bool update(Queue<Result> events, bool isDone) { |
416 if (events.isEmpty) return false; | 464 if (events.isNotEmpty) { |
417 events.removeFirst().complete(_completer); | 465 events.removeFirst().complete(_completer); |
418 return true; | 466 return true; |
419 } | 467 } |
420 | 468 if (isDone) { |
421 void close(Queue<Result> events) { | 469 var errorFuture = |
422 var errorFuture = | 470 new Future.sync(() => throw new StateError("No elements")); |
423 new Future.sync(() => throw new StateError("No elements")); | 471 _completer.complete(errorFuture); |
424 _completer.complete(errorFuture); | 472 return true; |
| 473 } |
| 474 return false; |
425 } | 475 } |
426 } | 476 } |
427 | 477 |
428 /// Request for a [StreamQueue.skip] call. | 478 /// Request for a [StreamQueue.skip] call. |
429 class _SkipRequest implements _EventRequest { | 479 class _SkipRequest implements _EventRequest { |
430 /// Completer for the future returned by the skip call. | 480 /// Completer for the future returned by the skip call. |
431 final Completer _completer = new Completer<int>(); | 481 final Completer _completer = new Completer<int>(); |
432 | 482 |
433 /// Number of remaining events to skip. | 483 /// Number of remaining events to skip. |
434 /// | 484 /// |
435 /// The request [isComplete] when the values reaches zero. | 485 /// The request [isComplete] when the values reaches zero. |
436 /// | 486 /// |
437 /// Decremented when an event is seen. | 487 /// Decremented when an event is seen. |
438 /// Set to zero when an error is seen since errors abort the skip request. | 488 /// Set to zero when an error is seen since errors abort the skip request. |
439 int _eventsToSkip; | 489 int _eventsToSkip; |
440 | 490 |
441 _SkipRequest(this._eventsToSkip); | 491 _SkipRequest(this._eventsToSkip); |
442 | 492 |
443 /// The future completed when the correct number of events have been skipped. | 493 /// The future completed when the correct number of events have been skipped. |
444 Future get future => _completer.future; | 494 Future get future => _completer.future; |
445 | 495 |
446 bool addEvents(Queue<Result> events) { | 496 bool update(Queue<Result> events, bool isDone) { |
447 while (_eventsToSkip > 0) { | 497 while (_eventsToSkip > 0) { |
448 if (events.isEmpty) return false; | 498 if (events.isEmpty) { |
| 499 if (isDone) break; |
| 500 return false; |
| 501 } |
449 _eventsToSkip--; | 502 _eventsToSkip--; |
| 503 |
450 var event = events.removeFirst(); | 504 var event = events.removeFirst(); |
451 if (event.isError) { | 505 if (event.isError) { |
452 event.complete(_completer); | 506 event.complete(_completer); |
453 return true; | 507 return true; |
454 } | 508 } |
455 } | 509 } |
456 _completer.complete(0); | 510 _completer.complete(_eventsToSkip); |
457 return true; | 511 return true; |
458 } | 512 } |
459 | |
460 void close(Queue<Result> events) { | |
461 _completer.complete(_eventsToSkip); | |
462 } | |
463 } | 513 } |
464 | 514 |
465 /// Request for a [StreamQueue.take] call. | 515 /// Request for a [StreamQueue.take] call. |
466 class _TakeRequest<T> implements _EventRequest { | 516 class _TakeRequest<T> implements _EventRequest { |
467 /// Completer for the future returned by the take call. | 517 /// Completer for the future returned by the take call. |
468 final Completer _completer; | 518 final Completer _completer; |
469 | 519 |
470 /// List collecting events until enough have been seen. | 520 /// List collecting events until enough have been seen. |
471 final List _list = <T>[]; | 521 final List _list = <T>[]; |
472 | 522 |
473 /// Number of events to capture. | 523 /// Number of events to capture. |
474 /// | 524 /// |
475 /// The request [isComplete] when the length of [_list] reaches | 525 /// The request [isComplete] when the length of [_list] reaches |
476 /// this value. | 526 /// this value. |
477 final int _eventsToTake; | 527 final int _eventsToTake; |
478 | 528 |
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
480 | 530 |
481 /// The future completed when the correct number of events have been captured. | 531 /// The future completed when the correct number of events have been captured. |
482 Future get future => _completer.future; | 532 Future get future => _completer.future; |
483 | 533 |
484 bool addEvents(Queue<Result> events) { | 534 bool update(Queue<Result> events, bool isDone) { |
485 while (_list.length < _eventsToTake) { | 535 while (_list.length < _eventsToTake) { |
486 if (events.isEmpty) return false; | 536 if (events.isEmpty) { |
| 537 if (isDone) break; |
| 538 return false; |
| 539 } |
| 540 |
487 var result = events.removeFirst(); | 541 var result = events.removeFirst(); |
488 if (result.isError) { | 542 if (result.isError) { |
489 result.complete(_completer); | 543 result.complete(_completer); |
490 return true; | 544 return true; |
491 } | 545 } |
492 _list.add(result.asValue.value); | 546 _list.add(result.asValue.value); |
493 } | 547 } |
494 _completer.complete(_list); | 548 _completer.complete(_list); |
495 return true; | 549 return true; |
496 } | 550 } |
497 | |
498 void close(Queue<Result> events) { | |
499 _completer.complete(_list); | |
500 } | |
501 } | 551 } |
502 | 552 |
503 /// Request for a [StreamQueue.cancel] call. | 553 /// Request for a [StreamQueue.cancel] call. |
504 /// | 554 /// |
505 /// The request needs no events, it just waits in the request queue | 555 /// The request needs no events, it just waits in the request queue |
506 /// until all previous events are fulfilled, then it cancels the stream queue | 556 /// until all previous events are fulfilled, then it cancels the stream queue |
507 /// source subscription. | 557 /// source subscription. |
508 class _CancelRequest implements _EventRequest { | 558 class _CancelRequest implements _EventRequest { |
509 /// Completer for the future returned by the `cancel` call. | 559 /// Completer for the future returned by the `cancel` call. |
510 final Completer _completer = new Completer(); | 560 final Completer _completer = new Completer(); |
511 | 561 |
512 /// The [StreamQueue] object that has this request queued. | 562 /// The [StreamQueue] object that has this request queued. |
513 /// | 563 /// |
514 /// When the event is completed, it needs to cancel the active subscription | 564 /// When the event is completed, it needs to cancel the active subscription |
515 /// of the `StreamQueue` object, if any. | 565 /// of the `StreamQueue` object, if any. |
516 final StreamQueue _streamQueue; | 566 final StreamQueue _streamQueue; |
517 | 567 |
518 _CancelRequest(this._streamQueue); | 568 _CancelRequest(this._streamQueue); |
519 | 569 |
520 /// The future completed when the cancel request is completed. | 570 /// The future completed when the cancel request is completed. |
521 Future get future => _completer.future; | 571 Future get future => _completer.future; |
522 | 572 |
523 bool addEvents(Queue<Result> events) { | 573 bool update(Queue<Result> events, bool isDone) { |
524 _shutdown(); | |
525 return true; | |
526 } | |
527 | |
528 void close(_) { | |
529 _shutdown(); | |
530 } | |
531 | |
532 void _shutdown() { | |
533 if (_streamQueue._isDone) { | 574 if (_streamQueue._isDone) { |
534 _completer.complete(); | 575 _completer.complete(); |
535 } else { | 576 } else { |
536 _streamQueue._ensureListening(); | 577 _streamQueue._ensureListening(); |
537 _completer.complete(_streamQueue._dispose().cancel()); | 578 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
538 } | 579 } |
| 580 return true; |
539 } | 581 } |
540 } | 582 } |
541 | 583 |
542 /// Request for a [StreamQueue.rest] call. | 584 /// Request for a [StreamQueue.rest] call. |
543 /// | 585 /// |
544 /// The request is always complete, it just waits in the request queue | 586 /// The request is always complete, it just waits in the request queue |
545 /// until all previous events are fulfilled, then it takes over the | 587 /// until all previous events are fulfilled, then it takes over the |
546 /// stream events subscription and creates a stream from it. | 588 /// stream events subscription and creates a stream from it. |
547 class _RestRequest<T> implements _EventRequest { | 589 class _RestRequest<T> implements _EventRequest { |
548 /// Completer for the stream returned by the `rest` call. | 590 /// Completer for the stream returned by the `rest` call. |
549 final StreamCompleter _completer = new StreamCompleter<T>(); | 591 final StreamCompleter _completer = new StreamCompleter<T>(); |
550 | 592 |
551 /// The [StreamQueue] object that has this request queued. | 593 /// The [StreamQueue] object that has this request queued. |
552 /// | 594 /// |
553 /// When the event is completed, it needs to cancel the active subscription | 595 /// When the event is completed, it needs to cancel the active subscription |
554 /// of the `StreamQueue` object, if any. | 596 /// of the `StreamQueue` object, if any. |
555 final StreamQueue _streamQueue; | 597 final StreamQueue _streamQueue; |
556 | 598 |
557 _RestRequest(this._streamQueue); | 599 _RestRequest(this._streamQueue); |
558 | 600 |
559 /// The stream which will contain the remaining events of [_streamQueue]. | 601 /// The stream which will contain the remaining events of [_streamQueue]. |
560 Stream<T> get stream => _completer.stream; | 602 Stream<T> get stream => _completer.stream; |
561 | 603 |
562 bool addEvents(Queue<Result> events) { | 604 bool update(Queue<Result> events, bool isDone) { |
563 _completeStream(events); | |
564 return true; | |
565 } | |
566 | |
567 void close(Queue<Result> events) { | |
568 _completeStream(events); | |
569 } | |
570 | |
571 void _completeStream(Queue<Result> events) { | |
572 if (events.isEmpty) { | 605 if (events.isEmpty) { |
573 if (_streamQueue._isDone) { | 606 if (_streamQueue._isDone) { |
574 _completer.setEmpty(); | 607 _completer.setEmpty(); |
575 } else { | 608 } else { |
576 _completer.setSourceStream(_getRestStream()); | 609 _completer.setSourceStream(_streamQueue._extractStream()); |
577 } | 610 } |
578 } else { | 611 } else { |
579 // There are prefetched events which needs to be added before the | 612 // There are prefetched events which needs to be added before the |
580 // remaining stream. | 613 // remaining stream. |
581 var controller = new StreamController<T>(); | 614 var controller = new StreamController<T>(); |
582 for (var event in events) { | 615 for (var event in events) { |
583 event.addTo(controller); | 616 event.addTo(controller); |
584 } | 617 } |
585 controller.addStream(_getRestStream(), cancelOnError: false) | 618 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
586 .whenComplete(controller.close); | 619 .whenComplete(controller.close); |
587 _completer.setSourceStream(controller.stream); | 620 _completer.setSourceStream(controller.stream); |
588 } | 621 } |
589 } | 622 return true; |
590 | |
591 /// Create a stream from the rest of [_streamQueue]'s subscription. | |
592 Stream _getRestStream() { | |
593 if (_streamQueue._isDone) { | |
594 var controller = new StreamController<T>()..close(); | |
595 return controller.stream; | |
596 // TODO(lrn). Use the following when 1.11 is released. | |
597 // return new Stream<T>.empty(); | |
598 } | |
599 if (_streamQueue._subscription == null) { | |
600 return _streamQueue._sourceStream; | |
601 } | |
602 var subscription = _streamQueue._dispose(); | |
603 subscription.resume(); | |
604 return new SubscriptionStream<T>(subscription); | |
605 } | 623 } |
606 } | 624 } |
607 | 625 |
608 /// Request for a [StreamQueue.hasNext] call. | 626 /// Request for a [StreamQueue.hasNext] call. |
609 /// | 627 /// |
610 /// Completes the [future] with `true` if it sees any event, | 628 /// Completes the [future] with `true` if it sees any event, |
611 /// but doesn't consume the event. | 629 /// but doesn't consume the event. |
612 /// If the request is closed without seeing an event, then | 630 /// If the request is closed without seeing an event, then |
613 /// the [future] is completed with `false`. | 631 /// the [future] is completed with `false`. |
614 class _HasNextRequest<T> implements _EventRequest { | 632 class _HasNextRequest<T> implements _EventRequest { |
615 final Completer _completer = new Completer<bool>(); | 633 final Completer _completer = new Completer<bool>(); |
616 | 634 |
617 Future<bool> get future => _completer.future; | 635 Future<bool> get future => _completer.future; |
618 | 636 |
619 bool addEvents(Queue<Result> events) { | 637 bool update(Queue<Result> events, bool isDone) { |
620 if (events.isNotEmpty) { | 638 if (events.isNotEmpty) { |
621 _completer.complete(true); | 639 _completer.complete(true); |
622 return true; | 640 return true; |
623 } | 641 } |
| 642 if (isDone) { |
| 643 _completer.complete(false); |
| 644 return true; |
| 645 } |
624 return false; | 646 return false; |
625 } | 647 } |
626 | |
627 void close(_) { | |
628 _completer.complete(false); | |
629 } | |
630 } | 648 } |
OLD | NEW |