| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 53 /// var headerCount = | 53 /// var headerCount = |
| 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| 55 /// handleMessage(headers: await events.take(headerCount), | 55 /// handleMessage(headers: await events.take(headerCount), |
| 56 /// body: events.rest); | 56 /// body: events.rest); |
| 57 /// return; | 57 /// return; |
| 58 /// } | 58 /// } |
| 59 /// // Error handling. | 59 /// // Error handling. |
| 60 /// | 60 /// |
| 61 /// When you need no further events the `StreamQueue` should be closed | 61 /// When you need no further events the `StreamQueue` should be closed |
| 62 /// using [cancel]. This releases the underlying stream subscription. | 62 /// using [cancel]. This releases the underlying stream subscription. |
| 63 class StreamQueue<T> { | 63 abstract class StreamQueue<T> { |
| 64 // This class maintains two queues: one of events and one of requests. | 64 // This class maintains two queues: one of events and one of requests. |
| 65 // The active request (the one in front of the queue) is called with | 65 // The active request (the one in front of the queue) is called with |
| 66 // the current event queue when it becomes active. | 66 // the current event queue when it becomes active, every time a |
| 67 // new event arrives, and when the event source closes. |
| 67 // | 68 // |
| 68 // If the request returns true, it's complete and will be removed from the | 69 // If the request returns `true`, it's complete and will be removed from the |
| 69 // request queue. | 70 // request queue. |
| 70 // If the request returns false, it needs more events, and will be called | 71 // If the request returns `false`, it needs more events, and will be called |
| 71 // again when new events are available. | 72 // again when new events are available. It may trigger a call itself by |
| 73 // calling [_updateRequests]. |
| 72 // The request can remove events that it uses, or keep them in the event | 74 // The request can remove events that it uses, or keep them in the event |
| 73 // queue until it has all that it needs. | 75 // queue until it has all that it needs. |
| 74 // | 76 // |
| 75 // This model is very flexible and easily extensible. | 77 // This model is very flexible and easily extensible. |
| 76 // It allows requests that don't consume events (like [hasNext]) or | 78 // It allows requests that don't consume events (like [hasNext]) or |
| 77 // potentially a request that takes either five or zero events, determined | 79 // potentially a request that takes either five or zero events, determined |
| 78 // by the content of the fifth event. | 80 // by the content of the fifth event. |
| 79 | 81 |
| 80 /// Source of events. | 82 /// Whether the event source is done. |
| 81 final Stream _sourceStream; | |
| 82 | |
| 83 /// Subscription on [_sourceStream] while listening for events. | |
| 84 /// | |
| 85 /// Set to subscription when listening, and set to `null` when the | |
| 86 /// subscription is done (and [_isDone] is set to true). | |
| 87 StreamSubscription _subscription; | |
| 88 | |
| 89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
| 90 bool _isDone = false; | 83 bool _isDone = false; |
| 91 | 84 |
| 92 /// Whether a closing operation has been performed on the stream queue. | 85 /// Whether a closing operation has been performed on the stream queue. |
| 93 /// | 86 /// |
| 94 /// Closing operations are [cancel] and [rest]. | 87 /// Closing operations are [cancel] and [rest]. |
| 95 bool _isClosed = false; | 88 bool _isClosed = false; |
| 96 | 89 |
| 97 /// Queue of events not used by a request yet. | 90 /// Queue of events not used by a request yet. |
| 98 final Queue<Result> _eventQueue = new Queue(); | 91 final Queue<Result> _eventQueue = new Queue(); |
| 99 | 92 |
| 100 /// Queue of pending requests. | 93 /// Queue of pending requests. |
| 101 /// | 94 /// |
| 102 /// Access through methods below to ensure consistency. | 95 /// Access through methods below to ensure consistency. |
| 103 final Queue<_EventRequest> _requestQueue = new Queue(); | 96 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 104 | 97 |
| 105 /// Create a `StreamQueue` of the events of [source]. | 98 /// Create a `StreamQueue` of the events of [source]. |
| 106 StreamQueue(Stream source) | 99 factory StreamQueue(Stream source) = _StreamQueue<T>; |
| 107 : _sourceStream = source; | 100 |
| 101 StreamQueue._(); |
| 108 | 102 |
| 109 /// Asks if the stream has any more events. | 103 /// Asks if the stream has any more events. |
| 110 /// | 104 /// |
| 111 /// Returns a future that completes with `true` if the stream has any | 105 /// Returns a future that completes with `true` if the stream has any |
| 112 /// more events, whether data or error. | 106 /// more events, whether data or error. |
| 113 /// If the stream closes without producing any more events, the returned | 107 /// If the stream closes without producing any more events, the returned |
| 114 /// future completes with `false`. | 108 /// future completes with `false`. |
| 115 /// | 109 /// |
| 116 /// Can be used before using [next] to avoid getting an error in the | 110 /// Can be used before using [next] to avoid getting an error in the |
| 117 /// future returned by `next` in the case where there are no more events. | 111 /// future returned by `next` in the case where there are no more events. |
| 112 /// Another alternative is to use `take(1)` which returns either zero or |
| 113 /// one events. |
| 118 Future<bool> get hasNext { | 114 Future<bool> get hasNext { |
| 119 if (!_isClosed) { | 115 if (!_isClosed) { |
| 120 var hasNextRequest = new _HasNextRequest(); | 116 var hasNextRequest = new _HasNextRequest(); |
| 121 _addRequest(hasNextRequest); | 117 _addRequest(hasNextRequest); |
| 122 return hasNextRequest.future; | 118 return hasNextRequest.future; |
| 123 } | 119 } |
| 124 throw _failClosed(); | 120 throw _failClosed(); |
| 125 } | 121 } |
| 126 | 122 |
| 127 /// Requests the next (yet unrequested) event from the stream. | 123 /// Requests the next (yet unrequested) event from the stream. |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 209 Future<List<T>> take(int count) { | 205 Future<List<T>> take(int count) { |
| 210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 211 if (!_isClosed) { | 207 if (!_isClosed) { |
| 212 var request = new _TakeRequest<T>(count); | 208 var request = new _TakeRequest<T>(count); |
| 213 _addRequest(request); | 209 _addRequest(request); |
| 214 return request.future; | 210 return request.future; |
| 215 } | 211 } |
| 216 throw _failClosed(); | 212 throw _failClosed(); |
| 217 } | 213 } |
| 218 | 214 |
| 219 /// Cancels the underlying stream subscription. | 215 /// Cancels the underlying event source. |
| 220 /// | 216 /// |
| 221 /// If [immediate] is `false` (the default), the cancel operation waits until | 217 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 222 /// all previously requested events have been processed, then it cancels the | 218 /// all previously requested events have been processed, then it cancels the |
| 223 /// subscription providing the events. | 219 /// subscription providing the events. |
| 224 /// | 220 /// |
| 225 /// If [immediate] is `true`, the subscription is instead canceled | 221 /// If [immediate] is `true`, the source is instead canceled |
| 226 /// immediately. Any pending events complete with a 'closed'-event, as though | 222 /// immediately. Any pending events are completed as though the underlying |
| 227 /// the stream had closed by itself. | 223 /// stream had closed. |
| 228 /// | 224 /// |
| 229 /// The returned future completes with the result of calling | 225 /// The returned future completes with the result of calling |
| 230 /// `cancel`. | 226 /// `cancel`. |
| 231 /// | 227 /// |
| 232 /// After calling `cancel`, no further events can be requested. | 228 /// After calling `cancel`, no further events can be requested. |
| 233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 229 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 234 /// called again. | 230 /// called again. |
| 235 Future cancel({bool immediate: false}) { | 231 Future cancel({bool immediate: false}) { |
| 236 if (_isClosed) throw _failClosed(); | 232 if (_isClosed) throw _failClosed(); |
| 237 _isClosed = true; | 233 _isClosed = true; |
| 238 | 234 |
| 239 if (!immediate) { | 235 if (!immediate) { |
| 240 var request = new _CancelRequest(this); | 236 var request = new _CancelRequest(this); |
| 241 _addRequest(request); | 237 _addRequest(request); |
| 242 return request.future; | 238 return request.future; |
| 243 } | 239 } |
| 244 | 240 |
| 245 if (_isDone) return new Future.value(); | 241 if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
| 246 if (_subscription == null) _subscription = _sourceStream.listen(null); | 242 return _cancel(); |
| 247 var future = _subscription.cancel(); | |
| 248 _onDone(); | |
| 249 return future; | |
| 250 } | 243 } |
| 251 | 244 |
| 245 // ------------------------------------------------------------------ |
| 246 // Methods that may be called from the request implementations to |
| 247 // control the even stream. |
| 248 |
| 249 /// Matches events with requests. |
| 250 /// |
| 251 /// Called after receiving an event or when the event source closes. |
| 252 /// |
| 253 /// May be called by requests which have returned `false` (saying they |
| 254 /// are not yet done) so they can be checked again before any new |
| 255 /// events arrive. |
| 256 /// Any request returing `false` from `update` when `isDone` is `true` |
| 257 /// *must* call `_updateRequests` when they are ready to continue |
| 258 /// (since no further events will trigger the call). |
| 259 void _updateRequests() { |
| 260 while (_requestQueue.isNotEmpty) { |
| 261 if (_requestQueue.first.update(_eventQueue, _isDone)) { |
| 262 _requestQueue.removeFirst(); |
| 263 } else { |
| 264 return; |
| 265 } |
| 266 } |
| 267 |
| 268 if (!_isDone) { |
| 269 _pause(); |
| 270 } |
| 271 } |
| 272 |
| 273 /// Extracts a stream from the event source and makes this stream queue |
| 274 /// unusable. |
| 275 /// |
| 276 /// Can only be used by the very last request (the stream queue must |
| 277 /// be closed by that request). |
| 278 /// Only used by [rest]. |
| 279 Stream _extractStream(); |
| 280 |
| 281 /// Requests that the event source pauses events. |
| 282 /// |
| 283 /// This is called automatically when the request queue is empty. |
| 284 /// |
| 285 /// The event source is restarted by the next call to [_ensureListening]. |
| 286 void _pause(); |
| 287 |
| 288 /// Ensures that we are listening on events from the event source. |
| 289 /// |
| 290 /// Starts listening for the first time or resumes after a [_pause]. |
| 291 /// |
| 292 /// Is called automatically if a request requires more events. |
| 293 void _ensureListening(); |
| 294 |
| 295 /// Cancels the underlying event source. |
| 296 Future _cancel(); |
| 297 |
| 298 // ------------------------------------------------------------------ |
| 299 // Methods called by the event source to add events or say that it's |
| 300 // done. |
| 301 |
| 302 /// Called when the event source adds a new data or error event. |
| 303 /// Always calls [_updateRequests] after adding. |
| 304 void _addResult(Result result) { |
| 305 _eventQueue.add(result); |
| 306 _updateRequests(); |
| 307 } |
| 308 |
| 309 /// Called when the event source is done. |
| 310 /// Always calls [_updateRequests] after adding. |
| 311 void _close() { |
| 312 _isDone = true; |
| 313 _updateRequests(); |
| 314 } |
| 315 |
| 316 // ------------------------------------------------------------------ |
| 317 // Internal helper methods. |
| 318 |
| 252 /// Returns an error for when a request is made after cancel. | 319 /// Returns an error for when a request is made after cancel. |
| 253 /// | 320 /// |
| 254 /// Returns a [StateError] with a message saying that either | 321 /// Returns a [StateError] with a message saying that either |
| 255 /// [cancel] or [rest] have already been called. | 322 /// [cancel] or [rest] have already been called. |
| 256 Error _failClosed() { | 323 Error _failClosed() { |
| 257 return new StateError("Already cancelled"); | 324 return new StateError("Already cancelled"); |
| 258 } | 325 } |
| 259 | 326 |
| 260 // Callbacks receiving the events of the source stream. | |
| 261 | |
| 262 void _onData(T data) { | |
| 263 _eventQueue.add(new Result.value(data)); | |
| 264 _checkQueues(); | |
| 265 } | |
| 266 | |
| 267 void _onError(error, StackTrace stack) { | |
| 268 _eventQueue.add(new Result.error(error, stack)); | |
| 269 _checkQueues(); | |
| 270 } | |
| 271 | |
| 272 void _onDone() { | |
| 273 _subscription = null; | |
| 274 _isDone = true; | |
| 275 _closeAllRequests(); | |
| 276 } | |
| 277 | |
| 278 // Request queue management. | |
| 279 | |
| 280 /// Adds a new request to the queue. | 327 /// Adds a new request to the queue. |
| 328 /// |
| 329 /// If the request queue is empty and the request can be completed |
| 330 /// immediately, it skips the queue. |
| 281 void _addRequest(_EventRequest request) { | 331 void _addRequest(_EventRequest request) { |
| 282 if (_isDone) { | |
| 283 assert(_requestQueue.isEmpty); | |
| 284 if (!request.addEvents(_eventQueue)) { | |
| 285 request.close(_eventQueue); | |
| 286 } | |
| 287 return; | |
| 288 } | |
| 289 if (_requestQueue.isEmpty) { | 332 if (_requestQueue.isEmpty) { |
| 290 if (request.addEvents(_eventQueue)) return; | 333 if (request.update(_eventQueue, _isDone)) return; |
| 291 _ensureListening(); | 334 _ensureListening(); |
| 292 } | 335 } |
| 293 _requestQueue.add(request); | 336 _requestQueue.add(request); |
| 294 } | 337 } |
| 338 } |
| 295 | 339 |
| 296 /// Ensures that we are listening on events from [_sourceStream]. | 340 |
| 341 /// The default implementation of [StreamQueue]. |
| 342 /// |
| 343 /// This queue gets its events from a stream which is listened |
| 344 /// to when a request needs events. |
| 345 class _StreamQueue<T> extends StreamQueue<T> { |
| 346 /// Source of events. |
| 347 final Stream _sourceStream; |
| 348 |
| 349 /// Subscription on [_sourceStream] while listening for events. |
| 297 /// | 350 /// |
| 298 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 351 /// Set to subscription when listening, and set to `null` when the |
| 352 /// subscription is done (and [_isDone] is set to true). |
| 353 StreamSubscription _subscription; |
| 354 |
| 355 _StreamQueue(this._sourceStream) : super._(); |
| 356 |
| 357 Future _cancel() { |
| 358 if (_isDone) return null; |
| 359 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 360 var future = _subscription.cancel(); |
| 361 _close(); |
| 362 return future; |
| 363 } |
| 364 |
| 299 void _ensureListening() { | 365 void _ensureListening() { |
| 300 assert(!_isDone); | 366 assert(!_isDone); |
| 301 if (_subscription == null) { | 367 if (_subscription == null) { |
| 302 _subscription = | 368 _subscription = |
| 303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 369 _sourceStream.listen( |
| 370 (data) { |
| 371 _addResult(new Result.value(data)); |
| 372 }, |
| 373 onError: (error, StackTrace stackTrace) { |
| 374 _addResult(new Result.error(error, stackTrace)); |
| 375 }, |
| 376 onDone: () { |
| 377 _subscription = null; |
| 378 this._close(); |
| 379 }); |
| 304 } else { | 380 } else { |
| 305 _subscription.resume(); | 381 _subscription.resume(); |
| 306 } | 382 } |
| 307 } | 383 } |
| 308 | 384 |
| 309 /// Removes all requests and closes them. | 385 void _pause() { |
| 310 /// | 386 _subscription.pause(); |
| 311 /// Used when the source stream is done. | |
| 312 /// After this, no further requests will be added to the queue, | |
| 313 /// requests are immediately served entirely by events already in the event | |
| 314 /// queue, if any. | |
| 315 void _closeAllRequests() { | |
| 316 assert(_isDone); | |
| 317 while (_requestQueue.isNotEmpty) { | |
| 318 var request = _requestQueue.removeFirst(); | |
| 319 if (!request.addEvents(_eventQueue)) { | |
| 320 request.close(_eventQueue); | |
| 321 } | |
| 322 } | |
| 323 } | 387 } |
| 324 | 388 |
| 325 /// Matches events with requests. | 389 Stream<T> _extractStream() { |
| 326 /// | 390 assert(_isClosed); |
| 327 /// Called after receiving an event. | 391 if (_isDone) { |
| 328 void _checkQueues() { | 392 return new Stream<T>.empty(); |
| 329 while (_requestQueue.isNotEmpty) { | |
| 330 if (_requestQueue.first.addEvents(_eventQueue)) { | |
| 331 _requestQueue.removeFirst(); | |
| 332 } else { | |
| 333 return; | |
| 334 } | |
| 335 } | 393 } |
| 336 if (!_isDone) { | 394 |
| 337 _subscription.pause(); | 395 if (_subscription == null) { |
| 396 return _sourceStream; |
| 338 } | 397 } |
| 339 } | |
| 340 | 398 |
| 341 /// Extracts the subscription and makes this stream queue unusable. | |
| 342 /// | |
| 343 /// Can only be used by the very last request. | |
| 344 StreamSubscription _dispose() { | |
| 345 assert(_isClosed); | |
| 346 var subscription = _subscription; | 399 var subscription = _subscription; |
| 347 _subscription = null; | 400 _subscription = null; |
| 348 _isDone = true; | 401 _isDone = true; |
| 349 return subscription; | 402 |
| 403 var wasPaused = subscription.isPaused; |
| 404 var result = new SubscriptionStream<T>(subscription); |
| 405 // Resume after creating stream because that pauses the subscription too. |
| 406 // This way there won't be a short resumption in the middle. |
| 407 if (wasPaused) subscription.resume(); |
| 408 return result; |
| 350 } | 409 } |
| 351 } | 410 } |
| 352 | 411 |
| 412 |
| 353 /// Request object that receives events when they arrive, until fulfilled. | 413 /// Request object that receives events when they arrive, until fulfilled. |
| 354 /// | 414 /// |
| 355 /// Each request that cannot be fulfilled immediately is represented by | 415 /// Each request that cannot be fulfilled immediately is represented by |
| 356 /// an `_EventRequest` object in the request queue. | 416 /// an `_EventRequest` object in the request queue. |
| 357 /// | 417 /// |
| 358 /// Events from the source stream are sent to the first request in the | 418 /// Events from the source stream are sent to the first request in the |
| 359 /// queue until it reports itself as [isComplete]. | 419 /// queue until it reports itself as [isComplete]. |
| 360 /// | 420 /// |
| 361 /// When the first request in the queue `isComplete`, either when becoming | 421 /// When the first request in the queue `isComplete`, either when becoming |
| 362 /// the first request or after receiving an event, its [close] methods is | 422 /// the first request or after receiving an event, its [close] methods is |
| 363 /// called. | 423 /// called. |
| 364 /// | 424 /// |
| 365 /// The [close] method is also called immediately when the source stream | 425 /// The [close] method is also called immediately when the source stream |
| 366 /// is done. | 426 /// is done. |
| 367 abstract class _EventRequest { | 427 abstract class _EventRequest { |
| 368 /// Handle available events. | 428 /// Handle available events. |
| 369 /// | 429 /// |
| 370 /// The available events are provided as a queue. The `addEvents` function | 430 /// The available events are provided as a queue. The `update` function |
| 371 /// should only remove events from the front of the event queue, e.g., | 431 /// should only remove events from the front of the event queue, e.g., |
| 372 /// using [removeFirst]. | 432 /// using [removeFirst]. |
| 373 /// | 433 /// |
| 374 /// Returns `true` if the request is completed, or `false` if it needs | 434 /// Returns `true` if the request is completed, or `false` if it needs |
| 375 /// more events. | 435 /// more events. |
| 376 /// The call may keep events in the queue until the requeust is complete, | 436 /// The call may keep events in the queue until the requeust is complete, |
| 377 /// or it may remove them immediately. | 437 /// or it may remove them immediately. |
| 378 /// | 438 /// |
| 379 /// If the method returns true, the request is considered fulfilled, and | 439 /// If the method returns true, the request is considered fulfilled, and |
| 380 /// will never be called again. | 440 /// will never be called again. |
| 381 /// | 441 /// |
| 382 /// This method is called when a request reaches the front of the request | 442 /// This method is called when a request reaches the front of the request |
| 383 /// queue, and if it returns `false`, it's called again every time a new event | 443 /// queue, and if it returns `false`, it's called again every time a new event |
| 384 /// becomes available, or when the stream closes. | 444 /// becomes available, or when the stream closes. |
| 385 bool addEvents(Queue<Result> events); | 445 /// If the function returns `false` when the stream has already closed |
| 386 | 446 /// ([isDone] is true), then the request must call |
| 387 /// Complete the request. | 447 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
| 388 /// | 448 bool update(Queue<Result> events, bool isDone); |
| 389 /// This is called when the source stream is done before the request | |
| 390 /// had a chance to receive all its events. That is, after a call | |
| 391 /// to [addEvents] has returned `false`. | |
| 392 /// If there are any unused events available, they are in the [events] queue. | |
| 393 /// No further events will become available. | |
| 394 /// | |
| 395 /// The queue should only remove events from the front of the event queue, | |
| 396 /// e.g., using [removeFirst]. | |
| 397 /// | |
| 398 /// If the request kept events in the queue after an [addEvents] call, | |
| 399 /// this is the last chance to use them. | |
| 400 void close(Queue<Result> events); | |
| 401 } | 449 } |
| 402 | 450 |
| 403 /// Request for a [StreamQueue.next] call. | 451 /// Request for a [StreamQueue.next] call. |
| 404 /// | 452 /// |
| 405 /// Completes the returned future when receiving the first event, | 453 /// Completes the returned future when receiving the first event, |
| 406 /// and is then complete. | 454 /// and is then complete. |
| 407 class _NextRequest<T> implements _EventRequest { | 455 class _NextRequest<T> implements _EventRequest { |
| 408 /// Completer for the future returned by [StreamQueue.next]. | 456 /// Completer for the future returned by [StreamQueue.next]. |
| 409 final Completer _completer; | 457 final Completer _completer; |
| 410 | 458 |
| 411 _NextRequest() : _completer = new Completer<T>(); | 459 _NextRequest() : _completer = new Completer<T>(); |
| 412 | 460 |
| 413 Future<T> get future => _completer.future; | 461 Future<T> get future => _completer.future; |
| 414 | 462 |
| 415 bool addEvents(Queue<Result> events) { | 463 bool update(Queue<Result> events, bool isDone) { |
| 416 if (events.isEmpty) return false; | 464 if (events.isNotEmpty) { |
| 417 events.removeFirst().complete(_completer); | 465 events.removeFirst().complete(_completer); |
| 418 return true; | 466 return true; |
| 419 } | 467 } |
| 420 | 468 if (isDone) { |
| 421 void close(Queue<Result> events) { | 469 var errorFuture = |
| 422 var errorFuture = | 470 new Future.sync(() => throw new StateError("No elements")); |
| 423 new Future.sync(() => throw new StateError("No elements")); | 471 _completer.complete(errorFuture); |
| 424 _completer.complete(errorFuture); | 472 return true; |
| 473 } |
| 474 return false; |
| 425 } | 475 } |
| 426 } | 476 } |
| 427 | 477 |
| 428 /// Request for a [StreamQueue.skip] call. | 478 /// Request for a [StreamQueue.skip] call. |
| 429 class _SkipRequest implements _EventRequest { | 479 class _SkipRequest implements _EventRequest { |
| 430 /// Completer for the future returned by the skip call. | 480 /// Completer for the future returned by the skip call. |
| 431 final Completer _completer = new Completer<int>(); | 481 final Completer _completer = new Completer<int>(); |
| 432 | 482 |
| 433 /// Number of remaining events to skip. | 483 /// Number of remaining events to skip. |
| 434 /// | 484 /// |
| 435 /// The request [isComplete] when the values reaches zero. | 485 /// The request [isComplete] when the values reaches zero. |
| 436 /// | 486 /// |
| 437 /// Decremented when an event is seen. | 487 /// Decremented when an event is seen. |
| 438 /// Set to zero when an error is seen since errors abort the skip request. | 488 /// Set to zero when an error is seen since errors abort the skip request. |
| 439 int _eventsToSkip; | 489 int _eventsToSkip; |
| 440 | 490 |
| 441 _SkipRequest(this._eventsToSkip); | 491 _SkipRequest(this._eventsToSkip); |
| 442 | 492 |
| 443 /// The future completed when the correct number of events have been skipped. | 493 /// The future completed when the correct number of events have been skipped. |
| 444 Future get future => _completer.future; | 494 Future get future => _completer.future; |
| 445 | 495 |
| 446 bool addEvents(Queue<Result> events) { | 496 bool update(Queue<Result> events, bool isDone) { |
| 447 while (_eventsToSkip > 0) { | 497 while (_eventsToSkip > 0) { |
| 448 if (events.isEmpty) return false; | 498 if (events.isEmpty) { |
| 499 if (isDone) break; |
| 500 return false; |
| 501 } |
| 449 _eventsToSkip--; | 502 _eventsToSkip--; |
| 503 |
| 450 var event = events.removeFirst(); | 504 var event = events.removeFirst(); |
| 451 if (event.isError) { | 505 if (event.isError) { |
| 452 event.complete(_completer); | 506 event.complete(_completer); |
| 453 return true; | 507 return true; |
| 454 } | 508 } |
| 455 } | 509 } |
| 456 _completer.complete(0); | 510 _completer.complete(_eventsToSkip); |
| 457 return true; | 511 return true; |
| 458 } | 512 } |
| 459 | |
| 460 void close(Queue<Result> events) { | |
| 461 _completer.complete(_eventsToSkip); | |
| 462 } | |
| 463 } | 513 } |
| 464 | 514 |
| 465 /// Request for a [StreamQueue.take] call. | 515 /// Request for a [StreamQueue.take] call. |
| 466 class _TakeRequest<T> implements _EventRequest { | 516 class _TakeRequest<T> implements _EventRequest { |
| 467 /// Completer for the future returned by the take call. | 517 /// Completer for the future returned by the take call. |
| 468 final Completer _completer; | 518 final Completer _completer; |
| 469 | 519 |
| 470 /// List collecting events until enough have been seen. | 520 /// List collecting events until enough have been seen. |
| 471 final List _list = <T>[]; | 521 final List _list = <T>[]; |
| 472 | 522 |
| 473 /// Number of events to capture. | 523 /// Number of events to capture. |
| 474 /// | 524 /// |
| 475 /// The request [isComplete] when the length of [_list] reaches | 525 /// The request [isComplete] when the length of [_list] reaches |
| 476 /// this value. | 526 /// this value. |
| 477 final int _eventsToTake; | 527 final int _eventsToTake; |
| 478 | 528 |
| 479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| 480 | 530 |
| 481 /// The future completed when the correct number of events have been captured. | 531 /// The future completed when the correct number of events have been captured. |
| 482 Future get future => _completer.future; | 532 Future get future => _completer.future; |
| 483 | 533 |
| 484 bool addEvents(Queue<Result> events) { | 534 bool update(Queue<Result> events, bool isDone) { |
| 485 while (_list.length < _eventsToTake) { | 535 while (_list.length < _eventsToTake) { |
| 486 if (events.isEmpty) return false; | 536 if (events.isEmpty) { |
| 537 if (isDone) break; |
| 538 return false; |
| 539 } |
| 540 |
| 487 var result = events.removeFirst(); | 541 var result = events.removeFirst(); |
| 488 if (result.isError) { | 542 if (result.isError) { |
| 489 result.complete(_completer); | 543 result.complete(_completer); |
| 490 return true; | 544 return true; |
| 491 } | 545 } |
| 492 _list.add(result.asValue.value); | 546 _list.add(result.asValue.value); |
| 493 } | 547 } |
| 494 _completer.complete(_list); | 548 _completer.complete(_list); |
| 495 return true; | 549 return true; |
| 496 } | 550 } |
| 497 | |
| 498 void close(Queue<Result> events) { | |
| 499 _completer.complete(_list); | |
| 500 } | |
| 501 } | 551 } |
| 502 | 552 |
| 503 /// Request for a [StreamQueue.cancel] call. | 553 /// Request for a [StreamQueue.cancel] call. |
| 504 /// | 554 /// |
| 505 /// The request needs no events, it just waits in the request queue | 555 /// The request needs no events, it just waits in the request queue |
| 506 /// until all previous events are fulfilled, then it cancels the stream queue | 556 /// until all previous events are fulfilled, then it cancels the stream queue |
| 507 /// source subscription. | 557 /// source subscription. |
| 508 class _CancelRequest implements _EventRequest { | 558 class _CancelRequest implements _EventRequest { |
| 509 /// Completer for the future returned by the `cancel` call. | 559 /// Completer for the future returned by the `cancel` call. |
| 510 final Completer _completer = new Completer(); | 560 final Completer _completer = new Completer(); |
| 511 | 561 |
| 512 /// The [StreamQueue] object that has this request queued. | 562 /// The [StreamQueue] object that has this request queued. |
| 513 /// | 563 /// |
| 514 /// When the event is completed, it needs to cancel the active subscription | 564 /// When the event is completed, it needs to cancel the active subscription |
| 515 /// of the `StreamQueue` object, if any. | 565 /// of the `StreamQueue` object, if any. |
| 516 final StreamQueue _streamQueue; | 566 final StreamQueue _streamQueue; |
| 517 | 567 |
| 518 _CancelRequest(this._streamQueue); | 568 _CancelRequest(this._streamQueue); |
| 519 | 569 |
| 520 /// The future completed when the cancel request is completed. | 570 /// The future completed when the cancel request is completed. |
| 521 Future get future => _completer.future; | 571 Future get future => _completer.future; |
| 522 | 572 |
| 523 bool addEvents(Queue<Result> events) { | 573 bool update(Queue<Result> events, bool isDone) { |
| 524 _shutdown(); | |
| 525 return true; | |
| 526 } | |
| 527 | |
| 528 void close(_) { | |
| 529 _shutdown(); | |
| 530 } | |
| 531 | |
| 532 void _shutdown() { | |
| 533 if (_streamQueue._isDone) { | 574 if (_streamQueue._isDone) { |
| 534 _completer.complete(); | 575 _completer.complete(); |
| 535 } else { | 576 } else { |
| 536 _streamQueue._ensureListening(); | 577 _streamQueue._ensureListening(); |
| 537 _completer.complete(_streamQueue._dispose().cancel()); | 578 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
| 538 } | 579 } |
| 580 return true; |
| 539 } | 581 } |
| 540 } | 582 } |
| 541 | 583 |
| 542 /// Request for a [StreamQueue.rest] call. | 584 /// Request for a [StreamQueue.rest] call. |
| 543 /// | 585 /// |
| 544 /// The request is always complete, it just waits in the request queue | 586 /// The request is always complete, it just waits in the request queue |
| 545 /// until all previous events are fulfilled, then it takes over the | 587 /// until all previous events are fulfilled, then it takes over the |
| 546 /// stream events subscription and creates a stream from it. | 588 /// stream events subscription and creates a stream from it. |
| 547 class _RestRequest<T> implements _EventRequest { | 589 class _RestRequest<T> implements _EventRequest { |
| 548 /// Completer for the stream returned by the `rest` call. | 590 /// Completer for the stream returned by the `rest` call. |
| 549 final StreamCompleter _completer = new StreamCompleter<T>(); | 591 final StreamCompleter _completer = new StreamCompleter<T>(); |
| 550 | 592 |
| 551 /// The [StreamQueue] object that has this request queued. | 593 /// The [StreamQueue] object that has this request queued. |
| 552 /// | 594 /// |
| 553 /// When the event is completed, it needs to cancel the active subscription | 595 /// When the event is completed, it needs to cancel the active subscription |
| 554 /// of the `StreamQueue` object, if any. | 596 /// of the `StreamQueue` object, if any. |
| 555 final StreamQueue _streamQueue; | 597 final StreamQueue _streamQueue; |
| 556 | 598 |
| 557 _RestRequest(this._streamQueue); | 599 _RestRequest(this._streamQueue); |
| 558 | 600 |
| 559 /// The stream which will contain the remaining events of [_streamQueue]. | 601 /// The stream which will contain the remaining events of [_streamQueue]. |
| 560 Stream<T> get stream => _completer.stream; | 602 Stream<T> get stream => _completer.stream; |
| 561 | 603 |
| 562 bool addEvents(Queue<Result> events) { | 604 bool update(Queue<Result> events, bool isDone) { |
| 563 _completeStream(events); | |
| 564 return true; | |
| 565 } | |
| 566 | |
| 567 void close(Queue<Result> events) { | |
| 568 _completeStream(events); | |
| 569 } | |
| 570 | |
| 571 void _completeStream(Queue<Result> events) { | |
| 572 if (events.isEmpty) { | 605 if (events.isEmpty) { |
| 573 if (_streamQueue._isDone) { | 606 if (_streamQueue._isDone) { |
| 574 _completer.setEmpty(); | 607 _completer.setEmpty(); |
| 575 } else { | 608 } else { |
| 576 _completer.setSourceStream(_getRestStream()); | 609 _completer.setSourceStream(_streamQueue._extractStream()); |
| 577 } | 610 } |
| 578 } else { | 611 } else { |
| 579 // There are prefetched events which needs to be added before the | 612 // There are prefetched events which needs to be added before the |
| 580 // remaining stream. | 613 // remaining stream. |
| 581 var controller = new StreamController<T>(); | 614 var controller = new StreamController<T>(); |
| 582 for (var event in events) { | 615 for (var event in events) { |
| 583 event.addTo(controller); | 616 event.addTo(controller); |
| 584 } | 617 } |
| 585 controller.addStream(_getRestStream(), cancelOnError: false) | 618 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 586 .whenComplete(controller.close); | 619 .whenComplete(controller.close); |
| 587 _completer.setSourceStream(controller.stream); | 620 _completer.setSourceStream(controller.stream); |
| 588 } | 621 } |
| 589 } | 622 return true; |
| 590 | |
| 591 /// Create a stream from the rest of [_streamQueue]'s subscription. | |
| 592 Stream _getRestStream() { | |
| 593 if (_streamQueue._isDone) { | |
| 594 var controller = new StreamController<T>()..close(); | |
| 595 return controller.stream; | |
| 596 // TODO(lrn). Use the following when 1.11 is released. | |
| 597 // return new Stream<T>.empty(); | |
| 598 } | |
| 599 if (_streamQueue._subscription == null) { | |
| 600 return _streamQueue._sourceStream; | |
| 601 } | |
| 602 var subscription = _streamQueue._dispose(); | |
| 603 subscription.resume(); | |
| 604 return new SubscriptionStream<T>(subscription); | |
| 605 } | 623 } |
| 606 } | 624 } |
| 607 | 625 |
| 608 /// Request for a [StreamQueue.hasNext] call. | 626 /// Request for a [StreamQueue.hasNext] call. |
| 609 /// | 627 /// |
| 610 /// Completes the [future] with `true` if it sees any event, | 628 /// Completes the [future] with `true` if it sees any event, |
| 611 /// but doesn't consume the event. | 629 /// but doesn't consume the event. |
| 612 /// If the request is closed without seeing an event, then | 630 /// If the request is closed without seeing an event, then |
| 613 /// the [future] is completed with `false`. | 631 /// the [future] is completed with `false`. |
| 614 class _HasNextRequest<T> implements _EventRequest { | 632 class _HasNextRequest<T> implements _EventRequest { |
| 615 final Completer _completer = new Completer<bool>(); | 633 final Completer _completer = new Completer<bool>(); |
| 616 | 634 |
| 617 Future<bool> get future => _completer.future; | 635 Future<bool> get future => _completer.future; |
| 618 | 636 |
| 619 bool addEvents(Queue<Result> events) { | 637 bool update(Queue<Result> events, bool isDone) { |
| 620 if (events.isNotEmpty) { | 638 if (events.isNotEmpty) { |
| 621 _completer.complete(true); | 639 _completer.complete(true); |
| 622 return true; | 640 return true; |
| 623 } | 641 } |
| 642 if (isDone) { |
| 643 _completer.complete(false); |
| 644 return true; |
| 645 } |
| 624 return false; | 646 return false; |
| 625 } | 647 } |
| 626 | |
| 627 void close(_) { | |
| 628 _completer.complete(false); | |
| 629 } | |
| 630 } | 648 } |
| OLD | NEW |