Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 53 /// var headerCount = | 53 /// var headerCount = |
| 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| 55 /// handleMessage(headers: await events.take(headerCount), | 55 /// handleMessage(headers: await events.take(headerCount), |
| 56 /// body: events.rest); | 56 /// body: events.rest); |
| 57 /// return; | 57 /// return; |
| 58 /// } | 58 /// } |
| 59 /// // Error handling. | 59 /// // Error handling. |
| 60 /// | 60 /// |
| 61 /// When you need no further events the `StreamQueue` should be closed | 61 /// When you need no further events the `StreamQueue` should be closed |
| 62 /// using [cancel]. This releases the underlying stream subscription. | 62 /// using [cancel]. This releases the underlying stream subscription. |
| 63 class StreamQueue<T> { | 63 abstract class StreamQueue<T> { |
| 64 // This class maintains two queues: one of events and one of requests. | 64 // This class maintains two queues: one of events and one of requests. |
| 65 // The active request (the one in front of the queue) is called with | 65 // The active request (the one in front of the queue) is called with |
| 66 // the current event queue when it becomes active. | 66 // the current event queue when it becomes active. |
| 67 // | 67 // |
| 68 // If the request returns true, it's complete and will be removed from the | 68 // If the request returns true, it's complete and will be removed from the |
| 69 // request queue. | 69 // request queue. |
| 70 // If the request returns false, it needs more events, and will be called | 70 // If the request returns false, it needs more events, and will be called |
| 71 // again when new events are available. | 71 // again when new events are available. |
| 72 // The request can remove events that it uses, or keep them in the event | 72 // The request can remove events that it uses, or keep them in the event |
| 73 // queue until it has all that it needs. | 73 // queue until it has all that it needs. |
| 74 // | 74 // |
| 75 // This model is very flexible and easily extensible. | 75 // This model is very flexible and easily extensible. |
| 76 // It allows requests that don't consume events (like [hasNext]) or | 76 // It allows requests that don't consume events (like [hasNext]) or |
| 77 // potentially a request that takes either five or zero events, determined | 77 // potentially a request that takes either five or zero events, determined |
| 78 // by the content of the fifth event. | 78 // by the content of the fifth event. |
| 79 | 79 |
| 80 /// Source of events. | 80 /// Whether the event source done. |
|
nweiz
2015/08/21 19:43:57
"is done"
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
I mean: Is done! :)
| |
| 81 final Stream _sourceStream; | |
| 82 | |
| 83 /// Subscription on [_sourceStream] while listening for events. | |
| 84 /// | |
| 85 /// Set to subscription when listening, and set to `null` when the | |
| 86 /// subscription is done (and [_isDone] is set to true). | |
| 87 StreamSubscription _subscription; | |
| 88 | |
| 89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
| 90 bool _isDone = false; | 81 bool _isDone = false; |
| 91 | 82 |
| 92 /// Whether a closing operation has been performed on the stream queue. | 83 /// Whether a closing operation has been performed on the stream queue. |
| 93 /// | 84 /// |
| 94 /// Closing operations are [cancel] and [rest]. | 85 /// Closing operations are [cancel] and [rest]. |
| 95 bool _isClosed = false; | 86 bool _isClosed = false; |
| 96 | 87 |
| 97 /// Queue of events not used by a request yet. | 88 /// Queue of events not used by a request yet. |
| 98 final Queue<Result> _eventQueue = new Queue(); | 89 final Queue<Result> _eventQueue = new Queue(); |
| 99 | 90 |
| 100 /// Queue of pending requests. | 91 /// Queue of pending requests. |
| 101 /// | 92 /// |
| 102 /// Access through methods below to ensure consistency. | 93 /// Access through methods below to ensure consistency. |
| 103 final Queue<_EventRequest> _requestQueue = new Queue(); | 94 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 104 | 95 |
| 105 /// Create a `StreamQueue` of the events of [source]. | 96 /// Create a `StreamQueue` of the events of [source]. |
| 106 StreamQueue(Stream source) | 97 factory StreamQueue(Stream source) = _StreamQueue<T>; |
| 107 : _sourceStream = source; | 98 |
| 99 StreamQueue._(); | |
| 108 | 100 |
| 109 /// Asks if the stream has any more events. | 101 /// Asks if the stream has any more events. |
| 110 /// | 102 /// |
| 111 /// Returns a future that completes with `true` if the stream has any | 103 /// Returns a future that completes with `true` if the stream has any |
| 112 /// more events, whether data or error. | 104 /// more events, whether data or error. |
| 113 /// If the stream closes without producing any more events, the returned | 105 /// If the stream closes without producing any more events, the returned |
| 114 /// future completes with `false`. | 106 /// future completes with `false`. |
| 115 /// | 107 /// |
| 116 /// Can be used before using [next] to avoid getting an error in the | 108 /// Can be used before using [next] to avoid getting an error in the |
| 117 /// future returned by `next` in the case where there are no more events. | 109 /// future returned by `next` in the case where there are no more events. |
| 110 /// Another alternative is to use `take(1)` which returns either zero or | |
| 111 /// one events. | |
| 118 Future<bool> get hasNext { | 112 Future<bool> get hasNext { |
| 119 if (!_isClosed) { | 113 if (!_isClosed) { |
| 120 var hasNextRequest = new _HasNextRequest(); | 114 var hasNextRequest = new _HasNextRequest(); |
| 121 _addRequest(hasNextRequest); | 115 _addRequest(hasNextRequest); |
| 122 return hasNextRequest.future; | 116 return hasNextRequest.future; |
| 123 } | 117 } |
| 124 throw _failClosed(); | 118 throw _failClosed(); |
| 125 } | 119 } |
| 126 | 120 |
| 127 /// Requests the next (yet unrequested) event from the stream. | 121 /// Requests the next (yet unrequested) event from the stream. |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 209 Future<List<T>> take(int count) { | 203 Future<List<T>> take(int count) { |
| 210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 204 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 211 if (!_isClosed) { | 205 if (!_isClosed) { |
| 212 var request = new _TakeRequest<T>(count); | 206 var request = new _TakeRequest<T>(count); |
| 213 _addRequest(request); | 207 _addRequest(request); |
| 214 return request.future; | 208 return request.future; |
| 215 } | 209 } |
| 216 throw _failClosed(); | 210 throw _failClosed(); |
| 217 } | 211 } |
| 218 | 212 |
| 219 /// Cancels the underlying stream subscription. | 213 /// Cancels the underlying event source. |
| 220 /// | 214 /// |
| 221 /// If [immediate] is `false` (the default), the cancel operation waits until | 215 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 222 /// all previously requested events have been processed, then it cancels the | 216 /// all previously requested events have been processed, then it cancels the |
| 223 /// subscription providing the events. | 217 /// subscription providing the events. |
| 224 /// | 218 /// |
| 225 /// If [immediate] is `true`, the subscription is instead canceled | 219 /// If [immediate] is `true`, the source is instead canceled |
| 226 /// immediately. Any pending events are completed as though the underlying | 220 /// immediately. Any pending events are completed as though the underlying |
| 227 /// stream had closed. | 221 /// stream had closed. |
| 228 /// | 222 /// |
| 229 /// The returned future completes with the result of calling | 223 /// The returned future completes with the result of calling |
| 230 /// `cancel`. | 224 /// `cancel`. |
| 231 /// | 225 /// |
| 232 /// After calling `cancel`, no further events can be requested. | 226 /// After calling `cancel`, no further events can be requested. |
| 233 /// None of [next], [rest], [skip], [take] or [cancel] may be | 227 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 234 /// called again. | 228 /// called again. |
| 235 Future cancel({bool immediate: false}) { | 229 Future cancel({bool immediate: false}) { |
| 236 if (_isClosed) throw _failClosed(); | 230 if (_isClosed) throw _failClosed(); |
| 237 _isClosed = true; | 231 _isClosed = true; |
| 238 | 232 |
| 239 if (!immediate) { | 233 if (!immediate) { |
| 240 var request = new _CancelRequest(this); | 234 var request = new _CancelRequest(this); |
| 241 _addRequest(request); | 235 _addRequest(request); |
| 242 return request.future; | 236 return request.future; |
| 243 } | 237 } |
| 244 | 238 |
| 245 if (_isDone) return new Future.value(); | 239 if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
| 246 if (_subscription == null) _subscription = _sourceStream.listen(null); | 240 return _cancel(); |
| 247 var future = _subscription.cancel(); | |
| 248 _onDone(); | |
| 249 return future; | |
| 250 } | 241 } |
| 251 | 242 |
| 243 /// Cancels the underlying event source. | |
| 244 Future _cancel(); | |
|
nweiz
2015/08/21 19:43:57
It might be clearer to explicitly divide these pri
Lasse Reichstein Nielsen
2015/08/24 08:31:34
Good idea.
| |
| 245 | |
| 252 /// Returns an error for when a request is made after cancel. | 246 /// Returns an error for when a request is made after cancel. |
| 253 /// | 247 /// |
| 254 /// Returns a [StateError] with a message saying that either | 248 /// Returns a [StateError] with a message saying that either |
| 255 /// [cancel] or [rest] have already been called. | 249 /// [cancel] or [rest] have already been called. |
| 256 Error _failClosed() { | 250 Error _failClosed() { |
| 257 return new StateError("Already cancelled"); | 251 return new StateError("Already cancelled"); |
| 258 } | 252 } |
| 259 | 253 |
| 260 // Callbacks receiving the events of the source stream. | 254 // Callbacks receiving the events of the source stream. |
| 261 | 255 void _addResult(Result result) { |
|
nweiz
2015/08/21 19:43:57
Nit: keep this empty line.
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I've documented the function instead.
| |
| 262 void _onData(T data) { | 256 _eventQueue.add(result); |
| 263 _eventQueue.add(new Result.value(data)); | |
| 264 _checkQueues(); | |
| 265 } | |
| 266 | |
| 267 void _onError(error, StackTrace stack) { | |
| 268 _eventQueue.add(new Result.error(error, stack)); | |
| 269 _checkQueues(); | 257 _checkQueues(); |
| 270 } | 258 } |
| 271 | 259 |
| 272 void _onDone() { | 260 void _onDone() { |
|
nweiz
2015/08/21 19:43:57
Document this.
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
| 273 _subscription = null; | |
| 274 _isDone = true; | 261 _isDone = true; |
| 275 _closeAllRequests(); | 262 _checkQueues(); |
| 276 } | 263 } |
| 277 | 264 |
| 278 // Request queue management. | 265 // Request queue management. |
| 279 | 266 |
| 280 /// Adds a new request to the queue. | 267 /// Adds a new request to the queue. |
| 281 void _addRequest(_EventRequest request) { | 268 void _addRequest(_EventRequest request) { |
| 282 if (_isDone) { | |
| 283 assert(_requestQueue.isEmpty); | |
| 284 if (!request.addEvents(_eventQueue)) { | |
| 285 request.close(_eventQueue); | |
| 286 } | |
| 287 return; | |
| 288 } | |
| 289 if (_requestQueue.isEmpty) { | 269 if (_requestQueue.isEmpty) { |
| 290 if (request.addEvents(_eventQueue)) return; | 270 if (request.addEvents(_eventQueue, _isDone)) return; |
| 291 _ensureListening(); | 271 _ensureListening(); |
| 292 } | 272 } |
| 293 _requestQueue.add(request); | 273 _requestQueue.add(request); |
| 294 } | 274 } |
| 295 | 275 |
| 296 /// Ensures that we are listening on events from [_sourceStream]. | 276 /// Ensures that we are listening on events from the event source. |
| 297 /// | 277 /// |
| 298 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 278 /// Starts listening for the first time or resumes after a [_pause]. |
| 279 void _ensureListening(); | |
| 280 | |
| 281 /// Matches events with requests. | |
| 282 /// | |
| 283 /// Called after receiving an event or when the event source closes. | |
| 284 /// May be called by requests which have stalled after the event queue | |
|
nweiz
2015/08/21 19:43:57
"stalled" is confusing for me here—it's not a term
Lasse Reichstein Nielsen
2015/08/24 08:31:34
Done.
| |
| 285 /// is empty or when the event source is done. | |
| 286 void _checkQueues() { | |
| 287 while (_requestQueue.isNotEmpty) { | |
| 288 if (_requestQueue.first.addEvents(_eventQueue, _isDone)) { | |
| 289 _requestQueue.removeFirst(); | |
| 290 } else { | |
| 291 return; | |
| 292 } | |
| 293 } | |
| 294 if (!_isDone) { | |
|
nweiz
2015/08/21 19:43:57
Nit: I find it tends to look better when block-lev
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I've added some extra lines (not all between block
| |
| 295 _pause(); | |
| 296 } | |
| 297 } | |
| 298 | |
| 299 /// Requests that the event source pauses events. | |
| 300 /// | |
| 301 /// The event source is restarted by the next call to [_ensureListening]. | |
| 302 void _pause(); | |
| 303 | |
| 304 /// Extracts a stream from the event source and makes this stream queue | |
| 305 /// unusable. | |
| 306 /// | |
| 307 /// Can only be used by the very last request. | |
| 308 /// Only used by [rest]. | |
| 309 Stream _extractStream(); | |
| 310 } | |
| 311 | |
| 312 | |
| 313 /// The default implementation of [StreamQueue]. | |
| 314 /// | |
| 315 /// This queue gets its events from a stream which is listened | |
| 316 /// to when a request needs events. | |
| 317 class _StreamQueue<T> extends StreamQueue<T> { | |
| 318 | |
|
nweiz
2015/08/21 19:43:57
Nit: extra newline.
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
| 319 /// Source of events. | |
| 320 final Stream _sourceStream; | |
| 321 | |
| 322 /// Subscription on [_sourceStream] while listening for events. | |
| 323 /// | |
| 324 /// Set to subscription when listening, and set to `null` when the | |
| 325 /// subscription is done (and [_isDone] is set to true). | |
| 326 StreamSubscription _subscription; | |
| 327 | |
| 328 _StreamQueue(this._sourceStream) : super._(); | |
| 329 | |
| 330 Future _cancel() { | |
| 331 if (_isDone) return null; | |
| 332 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
| 333 var future = _subscription.cancel(); | |
| 334 _onDone(); | |
| 335 return future; | |
| 336 } | |
| 337 | |
| 338 void _onData(T data) { | |
|
nweiz
2015/08/21 19:43:57
For what it's worth, these methods are so small I'
Lasse Reichstein Nielsen
2015/08/24 08:31:33
Done.
| |
| 339 _addResult(new Result.value(data)); | |
| 340 } | |
| 341 | |
| 342 void _onError(error, StackTrace stack) { | |
| 343 _addResult(new Result.error(error, stack)); | |
| 344 } | |
| 345 | |
| 346 void _onDone() { | |
| 347 _subscription = null; | |
| 348 super._onDone(); | |
| 349 } | |
| 350 | |
| 299 void _ensureListening() { | 351 void _ensureListening() { |
| 300 assert(!_isDone); | 352 assert(!_isDone); |
| 301 if (_subscription == null) { | 353 if (_subscription == null) { |
| 302 _subscription = | 354 _subscription = |
| 303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 355 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| 304 } else { | 356 } else { |
| 305 _subscription.resume(); | 357 _subscription.resume(); |
| 306 } | 358 } |
| 307 } | 359 } |
| 308 | 360 |
| 309 /// Removes all requests and closes them. | 361 void _pause() { |
| 310 /// | 362 _subscription.pause(); |
| 311 /// Used when the source stream is done. | |
| 312 /// After this, no further requests will be added to the queue, | |
| 313 /// requests are immediately served entirely by events already in the event | |
| 314 /// queue, if any. | |
| 315 void _closeAllRequests() { | |
| 316 assert(_isDone); | |
| 317 while (_requestQueue.isNotEmpty) { | |
| 318 var request = _requestQueue.removeFirst(); | |
| 319 if (!request.addEvents(_eventQueue)) { | |
| 320 request.close(_eventQueue); | |
| 321 } | |
| 322 } | |
| 323 } | 363 } |
| 324 | 364 |
| 325 /// Matches events with requests. | 365 Stream<T> _extractStream() { |
| 326 /// | 366 assert(_isClosed); |
| 327 /// Called after receiving an event. | 367 if (_isDone) { |
| 328 void _checkQueues() { | 368 return new Stream<T>.empty(); |
| 329 while (_requestQueue.isNotEmpty) { | |
| 330 if (_requestQueue.first.addEvents(_eventQueue)) { | |
| 331 _requestQueue.removeFirst(); | |
| 332 } else { | |
| 333 return; | |
| 334 } | |
| 335 } | 369 } |
| 336 if (!_isDone) { | 370 if (_subscription == null) { |
| 337 _subscription.pause(); | 371 return _sourceStream; |
|
nweiz
2015/08/21 19:43:57
Nit: consider making this a single-line if.
Lasse Reichstein Nielsen
2015/08/24 08:31:34
I prefer to not hide significant control flow (ret
| |
| 338 } | 372 } |
| 339 } | |
| 340 | |
| 341 /// Extracts the subscription and makes this stream queue unusable. | |
| 342 /// | |
| 343 /// Can only be used by the very last request. | |
| 344 StreamSubscription _dispose() { | |
| 345 assert(_isClosed); | |
| 346 var subscription = _subscription; | 373 var subscription = _subscription; |
| 347 _subscription = null; | 374 _subscription = null; |
| 348 _isDone = true; | 375 _isDone = true; |
| 349 return subscription; | 376 if (!subscription.isPaused) subscription.pause(); |
|
nweiz
2015/08/21 19:43:57
Why pause and re-resume it?
Lasse Reichstein Nielsen
2015/08/24 08:31:34
We need to resume it if it was paused, because the
| |
| 377 var result = new SubscriptionStream<T>(subscription); | |
| 378 subscription.resume(); | |
| 379 return result; | |
| 350 } | 380 } |
| 351 } | 381 } |
| 352 | 382 |
| 383 | |
| 384 | |
|
nweiz
2015/08/21 19:43:57
Nit: three newlines here seems excessive. It's als
Lasse Reichstein Nielsen
2015/08/24 08:31:33
True, should only be two.
| |
| 353 /// Request object that receives events when they arrive, until fulfilled. | 385 /// Request object that receives events when they arrive, until fulfilled. |
| 354 /// | 386 /// |
| 355 /// Each request that cannot be fulfilled immediately is represented by | 387 /// Each request that cannot be fulfilled immediately is represented by |
| 356 /// an `_EventRequest` object in the request queue. | 388 /// an `_EventRequest` object in the request queue. |
| 357 /// | 389 /// |
| 358 /// Events from the source stream are sent to the first request in the | 390 /// Events from the source stream are sent to the first request in the |
| 359 /// queue until it reports itself as [isComplete]. | 391 /// queue until it reports itself as [isComplete]. |
| 360 /// | 392 /// |
| 361 /// When the first request in the queue `isComplete`, either when becoming | 393 /// When the first request in the queue `isComplete`, either when becoming |
| 362 /// the first request or after receiving an event, its [close] methods is | 394 /// the first request or after receiving an event, its [close] methods is |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 375 /// more events. | 407 /// more events. |
| 376 /// The call may keep events in the queue until the requeust is complete, | 408 /// The call may keep events in the queue until the requeust is complete, |
| 377 /// or it may remove them immediately. | 409 /// or it may remove them immediately. |
| 378 /// | 410 /// |
| 379 /// If the method returns true, the request is considered fulfilled, and | 411 /// If the method returns true, the request is considered fulfilled, and |
| 380 /// will never be called again. | 412 /// will never be called again. |
| 381 /// | 413 /// |
| 382 /// This method is called when a request reaches the front of the request | 414 /// This method is called when a request reaches the front of the request |
| 383 /// queue, and if it returns `false`, it's called again every time a new event | 415 /// queue, and if it returns `false`, it's called again every time a new event |
| 384 /// becomes available, or when the stream closes. | 416 /// becomes available, or when the stream closes. |
| 385 bool addEvents(Queue<Result> events); | 417 /// If the function returns `false` when the stream has already closed |
|
nweiz
2015/08/21 19:43:57
"the function" -> "this method" or "it"
What are
Lasse Reichstein Nielsen
2015/08/23 11:30:00
That's exactly for "lookahead". The look-ahead may
nweiz
2015/08/25 00:38:21
SGTM
| |
| 386 | 418 /// ([isDone] is true), then the request must call [StreamQueue._checkQueues] |
| 387 /// Complete the request. | 419 /// itself when it's ready to continue. |
| 388 /// | 420 bool addEvents(Queue<Result> events, bool isDone); |
| 389 /// This is called when the source stream is done before the request | |
| 390 /// had a chance to receive all its events. That is, after a call | |
| 391 /// to [addEvents] has returned `false`. | |
| 392 /// If there are any unused events available, they are in the [events] queue. | |
| 393 /// No further events will become available. | |
| 394 /// | |
| 395 /// The queue should only remove events from the front of the event queue, | |
| 396 /// e.g., using [removeFirst]. | |
| 397 /// | |
| 398 /// If the request kept events in the queue after an [addEvents] call, | |
| 399 /// this is the last chance to use them. | |
| 400 void close(Queue<Result> events); | |
| 401 } | 421 } |
| 402 | 422 |
| 403 /// Request for a [StreamQueue.next] call. | 423 /// Request for a [StreamQueue.next] call. |
| 404 /// | 424 /// |
| 405 /// Completes the returned future when receiving the first event, | 425 /// Completes the returned future when receiving the first event, |
| 406 /// and is then complete. | 426 /// and is then complete. |
| 407 class _NextRequest<T> implements _EventRequest { | 427 class _NextRequest<T> implements _EventRequest { |
| 408 /// Completer for the future returned by [StreamQueue.next]. | 428 /// Completer for the future returned by [StreamQueue.next]. |
| 409 final Completer _completer; | 429 final Completer _completer; |
| 410 | 430 |
| 411 _NextRequest() : _completer = new Completer<T>(); | 431 _NextRequest() : _completer = new Completer<T>(); |
| 412 | 432 |
| 413 Future<T> get future => _completer.future; | 433 Future<T> get future => _completer.future; |
| 414 | 434 |
| 415 bool addEvents(Queue<Result> events) { | 435 bool addEvents(Queue<Result> events, bool isDone) { |
| 416 if (events.isEmpty) return false; | 436 if (events.isNotEmpty) { |
| 417 events.removeFirst().complete(_completer); | 437 events.removeFirst().complete(_completer); |
| 418 return true; | 438 return true; |
| 419 } | 439 } |
| 420 | 440 if (isDone) { |
| 421 void close(Queue<Result> events) { | 441 var errorFuture = |
| 422 var errorFuture = | 442 new Future.sync(() => throw new StateError("No elements")); |
| 423 new Future.sync(() => throw new StateError("No elements")); | 443 _completer.complete(errorFuture); |
| 424 _completer.complete(errorFuture); | 444 return true; |
| 445 } | |
| 446 return false; | |
| 425 } | 447 } |
| 426 } | 448 } |
| 427 | 449 |
| 428 /// Request for a [StreamQueue.skip] call. | 450 /// Request for a [StreamQueue.skip] call. |
| 429 class _SkipRequest implements _EventRequest { | 451 class _SkipRequest implements _EventRequest { |
| 430 /// Completer for the future returned by the skip call. | 452 /// Completer for the future returned by the skip call. |
| 431 final Completer _completer = new Completer<int>(); | 453 final Completer _completer = new Completer<int>(); |
| 432 | 454 |
| 433 /// Number of remaining events to skip. | 455 /// Number of remaining events to skip. |
| 434 /// | 456 /// |
| 435 /// The request [isComplete] when the values reaches zero. | 457 /// The request [isComplete] when the values reaches zero. |
| 436 /// | 458 /// |
| 437 /// Decremented when an event is seen. | 459 /// Decremented when an event is seen. |
| 438 /// Set to zero when an error is seen since errors abort the skip request. | 460 /// Set to zero when an error is seen since errors abort the skip request. |
| 439 int _eventsToSkip; | 461 int _eventsToSkip; |
| 440 | 462 |
| 441 _SkipRequest(this._eventsToSkip); | 463 _SkipRequest(this._eventsToSkip); |
| 442 | 464 |
| 443 /// The future completed when the correct number of events have been skipped. | 465 /// The future completed when the correct number of events have been skipped. |
| 444 Future get future => _completer.future; | 466 Future get future => _completer.future; |
| 445 | 467 |
| 446 bool addEvents(Queue<Result> events) { | 468 bool addEvents(Queue<Result> events, bool isDone) { |
| 447 while (_eventsToSkip > 0) { | 469 while (_eventsToSkip > 0) { |
| 448 if (events.isEmpty) return false; | 470 if (events.isEmpty) { |
| 471 if (isDone) break; | |
| 472 return false; | |
| 473 } | |
| 449 _eventsToSkip--; | 474 _eventsToSkip--; |
| 450 var event = events.removeFirst(); | 475 var event = events.removeFirst(); |
| 451 if (event.isError) { | 476 if (event.isError) { |
| 452 event.complete(_completer); | 477 event.complete(_completer); |
| 453 return true; | 478 return true; |
| 454 } | 479 } |
| 455 } | 480 } |
| 456 _completer.complete(0); | 481 _completer.complete(_eventsToSkip); |
| 457 return true; | 482 return true; |
| 458 } | 483 } |
| 459 | |
| 460 void close(Queue<Result> events) { | |
| 461 _completer.complete(_eventsToSkip); | |
| 462 } | |
| 463 } | 484 } |
| 464 | 485 |
| 465 /// Request for a [StreamQueue.take] call. | 486 /// Request for a [StreamQueue.take] call. |
| 466 class _TakeRequest<T> implements _EventRequest { | 487 class _TakeRequest<T> implements _EventRequest { |
| 467 /// Completer for the future returned by the take call. | 488 /// Completer for the future returned by the take call. |
| 468 final Completer _completer; | 489 final Completer _completer; |
| 469 | 490 |
| 470 /// List collecting events until enough have been seen. | 491 /// List collecting events until enough have been seen. |
| 471 final List _list = <T>[]; | 492 final List _list = <T>[]; |
| 472 | 493 |
| 473 /// Number of events to capture. | 494 /// Number of events to capture. |
| 474 /// | 495 /// |
| 475 /// The request [isComplete] when the length of [_list] reaches | 496 /// The request [isComplete] when the length of [_list] reaches |
| 476 /// this value. | 497 /// this value. |
| 477 final int _eventsToTake; | 498 final int _eventsToTake; |
| 478 | 499 |
| 479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 500 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| 480 | 501 |
| 481 /// The future completed when the correct number of events have been captured. | 502 /// The future completed when the correct number of events have been captured. |
| 482 Future get future => _completer.future; | 503 Future get future => _completer.future; |
| 483 | 504 |
| 484 bool addEvents(Queue<Result> events) { | 505 bool addEvents(Queue<Result> events, bool isDone) { |
| 485 while (_list.length < _eventsToTake) { | 506 while (_list.length < _eventsToTake) { |
| 486 if (events.isEmpty) return false; | 507 if (events.isEmpty) { |
| 508 if (isDone) break; | |
| 509 return false; | |
| 510 } | |
| 487 var result = events.removeFirst(); | 511 var result = events.removeFirst(); |
| 488 if (result.isError) { | 512 if (result.isError) { |
| 489 result.complete(_completer); | 513 result.complete(_completer); |
| 490 return true; | 514 return true; |
| 491 } | 515 } |
| 492 _list.add(result.asValue.value); | 516 _list.add(result.asValue.value); |
| 493 } | 517 } |
| 494 _completer.complete(_list); | 518 _completer.complete(_list); |
| 495 return true; | 519 return true; |
| 496 } | 520 } |
| 497 | |
| 498 void close(Queue<Result> events) { | |
| 499 _completer.complete(_list); | |
| 500 } | |
| 501 } | 521 } |
| 502 | 522 |
| 503 /// Request for a [StreamQueue.cancel] call. | 523 /// Request for a [StreamQueue.cancel] call. |
| 504 /// | 524 /// |
| 505 /// The request needs no events, it just waits in the request queue | 525 /// The request needs no events, it just waits in the request queue |
| 506 /// until all previous events are fulfilled, then it cancels the stream queue | 526 /// until all previous events are fulfilled, then it cancels the stream queue |
| 507 /// source subscription. | 527 /// source subscription. |
| 508 class _CancelRequest implements _EventRequest { | 528 class _CancelRequest implements _EventRequest { |
| 509 /// Completer for the future returned by the `cancel` call. | 529 /// Completer for the future returned by the `cancel` call. |
| 510 final Completer _completer = new Completer(); | 530 final Completer _completer = new Completer(); |
| 511 | 531 |
| 512 /// The [StreamQueue] object that has this request queued. | 532 /// The [StreamQueue] object that has this request queued. |
| 513 /// | 533 /// |
| 514 /// When the event is completed, it needs to cancel the active subscription | 534 /// When the event is completed, it needs to cancel the active subscription |
| 515 /// of the `StreamQueue` object, if any. | 535 /// of the `StreamQueue` object, if any. |
| 516 final StreamQueue _streamQueue; | 536 final StreamQueue _streamQueue; |
| 517 | 537 |
| 518 _CancelRequest(this._streamQueue); | 538 _CancelRequest(this._streamQueue); |
| 519 | 539 |
| 520 /// The future completed when the cancel request is completed. | 540 /// The future completed when the cancel request is completed. |
| 521 Future get future => _completer.future; | 541 Future get future => _completer.future; |
| 522 | 542 |
| 523 bool addEvents(Queue<Result> events) { | 543 bool addEvents(Queue<Result> events, bool isDone) { |
| 524 _shutdown(); | 544 _completer.complete(_streamQueue._cancel()); |
| 525 return true; | 545 return true; |
| 526 } | 546 } |
| 527 | |
| 528 void close(_) { | |
| 529 _shutdown(); | |
| 530 } | |
| 531 | |
| 532 void _shutdown() { | |
| 533 if (_streamQueue._subscription == null) { | |
| 534 _completer.complete(); | |
| 535 } else { | |
| 536 _completer.complete(_streamQueue._dispose().cancel()); | |
| 537 } | |
| 538 } | |
| 539 } | 547 } |
| 540 | 548 |
| 541 /// Request for a [StreamQueue.rest] call. | 549 /// Request for a [StreamQueue.rest] call. |
| 542 /// | 550 /// |
| 543 /// The request is always complete, it just waits in the request queue | 551 /// The request is always complete, it just waits in the request queue |
| 544 /// until all previous events are fulfilled, then it takes over the | 552 /// until all previous events are fulfilled, then it takes over the |
| 545 /// stream events subscription and creates a stream from it. | 553 /// stream events subscription and creates a stream from it. |
| 546 class _RestRequest<T> implements _EventRequest { | 554 class _RestRequest<T> implements _EventRequest { |
| 547 /// Completer for the stream returned by the `rest` call. | 555 /// Completer for the stream returned by the `rest` call. |
| 548 final StreamCompleter _completer = new StreamCompleter<T>(); | 556 final StreamCompleter _completer = new StreamCompleter<T>(); |
| 549 | 557 |
| 550 /// The [StreamQueue] object that has this request queued. | 558 /// The [StreamQueue] object that has this request queued. |
| 551 /// | 559 /// |
| 552 /// When the event is completed, it needs to cancel the active subscription | 560 /// When the event is completed, it needs to cancel the active subscription |
| 553 /// of the `StreamQueue` object, if any. | 561 /// of the `StreamQueue` object, if any. |
| 554 final StreamQueue _streamQueue; | 562 final StreamQueue _streamQueue; |
| 555 | 563 |
| 556 _RestRequest(this._streamQueue); | 564 _RestRequest(this._streamQueue); |
| 557 | 565 |
| 558 /// The stream which will contain the remaining events of [_streamQueue]. | 566 /// The stream which will contain the remaining events of [_streamQueue]. |
| 559 Stream<T> get stream => _completer.stream; | 567 Stream<T> get stream => _completer.stream; |
| 560 | 568 |
| 561 bool addEvents(Queue<Result> events) { | 569 bool addEvents(Queue<Result> events, bool isDone) { |
| 562 _completeStream(events); | |
| 563 return true; | |
| 564 } | |
| 565 | |
| 566 void close(Queue<Result> events) { | |
| 567 _completeStream(events); | |
| 568 } | |
| 569 | |
| 570 void _completeStream(Queue<Result> events) { | |
| 571 if (events.isEmpty) { | 570 if (events.isEmpty) { |
| 572 if (_streamQueue._isDone) { | 571 if (_streamQueue._isDone) { |
| 573 _completer.setEmpty(); | 572 _completer.setEmpty(); |
| 574 } else { | 573 } else { |
| 575 _completer.setSourceStream(_getRestStream()); | 574 _completer.setSourceStream(_streamQueue._extractStream()); |
| 576 } | 575 } |
| 577 } else { | 576 } else { |
| 578 // There are prefetched events which needs to be added before the | 577 // There are prefetched events which needs to be added before the |
| 579 // remaining stream. | 578 // remaining stream. |
| 580 var controller = new StreamController<T>(); | 579 var controller = new StreamController<T>(); |
| 581 for (var event in events) { | 580 for (var event in events) { |
| 582 event.addTo(controller); | 581 event.addTo(controller); |
| 583 } | 582 } |
| 584 controller.addStream(_getRestStream(), cancelOnError: false) | 583 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 585 .whenComplete(controller.close); | 584 .whenComplete(controller.close); |
| 586 _completer.setSourceStream(controller.stream); | 585 _completer.setSourceStream(controller.stream); |
| 587 } | 586 } |
| 588 } | 587 return true; |
| 589 | |
| 590 /// Create a stream from the rest of [_streamQueue]'s subscription. | |
| 591 Stream _getRestStream() { | |
| 592 if (_streamQueue._isDone) { | |
| 593 var controller = new StreamController<T>()..close(); | |
| 594 return controller.stream; | |
| 595 // TODO(lrn). Use the following when 1.11 is released. | |
| 596 // return new Stream<T>.empty(); | |
| 597 } | |
| 598 if (_streamQueue._subscription == null) { | |
| 599 return _streamQueue._sourceStream; | |
| 600 } | |
| 601 var subscription = _streamQueue._dispose(); | |
| 602 subscription.resume(); | |
| 603 return new SubscriptionStream<T>(subscription); | |
| 604 } | 588 } |
| 605 } | 589 } |
| 606 | 590 |
| 607 /// Request for a [StreamQueue.hasNext] call. | 591 /// Request for a [StreamQueue.hasNext] call. |
| 608 /// | 592 /// |
| 609 /// Completes the [future] with `true` if it sees any event, | 593 /// Completes the [future] with `true` if it sees any event, |
| 610 /// but doesn't consume the event. | 594 /// but doesn't consume the event. |
| 611 /// If the request is closed without seeing an event, then | 595 /// If the request is closed without seeing an event, then |
| 612 /// the [future] is completed with `false`. | 596 /// the [future] is completed with `false`. |
| 613 class _HasNextRequest<T> implements _EventRequest { | 597 class _HasNextRequest<T> implements _EventRequest { |
| 614 final Completer _completer = new Completer<bool>(); | 598 final Completer _completer = new Completer<bool>(); |
| 615 | 599 |
| 616 Future<bool> get future => _completer.future; | 600 Future<bool> get future => _completer.future; |
| 617 | 601 |
| 618 bool addEvents(Queue<Result> events) { | 602 bool addEvents(Queue<Result> events, bool isDone) { |
| 619 if (events.isNotEmpty) { | 603 if (events.isNotEmpty) { |
| 620 _completer.complete(true); | 604 _completer.complete(true); |
| 621 return true; | 605 return true; |
| 622 } | 606 } |
| 607 if (isDone) { | |
| 608 _completer.complete(false); | |
| 609 return true; | |
| 610 } | |
| 623 return false; | 611 return false; |
| 624 } | 612 } |
| 625 | |
| 626 void close(_) { | |
| 627 _completer.complete(false); | |
| 628 } | |
| 629 } | 613 } |
| OLD | NEW |