| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | |
| 6 | |
| 7 import 'dart:async'; | 5 import 'dart:async'; |
| 8 import 'dart:collection'; | 6 import 'dart:collection'; |
| 9 | 7 |
| 8 import 'package:collection/collection.dart'; |
| 9 |
| 10 import "cancelable_operation.dart"; |
| 11 import "result.dart"; |
| 10 import "subscription_stream.dart"; | 12 import "subscription_stream.dart"; |
| 11 import "stream_completer.dart"; | 13 import "stream_completer.dart"; |
| 12 import "../result.dart"; | 14 import "stream_splitter.dart"; |
| 13 | 15 |
| 14 /// An asynchronous pull-based interface for accessing stream events. | 16 /// An asynchronous pull-based interface for accessing stream events. |
| 15 /// | 17 /// |
| 16 /// Wraps a stream and makes individual events available on request. | 18 /// Wraps a stream and makes individual events available on request. |
| 17 /// | 19 /// |
| 18 /// You can request (and reserve) one or more events from the stream, | 20 /// You can request (and reserve) one or more events from the stream, |
| 19 /// and after all previous requests have been fulfilled, stream events | 21 /// and after all previous requests have been fulfilled, stream events |
| 20 /// go towards fulfilling your request. | 22 /// go towards fulfilling your request. |
| 21 /// | 23 /// |
| 22 /// For example, if you ask for [next] two times, the returned futures | 24 /// For example, if you ask for [next] two times, the returned futures |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 80 // by the content of the fifth event. | 82 // by the content of the fifth event. |
| 81 | 83 |
| 82 /// Whether the event source is done. | 84 /// Whether the event source is done. |
| 83 bool _isDone = false; | 85 bool _isDone = false; |
| 84 | 86 |
| 85 /// Whether a closing operation has been performed on the stream queue. | 87 /// Whether a closing operation has been performed on the stream queue. |
| 86 /// | 88 /// |
| 87 /// Closing operations are [cancel] and [rest]. | 89 /// Closing operations are [cancel] and [rest]. |
| 88 bool _isClosed = false; | 90 bool _isClosed = false; |
| 89 | 91 |
| 92 /// The number of events dispatched by this queue. |
| 93 /// |
| 94 /// This counts error events. It doesn't count done events, or events |
| 95 /// dispatched to a stream returned by [rest]. |
| 96 int get eventsDispatched => _eventsReceived - _eventQueue.length; |
| 97 |
| 98 /// The number of events received by this queue. |
| 99 var _eventsReceived = 0; |
| 100 |
| 90 /// Queue of events not used by a request yet. | 101 /// Queue of events not used by a request yet. |
| 91 final Queue<Result> _eventQueue = new Queue(); | 102 final QueueList<Result> _eventQueue = new QueueList(); |
| 92 | 103 |
| 93 /// Queue of pending requests. | 104 /// Queue of pending requests. |
| 94 /// | 105 /// |
| 95 /// Access through methods below to ensure consistency. | 106 /// Access through methods below to ensure consistency. |
| 96 final Queue<_EventRequest> _requestQueue = new Queue(); | 107 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 97 | 108 |
| 98 /// Create a `StreamQueue` of the events of [source]. | 109 /// Create a `StreamQueue` of the events of [source]. |
| 99 factory StreamQueue(Stream source) = _StreamQueue<T>; | 110 factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
| 100 | 111 |
| 101 StreamQueue._(); | 112 StreamQueue._(); |
| 102 | 113 |
| 103 /// Asks if the stream has any more events. | 114 /// Asks if the stream has any more events. |
| 104 /// | 115 /// |
| 105 /// Returns a future that completes with `true` if the stream has any | 116 /// Returns a future that completes with `true` if the stream has any |
| 106 /// more events, whether data or error. | 117 /// more events, whether data or error. |
| 107 /// If the stream closes without producing any more events, the returned | 118 /// If the stream closes without producing any more events, the returned |
| 108 /// future completes with `false`. | 119 /// future completes with `false`. |
| 109 /// | 120 /// |
| 110 /// Can be used before using [next] to avoid getting an error in the | 121 /// Can be used before using [next] to avoid getting an error in the |
| 111 /// future returned by `next` in the case where there are no more events. | 122 /// future returned by `next` in the case where there are no more events. |
| 112 /// Another alternative is to use `take(1)` which returns either zero or | 123 /// Another alternative is to use `take(1)` which returns either zero or |
| 113 /// one events. | 124 /// one events. |
| 114 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
| 115 if (!_isClosed) { | 126 if (!_isClosed) { |
| 116 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
| 117 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
| 118 return hasNextRequest.future; | 129 return hasNextRequest.future; |
| 119 } | 130 } |
| 120 throw _failClosed(); | 131 throw _failClosed(); |
| 121 } | 132 } |
| 122 | 133 |
| 134 /// Look at the next [count] data events without consuming them. |
| 135 /// |
| 136 /// Works like [take] except that the events are left in the queue. |
| 137 /// If one of the next [count] events is an error, the returned future |
| 138 /// completes with this error, and the error is still left in the queue. |
| 139 Future<List<T>> lookAhead(int count) { |
| 140 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 141 if (!_isClosed) { |
| 142 var request = new _LookAheadRequest<T>(count); |
| 143 _addRequest(request); |
| 144 return request.future; |
| 145 } |
| 146 throw _failClosed(); |
| 147 } |
| 148 |
| 123 /// Requests the next (yet unrequested) event from the stream. | 149 /// Requests the next (yet unrequested) event from the stream. |
| 124 /// | 150 /// |
| 125 /// When the requested event arrives, the returned future is completed with | 151 /// When the requested event arrives, the returned future is completed with |
| 126 /// the event. | 152 /// the event. |
| 127 /// If the event is a data event, the returned future completes | 153 /// If the event is a data event, the returned future completes |
| 128 /// with its value. | 154 /// with its value. |
| 129 /// If the event is an error event, the returned future completes with | 155 /// If the event is an error event, the returned future completes with |
| 130 /// its error and stack trace. | 156 /// its error and stack trace. |
| 131 /// If the stream closes before an event arrives, the returned future | 157 /// If the stream closes before an event arrives, the returned future |
| 132 /// completes with a [StateError]. | 158 /// completes with a [StateError]. |
| 133 /// | 159 /// |
| 134 /// It's possible to have several pending [next] calls (or other requests), | 160 /// It's possible to have several pending [next] calls (or other requests), |
| 135 /// and they will be completed in the order they were requested, by the | 161 /// and they will be completed in the order they were requested, by the |
| 136 /// first events that were not consumed by previous requeusts. | 162 /// first events that were not consumed by previous requeusts. |
| 137 Future<T> get next { | 163 Future<T> get next { |
| 138 if (!_isClosed) { | 164 if (!_isClosed) { |
| 139 var nextRequest = new _NextRequest<T>(); | 165 var nextRequest = new _NextRequest<T>(); |
| 140 _addRequest(nextRequest); | 166 _addRequest(nextRequest); |
| 141 return nextRequest.future; | 167 return nextRequest.future; |
| 142 } | 168 } |
| 143 throw _failClosed(); | 169 throw _failClosed(); |
| 144 } | 170 } |
| 145 | 171 |
| 172 /// Looks at the next (yet unrequested) event from the stream. |
| 173 /// |
| 174 /// Like [next] except that the event is not consumed. |
| 175 /// If the next event is an error event, it stays in the queue. |
| 176 Future<T> get peek { |
| 177 if (!_isClosed) { |
| 178 var nextRequest = new _PeekRequest<T>(); |
| 179 _addRequest(nextRequest); |
| 180 return nextRequest.future; |
| 181 } |
| 182 throw _failClosed(); |
| 183 } |
| 184 |
| 146 /// Returns a stream of all the remaning events of the source stream. | 185 /// Returns a stream of all the remaning events of the source stream. |
| 147 /// | 186 /// |
| 148 /// All requested [next], [skip] or [take] operations are completed | 187 /// All requested [next], [skip] or [take] operations are completed |
| 149 /// first, and then any remaining events are provided as events of | 188 /// first, and then any remaining events are provided as events of |
| 150 /// the returned stream. | 189 /// the returned stream. |
| 151 /// | 190 /// |
| 152 /// Using `rest` closes this stream queue. After getting the | 191 /// Using `rest` closes this stream queue. After getting the |
| 153 /// `rest` the caller may no longer request other events, like | 192 /// `rest` the caller may no longer request other events, like |
| 154 /// after calling [cancel]. | 193 /// after calling [cancel]. |
| 155 Stream<T> get rest { | 194 Stream<T> get rest { |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 205 Future<List<T>> take(int count) { | 244 Future<List<T>> take(int count) { |
| 206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 245 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 207 if (!_isClosed) { | 246 if (!_isClosed) { |
| 208 var request = new _TakeRequest<T>(count); | 247 var request = new _TakeRequest<T>(count); |
| 209 _addRequest(request); | 248 _addRequest(request); |
| 210 return request.future; | 249 return request.future; |
| 211 } | 250 } |
| 212 throw _failClosed(); | 251 throw _failClosed(); |
| 213 } | 252 } |
| 214 | 253 |
| 254 /// Requests a transaction that can conditionally consume events. |
| 255 /// |
| 256 /// The transaction can create copies of this queue at the current position |
| 257 /// using [StreamQueueTransaction.newQueue]. Each of these queues is |
| 258 /// independent of one another and of the parent queue. The transaction |
| 259 /// finishes when one of two methods is called: |
| 260 /// |
| 261 /// * [StreamQueueTransaction.commit] updates the parent queue's position to |
| 262 /// match that of one of the copies. |
| 263 /// |
| 264 /// * [StreamQueueTransaction.reject] causes the parent queue to continue as |
| 265 /// though [startTransaction] hadn't been called. |
| 266 /// |
| 267 /// Until the transaction finishes, this queue won't emit any events. |
| 268 /// |
| 269 /// See also [withTransaction] and [cancelable]. |
| 270 /// |
| 271 /// ```dart |
| 272 /// /// Consumes all empty lines from the beginning of [lines]. |
| 273 /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
| 274 /// while (await lines.hasNext) { |
| 275 /// var transaction = lines.startTransaction(); |
| 276 /// var queue = transaction.newQueue(); |
| 277 /// if ((await queue.next).isNotEmpty) { |
| 278 /// transaction.reject(); |
| 279 /// return; |
| 280 /// } else { |
| 281 /// transaction.commit(queue); |
| 282 /// } |
| 283 /// } |
| 284 /// } |
| 285 /// ``` |
| 286 StreamQueueTransaction<T> startTransaction() { |
| 287 if (_isClosed) throw _failClosed(); |
| 288 |
| 289 var request = new _TransactionRequest(this); |
| 290 _addRequest(request); |
| 291 return request.transaction; |
| 292 } |
| 293 |
| 294 /// Passes a copy of this queue to [callback], and updates this queue to match |
| 295 /// the copy's position if [callback] returns `true`. |
| 296 /// |
| 297 /// This queue won't emit any events until [callback] returns. If it returns |
| 298 /// `false`, this queue continues as though [withTransaction] hadn't been |
| 299 /// called. If it throws an error, this updates this queue to match the copy's |
| 300 /// position and throws the error from the returned `Future`. |
| 301 /// |
| 302 /// Returns the same value as [callback]. |
| 303 /// |
| 304 /// See also [startTransaction] and [cancelable]. |
| 305 /// |
| 306 /// ```dart |
| 307 /// /// Consumes all empty lines from the beginning of [lines]. |
| 308 /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
| 309 /// while (await lines.hasNext) { |
| 310 /// // Consume a line if it's empty, otherwise return. |
| 311 /// if (!await lines.withTransaction( |
| 312 /// (queue) async => (await queue.next).isEmpty)) { |
| 313 /// return; |
| 314 /// } |
| 315 /// } |
| 316 /// } |
| 317 /// ``` |
| 318 Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) { |
| 319 var transaction = startTransaction(); |
| 320 |
| 321 /// Avoid async/await to ensure that [startTransaction] is called |
| 322 /// synchronously and so ends up in the right place in the request queue. |
| 323 var queue = transaction.newQueue(); |
| 324 return callback(queue).then((result) { |
| 325 if (result) { |
| 326 transaction.commit(queue); |
| 327 } else { |
| 328 transaction.reject(); |
| 329 } |
| 330 return result; |
| 331 }, onError: (error) { |
| 332 transaction.commit(queue); |
| 333 throw error; |
| 334 }); |
| 335 } |
| 336 |
| 337 /// Passes a copy of this queue to [callback], and updates this queue to match |
| 338 /// the copy's position once [callback] completes. |
| 339 /// |
| 340 /// If the returned [CancelableOperation] is canceled, this queue instead |
| 341 /// continues as though [cancelable] hadn't been called. Otherwise, it emits |
| 342 /// the same value or error as [callback]. |
| 343 /// |
| 344 /// See also [startTransaction] and [withTransaction]. |
| 345 /// |
| 346 /// ```dart |
| 347 /// final _stdinQueue = new StreamQueue(stdin); |
| 348 /// |
| 349 /// /// Returns an operation that completes when the user sends a line to |
| 350 /// /// standard input. |
| 351 /// /// |
| 352 /// /// If the operation is canceled, stops waiting for user input. |
| 353 /// CancelableOperation<String> nextStdinLine() => |
| 354 /// _stdinQueue.cancelable((queue) => queue.next); |
| 355 /// ``` |
| 356 CancelableOperation<S> cancelable<S>( |
| 357 Future<S> callback(StreamQueue<T> queue)) { |
| 358 var transaction = startTransaction(); |
| 359 var completer = new CancelableCompleter<S>(onCancel: () { |
| 360 transaction.reject(); |
| 361 }); |
| 362 |
| 363 var queue = transaction.newQueue(); |
| 364 completer.complete(callback(queue).whenComplete(() { |
| 365 if (!completer.isCanceled) transaction.commit(queue); |
| 366 })); |
| 367 |
| 368 return completer.operation; |
| 369 } |
| 370 |
| 215 /// Cancels the underlying event source. | 371 /// Cancels the underlying event source. |
| 216 /// | 372 /// |
| 217 /// If [immediate] is `false` (the default), the cancel operation waits until | 373 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 218 /// all previously requested events have been processed, then it cancels the | 374 /// all previously requested events have been processed, then it cancels the |
| 219 /// subscription providing the events. | 375 /// subscription providing the events. |
| 220 /// | 376 /// |
| 221 /// If [immediate] is `true`, the source is instead canceled | 377 /// If [immediate] is `true`, the source is instead canceled |
| 222 /// immediately. Any pending events are completed as though the underlying | 378 /// immediately. Any pending events are completed as though the underlying |
| 223 /// stream had closed. | 379 /// stream had closed. |
| 224 /// | 380 /// |
| 225 /// The returned future completes with the result of calling | 381 /// The returned future completes with the result of calling |
| 226 /// `cancel`. | 382 /// `cancel`. |
| 227 /// | 383 /// |
| 228 /// After calling `cancel`, no further events can be requested. | 384 /// After calling `cancel`, no further events can be requested. |
| 229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 385 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
| 230 /// called again. | 386 /// may be called again. |
| 231 Future cancel({bool immediate: false}) { | 387 Future cancel({bool immediate: false}) { |
| 232 if (_isClosed) throw _failClosed(); | 388 if (_isClosed) throw _failClosed(); |
| 233 _isClosed = true; | 389 _isClosed = true; |
| 234 | 390 |
| 235 if (!immediate) { | 391 if (!immediate) { |
| 236 var request = new _CancelRequest(this); | 392 var request = new _CancelRequest(this); |
| 237 _addRequest(request); | 393 _addRequest(request); |
| 238 return request.future; | 394 return request.future; |
| 239 } | 395 } |
| 240 | 396 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 269 _pause(); | 425 _pause(); |
| 270 } | 426 } |
| 271 } | 427 } |
| 272 | 428 |
| 273 /// Extracts a stream from the event source and makes this stream queue | 429 /// Extracts a stream from the event source and makes this stream queue |
| 274 /// unusable. | 430 /// unusable. |
| 275 /// | 431 /// |
| 276 /// Can only be used by the very last request (the stream queue must | 432 /// Can only be used by the very last request (the stream queue must |
| 277 /// be closed by that request). | 433 /// be closed by that request). |
| 278 /// Only used by [rest]. | 434 /// Only used by [rest]. |
| 279 Stream _extractStream(); | 435 Stream<T> _extractStream(); |
| 280 | 436 |
| 281 /// Requests that the event source pauses events. | 437 /// Requests that the event source pauses events. |
| 282 /// | 438 /// |
| 283 /// This is called automatically when the request queue is empty. | 439 /// This is called automatically when the request queue is empty. |
| 284 /// | 440 /// |
| 285 /// The event source is restarted by the next call to [_ensureListening]. | 441 /// The event source is restarted by the next call to [_ensureListening]. |
| 286 void _pause(); | 442 void _pause(); |
| 287 | 443 |
| 288 /// Ensures that we are listening on events from the event source. | 444 /// Ensures that we are listening on events from the event source. |
| 289 /// | 445 /// |
| 290 /// Starts listening for the first time or resumes after a [_pause]. | 446 /// Starts listening for the first time or resumes after a [_pause]. |
| 291 /// | 447 /// |
| 292 /// Is called automatically if a request requires more events. | 448 /// Is called automatically if a request requires more events. |
| 293 void _ensureListening(); | 449 void _ensureListening(); |
| 294 | 450 |
| 295 /// Cancels the underlying event source. | 451 /// Cancels the underlying event source. |
| 296 Future _cancel(); | 452 Future _cancel(); |
| 297 | 453 |
| 298 // ------------------------------------------------------------------ | 454 // ------------------------------------------------------------------ |
| 299 // Methods called by the event source to add events or say that it's | 455 // Methods called by the event source to add events or say that it's |
| 300 // done. | 456 // done. |
| 301 | 457 |
| 302 /// Called when the event source adds a new data or error event. | 458 /// Called when the event source adds a new data or error event. |
| 303 /// Always calls [_updateRequests] after adding. | 459 /// Always calls [_updateRequests] after adding. |
| 304 void _addResult(Result result) { | 460 void _addResult(Result result) { |
| 461 _eventsReceived++; |
| 305 _eventQueue.add(result); | 462 _eventQueue.add(result); |
| 306 _updateRequests(); | 463 _updateRequests(); |
| 307 } | 464 } |
| 308 | 465 |
| 309 /// Called when the event source is done. | 466 /// Called when the event source is done. |
| 310 /// Always calls [_updateRequests] after adding. | 467 /// Always calls [_updateRequests] after adding. |
| 311 void _close() { | 468 void _close() { |
| 312 _isDone = true; | 469 _isDone = true; |
| 313 _updateRequests(); | 470 _updateRequests(); |
| 314 } | 471 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 330 /// immediately, it skips the queue. | 487 /// immediately, it skips the queue. |
| 331 void _addRequest(_EventRequest request) { | 488 void _addRequest(_EventRequest request) { |
| 332 if (_requestQueue.isEmpty) { | 489 if (_requestQueue.isEmpty) { |
| 333 if (request.update(_eventQueue, _isDone)) return; | 490 if (request.update(_eventQueue, _isDone)) return; |
| 334 _ensureListening(); | 491 _ensureListening(); |
| 335 } | 492 } |
| 336 _requestQueue.add(request); | 493 _requestQueue.add(request); |
| 337 } | 494 } |
| 338 } | 495 } |
| 339 | 496 |
| 340 | |
| 341 /// The default implementation of [StreamQueue]. | 497 /// The default implementation of [StreamQueue]. |
| 342 /// | 498 /// |
| 343 /// This queue gets its events from a stream which is listened | 499 /// This queue gets its events from a stream which is listened |
| 344 /// to when a request needs events. | 500 /// to when a request needs events. |
| 345 class _StreamQueue<T> extends StreamQueue<T> { | 501 class _StreamQueue<T> extends StreamQueue<T> { |
| 346 /// Source of events. | 502 /// Source of events. |
| 347 final Stream _sourceStream; | 503 final Stream<T> _sourceStream; |
| 348 | 504 |
| 349 /// Subscription on [_sourceStream] while listening for events. | 505 /// Subscription on [_sourceStream] while listening for events. |
| 350 /// | 506 /// |
| 351 /// Set to subscription when listening, and set to `null` when the | 507 /// Set to subscription when listening, and set to `null` when the |
| 352 /// subscription is done (and [_isDone] is set to true). | 508 /// subscription is done (and [_isDone] is set to true). |
| 353 StreamSubscription _subscription; | 509 StreamSubscription<T> _subscription; |
| 354 | 510 |
| 355 _StreamQueue(this._sourceStream) : super._(); | 511 _StreamQueue(this._sourceStream) : super._(); |
| 356 | 512 |
| 357 Future _cancel() { | 513 Future _cancel() { |
| 358 if (_isDone) return null; | 514 if (_isDone) return null; |
| 359 if (_subscription == null) _subscription = _sourceStream.listen(null); | 515 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 360 var future = _subscription.cancel(); | 516 var future = _subscription.cancel(); |
| 361 _close(); | 517 _close(); |
| 362 return future; | 518 return future; |
| 363 } | 519 } |
| 364 | 520 |
| 365 void _ensureListening() { | 521 void _ensureListening() { |
| 366 assert(!_isDone); | 522 if (_isDone) return; |
| 367 if (_subscription == null) { | 523 if (_subscription == null) { |
| 368 _subscription = | 524 _subscription = _sourceStream.listen((data) { |
| 369 _sourceStream.listen( | 525 _addResult(new Result.value(data)); |
| 370 (data) { | 526 }, onError: (error, StackTrace stackTrace) { |
| 371 _addResult(new Result.value(data)); | 527 _addResult(new Result.error(error, stackTrace)); |
| 372 }, | 528 }, onDone: () { |
| 373 onError: (error, StackTrace stackTrace) { | 529 _subscription = null; |
| 374 _addResult(new Result.error(error, stackTrace)); | 530 this._close(); |
| 375 }, | 531 }); |
| 376 onDone: () { | |
| 377 _subscription = null; | |
| 378 this._close(); | |
| 379 }); | |
| 380 } else { | 532 } else { |
| 381 _subscription.resume(); | 533 _subscription.resume(); |
| 382 } | 534 } |
| 383 } | 535 } |
| 384 | 536 |
| 385 void _pause() { | 537 void _pause() { |
| 386 _subscription.pause(); | 538 _subscription.pause(); |
| 387 } | 539 } |
| 388 | 540 |
| 389 Stream<T> _extractStream() { | 541 Stream<T> _extractStream() { |
| 390 assert(_isClosed); | 542 assert(_isClosed); |
| 391 if (_isDone) { | 543 if (_isDone) { |
| 392 return new Stream<T>.empty(); | 544 return new Stream<T>.empty(); |
| 393 } | 545 } |
| 546 _isDone = true; |
| 394 | 547 |
| 395 if (_subscription == null) { | 548 if (_subscription == null) { |
| 396 return _sourceStream; | 549 return _sourceStream; |
| 397 } | 550 } |
| 398 | 551 |
| 399 var subscription = _subscription; | 552 var subscription = _subscription; |
| 400 _subscription = null; | 553 _subscription = null; |
| 401 _isDone = true; | |
| 402 | 554 |
| 403 var wasPaused = subscription.isPaused; | 555 var wasPaused = subscription.isPaused; |
| 404 var result = new SubscriptionStream<T>(subscription); | 556 var result = new SubscriptionStream<T>(subscription); |
| 405 // Resume after creating stream because that pauses the subscription too. | 557 // Resume after creating stream because that pauses the subscription too. |
| 406 // This way there won't be a short resumption in the middle. | 558 // This way there won't be a short resumption in the middle. |
| 407 if (wasPaused) subscription.resume(); | 559 if (wasPaused) subscription.resume(); |
| 408 return result; | 560 return result; |
| 409 } | 561 } |
| 410 } | 562 } |
| 411 | 563 |
| 564 /// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction]. |
| 565 /// |
| 566 /// Copies of the parent queue may be created using [newQueue]. Calling [commit] |
| 567 /// moves the parent queue to a copy's position, and calling [reject] causes it |
| 568 /// to continue as though [StreamQueue.startTransaction] was never called. |
| 569 class StreamQueueTransaction<T> { |
| 570 /// The parent queue on which this transaction is active. |
| 571 final StreamQueue<T> _parent; |
| 572 |
| 573 /// The splitter that produces copies of the parent queue's stream. |
| 574 final StreamSplitter<T> _splitter; |
| 575 |
| 576 /// Queues created using [newQueue]. |
| 577 final _queues = new Set<StreamQueue>(); |
| 578 |
| 579 /// Whether [commit] has been called. |
| 580 var _committed = false; |
| 581 |
| 582 /// Whether [reject] has been called. |
| 583 var _rejected = false; |
| 584 |
| 585 StreamQueueTransaction._(this._parent, Stream<T> source) |
| 586 : _splitter = new StreamSplitter(source); |
| 587 |
| 588 /// Creates a new copy of the parent queue. |
| 589 /// |
| 590 /// This copy starts at the parent queue's position when |
| 591 /// [StreamQueue.startTransaction] was called. Its position can be committed |
| 592 /// to the parent queue using [commit]. |
| 593 StreamQueue<T> newQueue() { |
| 594 var queue = new StreamQueue(_splitter.split()); |
| 595 _queues.add(queue); |
| 596 return queue; |
| 597 } |
| 598 |
| 599 /// Commits a queue created using [newQueue]. |
| 600 /// |
| 601 /// The parent queue's position is updated to be the same as [queue]'s. |
| 602 /// Further requests on all queues created by this transaction, including |
| 603 /// [queue], will complete as though [cancel] were called with `immediate: |
| 604 /// true`. |
| 605 /// |
| 606 /// Throws a [StateError] if [commit] or [reject] have already been called, or |
| 607 /// if there are pending requests on [queue]. |
| 608 void commit(StreamQueue<T> queue) { |
| 609 _assertActive(); |
| 610 if (!_queues.contains(queue)) { |
| 611 throw new ArgumentError("Queue doesn't belong to this transaction."); |
| 612 } else if (queue._requestQueue.isNotEmpty) { |
| 613 throw new StateError("A queue with pending requests can't be committed."); |
| 614 } |
| 615 _committed = true; |
| 616 |
| 617 // Remove all events from the parent queue that were consumed by the |
| 618 // child queue. |
| 619 for (var j = 0; j < queue.eventsDispatched; j++) { |
| 620 _parent._eventQueue.removeFirst(); |
| 621 } |
| 622 |
| 623 _done(); |
| 624 } |
| 625 |
| 626 /// Rejects this transaction without updating the parent queue. |
| 627 /// |
| 628 /// The parent will continue as though [StreamQueue.startTransaction] hadn't |
| 629 /// been called. Further requests on all queues created by this transaction |
| 630 /// will complete as though [cancel] were called with `immediate: true`. |
| 631 /// |
| 632 /// Throws a [StateError] if [commit] or [reject] have already been called. |
| 633 void reject() { |
| 634 _assertActive(); |
| 635 _rejected = true; |
| 636 _done(); |
| 637 } |
| 638 |
| 639 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s |
| 640 // request queue, and runs the next request. |
| 641 void _done() { |
| 642 _splitter.close(); |
| 643 for (var queue in _queues) { |
| 644 queue._cancel(); |
| 645 } |
| 646 |
| 647 assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
| 648 this); |
| 649 _parent._requestQueue.removeFirst(); |
| 650 _parent._updateRequests(); |
| 651 } |
| 652 |
| 653 /// Throws a [StateError] if [accept] or [reject] has already been called. |
| 654 void _assertActive() { |
| 655 if (_committed) { |
| 656 throw new StateError("This transaction has already been accepted."); |
| 657 } else if (_rejected) { |
| 658 throw new StateError("This transaction has already been rejected."); |
| 659 } |
| 660 } |
| 661 } |
| 412 | 662 |
| 413 /// Request object that receives events when they arrive, until fulfilled. | 663 /// Request object that receives events when they arrive, until fulfilled. |
| 414 /// | 664 /// |
| 415 /// Each request that cannot be fulfilled immediately is represented by | 665 /// Each request that cannot be fulfilled immediately is represented by |
| 416 /// an `_EventRequest` object in the request queue. | 666 /// an `_EventRequest` object in the request queue. |
| 417 /// | 667 /// |
| 418 /// Events from the source stream are sent to the first request in the | 668 /// Events from the source stream are sent to the first request in the |
| 419 /// queue until it reports itself as [isComplete]. | 669 /// queue until it reports itself as [isComplete]. |
| 420 /// | 670 /// |
| 421 /// When the first request in the queue `isComplete`, either when becoming | 671 /// When the first request in the queue `isComplete`, either when becoming |
| 422 /// the first request or after receiving an event, its [close] methods is | 672 /// the first request or after receiving an event, its [close] methods is |
| 423 /// called. | 673 /// called. |
| 424 /// | 674 /// |
| 425 /// The [close] method is also called immediately when the source stream | 675 /// The [close] method is also called immediately when the source stream |
| 426 /// is done. | 676 /// is done. |
| 427 abstract class _EventRequest { | 677 abstract class _EventRequest<T> { |
| 428 /// Handle available events. | 678 /// Handle available events. |
| 429 /// | 679 /// |
| 430 /// The available events are provided as a queue. The `update` function | 680 /// The available events are provided as a queue. The `update` function |
| 431 /// should only remove events from the front of the event queue, e.g., | 681 /// should only remove events from the front of the event queue, e.g., |
| 432 /// using [removeFirst]. | 682 /// using [removeFirst]. |
| 433 /// | 683 /// |
| 434 /// Returns `true` if the request is completed, or `false` if it needs | 684 /// Returns `true` if the request is completed, or `false` if it needs |
| 435 /// more events. | 685 /// more events. |
| 436 /// The call may keep events in the queue until the requeust is complete, | 686 /// The call may keep events in the queue until the requeust is complete, |
| 437 /// or it may remove them immediately. | 687 /// or it may remove them immediately. |
| 438 /// | 688 /// |
| 439 /// If the method returns true, the request is considered fulfilled, and | 689 /// If the method returns true, the request is considered fulfilled, and |
| 440 /// will never be called again. | 690 /// will never be called again. |
| 441 /// | 691 /// |
| 442 /// This method is called when a request reaches the front of the request | 692 /// This method is called when a request reaches the front of the request |
| 443 /// queue, and if it returns `false`, it's called again every time a new event | 693 /// queue, and if it returns `false`, it's called again every time a new event |
| 444 /// becomes available, or when the stream closes. | 694 /// becomes available, or when the stream closes. |
| 445 /// If the function returns `false` when the stream has already closed | 695 /// If the function returns `false` when the stream has already closed |
| 446 /// ([isDone] is true), then the request must call | 696 /// ([isDone] is true), then the request must call |
| 447 /// [StreamQueue._updateRequests] itself when it's ready to continue. | 697 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
| 448 bool update(Queue<Result> events, bool isDone); | 698 bool update(QueueList<Result<T>> events, bool isDone); |
| 449 } | 699 } |
| 450 | 700 |
| 451 /// Request for a [StreamQueue.next] call. | 701 /// Request for a [StreamQueue.next] call. |
| 452 /// | 702 /// |
| 453 /// Completes the returned future when receiving the first event, | 703 /// Completes the returned future when receiving the first event, |
| 454 /// and is then complete. | 704 /// and is then complete. |
| 455 class _NextRequest<T> implements _EventRequest { | 705 class _NextRequest<T> implements _EventRequest<T> { |
| 456 /// Completer for the future returned by [StreamQueue.next]. | 706 /// Completer for the future returned by [StreamQueue.next]. |
| 457 final Completer _completer; | 707 final _completer = new Completer<T>(); |
| 458 | 708 |
| 459 _NextRequest() : _completer = new Completer<T>(); | 709 _NextRequest(); |
| 460 | 710 |
| 461 Future<T> get future => _completer.future; | 711 Future<T> get future => _completer.future; |
| 462 | 712 |
| 463 bool update(Queue<Result> events, bool isDone) { | 713 bool update(QueueList<Result<T>> events, bool isDone) { |
| 464 if (events.isNotEmpty) { | 714 if (events.isNotEmpty) { |
| 465 events.removeFirst().complete(_completer); | 715 events.removeFirst().complete(_completer); |
| 466 return true; | 716 return true; |
| 467 } | 717 } |
| 468 if (isDone) { | 718 if (isDone) { |
| 469 var errorFuture = | 719 _completer.completeError( |
| 470 new Future.sync(() => throw new StateError("No elements")); | 720 new StateError("No elements"), StackTrace.current); |
| 471 _completer.complete(errorFuture); | |
| 472 return true; | 721 return true; |
| 473 } | 722 } |
| 474 return false; | 723 return false; |
| 724 } |
| 725 } |
| 726 |
| 727 /// Request for a [StreamQueue.peek] call. |
| 728 /// |
| 729 /// Completes the returned future when receiving the first event, |
| 730 /// and is then complete, but doesn't consume the event. |
| 731 class _PeekRequest<T> implements _EventRequest<T> { |
| 732 /// Completer for the future returned by [StreamQueue.next]. |
| 733 final _completer = new Completer<T>(); |
| 734 |
| 735 _PeekRequest(); |
| 736 |
| 737 Future<T> get future => _completer.future; |
| 738 |
| 739 bool update(QueueList<Result<T>> events, bool isDone) { |
| 740 if (events.isNotEmpty) { |
| 741 events.first.complete(_completer); |
| 742 return true; |
| 743 } |
| 744 if (isDone) { |
| 745 _completer.completeError( |
| 746 new StateError("No elements"), StackTrace.current); |
| 747 return true; |
| 748 } |
| 749 return false; |
| 475 } | 750 } |
| 476 } | 751 } |
| 477 | 752 |
| 478 /// Request for a [StreamQueue.skip] call. | 753 /// Request for a [StreamQueue.skip] call. |
| 479 class _SkipRequest implements _EventRequest { | 754 class _SkipRequest<T> implements _EventRequest<T> { |
| 480 /// Completer for the future returned by the skip call. | 755 /// Completer for the future returned by the skip call. |
| 481 final Completer _completer = new Completer<int>(); | 756 final _completer = new Completer<int>(); |
| 482 | 757 |
| 483 /// Number of remaining events to skip. | 758 /// Number of remaining events to skip. |
| 484 /// | 759 /// |
| 485 /// The request [isComplete] when the values reaches zero. | 760 /// The request [isComplete] when the values reaches zero. |
| 486 /// | 761 /// |
| 487 /// Decremented when an event is seen. | 762 /// Decremented when an event is seen. |
| 488 /// Set to zero when an error is seen since errors abort the skip request. | 763 /// Set to zero when an error is seen since errors abort the skip request. |
| 489 int _eventsToSkip; | 764 int _eventsToSkip; |
| 490 | 765 |
| 491 _SkipRequest(this._eventsToSkip); | 766 _SkipRequest(this._eventsToSkip); |
| 492 | 767 |
| 493 /// The future completed when the correct number of events have been skipped. | 768 /// The future completed when the correct number of events have been skipped. |
| 494 Future get future => _completer.future; | 769 Future<int> get future => _completer.future; |
| 495 | 770 |
| 496 bool update(Queue<Result> events, bool isDone) { | 771 bool update(QueueList<Result<T>> events, bool isDone) { |
| 497 while (_eventsToSkip > 0) { | 772 while (_eventsToSkip > 0) { |
| 498 if (events.isEmpty) { | 773 if (events.isEmpty) { |
| 499 if (isDone) break; | 774 if (isDone) break; |
| 500 return false; | 775 return false; |
| 501 } | 776 } |
| 502 _eventsToSkip--; | 777 _eventsToSkip--; |
| 503 | 778 |
| 504 var event = events.removeFirst(); | 779 var event = events.removeFirst(); |
| 505 if (event.isError) { | 780 if (event.isError) { |
| 506 event.complete(_completer); | 781 _completer.completeError(event.asError.error, event.asError.stackTrace); |
| 507 return true; | 782 return true; |
| 508 } | 783 } |
| 509 } | 784 } |
| 510 _completer.complete(_eventsToSkip); | 785 _completer.complete(_eventsToSkip); |
| 511 return true; | 786 return true; |
| 512 } | 787 } |
| 513 } | 788 } |
| 514 | 789 |
| 515 /// Request for a [StreamQueue.take] call. | 790 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
| 516 class _TakeRequest<T> implements _EventRequest { | 791 abstract class _ListRequest<T> implements _EventRequest<T> { |
| 517 /// Completer for the future returned by the take call. | 792 /// Completer for the future returned by the take call. |
| 518 final Completer _completer; | 793 final _completer = new Completer<List<T>>(); |
| 519 | 794 |
| 520 /// List collecting events until enough have been seen. | 795 /// List collecting events until enough have been seen. |
| 521 final List _list = <T>[]; | 796 final _list = <T>[]; |
| 522 | 797 |
| 523 /// Number of events to capture. | 798 /// Number of events to capture. |
| 524 /// | 799 /// |
| 525 /// The request [isComplete] when the length of [_list] reaches | 800 /// The request [isComplete] when the length of [_list] reaches |
| 526 /// this value. | 801 /// this value. |
| 527 final int _eventsToTake; | 802 final int _eventsToTake; |
| 528 | 803 |
| 529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 804 _ListRequest(this._eventsToTake); |
| 530 | 805 |
| 531 /// The future completed when the correct number of events have been captured. | 806 /// The future completed when the correct number of events have been captured. |
| 532 Future get future => _completer.future; | 807 Future<List<T>> get future => _completer.future; |
| 808 } |
| 533 | 809 |
| 534 bool update(Queue<Result> events, bool isDone) { | 810 /// Request for a [StreamQueue.take] call. |
| 811 class _TakeRequest<T> extends _ListRequest<T> { |
| 812 _TakeRequest(int eventsToTake) : super(eventsToTake); |
| 813 |
| 814 bool update(QueueList<Result<T>> events, bool isDone) { |
| 535 while (_list.length < _eventsToTake) { | 815 while (_list.length < _eventsToTake) { |
| 536 if (events.isEmpty) { | 816 if (events.isEmpty) { |
| 537 if (isDone) break; | 817 if (isDone) break; |
| 538 return false; | 818 return false; |
| 539 } | 819 } |
| 540 | 820 |
| 541 var result = events.removeFirst(); | 821 var event = events.removeFirst(); |
| 542 if (result.isError) { | 822 if (event.isError) { |
| 543 result.complete(_completer); | 823 event.asError.complete(_completer); |
| 544 return true; | 824 return true; |
| 545 } | 825 } |
| 546 _list.add(result.asValue.value); | 826 _list.add(event.asValue.value); |
| 547 } | 827 } |
| 548 _completer.complete(_list); | 828 _completer.complete(_list); |
| 549 return true; | 829 return true; |
| 830 } |
| 831 } |
| 832 |
| 833 /// Request for a [StreamQueue.lookAhead] call. |
| 834 class _LookAheadRequest<T> extends _ListRequest<T> { |
| 835 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| 836 |
| 837 bool update(QueueList<Result<T>> events, bool isDone) { |
| 838 while (_list.length < _eventsToTake) { |
| 839 if (events.length == _list.length) { |
| 840 if (isDone) break; |
| 841 return false; |
| 842 } |
| 843 var event = events.elementAt(_list.length); |
| 844 if (event.isError) { |
| 845 event.asError.complete(_completer); |
| 846 return true; |
| 847 } |
| 848 _list.add(event.asValue.value); |
| 849 } |
| 850 _completer.complete(_list); |
| 851 return true; |
| 550 } | 852 } |
| 551 } | 853 } |
| 552 | 854 |
| 553 /// Request for a [StreamQueue.cancel] call. | 855 /// Request for a [StreamQueue.cancel] call. |
| 554 /// | 856 /// |
| 555 /// The request needs no events, it just waits in the request queue | 857 /// The request needs no events, it just waits in the request queue |
| 556 /// until all previous events are fulfilled, then it cancels the stream queue | 858 /// until all previous events are fulfilled, then it cancels the stream queue |
| 557 /// source subscription. | 859 /// source subscription. |
| 558 class _CancelRequest implements _EventRequest { | 860 class _CancelRequest<T> implements _EventRequest<T> { |
| 559 /// Completer for the future returned by the `cancel` call. | 861 /// Completer for the future returned by the `cancel` call. |
| 560 final Completer _completer = new Completer(); | 862 final _completer = new Completer(); |
| 561 | 863 |
| 562 /// The [StreamQueue] object that has this request queued. | |
| 563 /// | 864 /// |
| 564 /// When the event is completed, it needs to cancel the active subscription | 865 /// When the event is completed, it needs to cancel the active subscription |
| 565 /// of the `StreamQueue` object, if any. | 866 /// of the `StreamQueue` object, if any. |
| 566 final StreamQueue _streamQueue; | 867 final StreamQueue _streamQueue; |
| 567 | 868 |
| 568 _CancelRequest(this._streamQueue); | 869 _CancelRequest(this._streamQueue); |
| 569 | 870 |
| 570 /// The future completed when the cancel request is completed. | 871 /// The future completed when the cancel request is completed. |
| 571 Future get future => _completer.future; | 872 Future get future => _completer.future; |
| 572 | 873 |
| 573 bool update(Queue<Result> events, bool isDone) { | 874 bool update(QueueList<Result<T>> events, bool isDone) { |
| 574 if (_streamQueue._isDone) { | 875 if (_streamQueue._isDone) { |
| 575 _completer.complete(); | 876 _completer.complete(); |
| 576 } else { | 877 } else { |
| 577 _streamQueue._ensureListening(); | 878 _streamQueue._ensureListening(); |
| 578 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); | 879 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
| 579 } | 880 } |
| 580 return true; | 881 return true; |
| 581 } | 882 } |
| 582 } | 883 } |
| 583 | 884 |
| 584 /// Request for a [StreamQueue.rest] call. | 885 /// Request for a [StreamQueue.rest] call. |
| 585 /// | 886 /// |
| 586 /// The request is always complete, it just waits in the request queue | 887 /// The request is always complete, it just waits in the request queue |
| 587 /// until all previous events are fulfilled, then it takes over the | 888 /// until all previous events are fulfilled, then it takes over the |
| 588 /// stream events subscription and creates a stream from it. | 889 /// stream events subscription and creates a stream from it. |
| 589 class _RestRequest<T> implements _EventRequest { | 890 class _RestRequest<T> implements _EventRequest<T> { |
| 590 /// Completer for the stream returned by the `rest` call. | 891 /// Completer for the stream returned by the `rest` call. |
| 591 final StreamCompleter _completer = new StreamCompleter<T>(); | 892 final _completer = new StreamCompleter<T>(); |
| 592 | 893 |
| 593 /// The [StreamQueue] object that has this request queued. | 894 /// The [StreamQueue] object that has this request queued. |
| 594 /// | 895 /// |
| 595 /// When the event is completed, it needs to cancel the active subscription | 896 /// When the event is completed, it needs to cancel the active subscription |
| 596 /// of the `StreamQueue` object, if any. | 897 /// of the `StreamQueue` object, if any. |
| 597 final StreamQueue _streamQueue; | 898 final StreamQueue<T> _streamQueue; |
| 598 | 899 |
| 599 _RestRequest(this._streamQueue); | 900 _RestRequest(this._streamQueue); |
| 600 | 901 |
| 601 /// The stream which will contain the remaining events of [_streamQueue]. | 902 /// The stream which will contain the remaining events of [_streamQueue]. |
| 602 Stream<T> get stream => _completer.stream; | 903 Stream<T> get stream => _completer.stream; |
| 603 | 904 |
| 604 bool update(Queue<Result> events, bool isDone) { | 905 bool update(QueueList<Result<T>> events, bool isDone) { |
| 605 if (events.isEmpty) { | 906 if (events.isEmpty) { |
| 606 if (_streamQueue._isDone) { | 907 if (_streamQueue._isDone) { |
| 607 _completer.setEmpty(); | 908 _completer.setEmpty(); |
| 608 } else { | 909 } else { |
| 609 _completer.setSourceStream(_streamQueue._extractStream()); | 910 _completer.setSourceStream(_streamQueue._extractStream()); |
| 610 } | 911 } |
| 611 } else { | 912 } else { |
| 612 // There are prefetched events which needs to be added before the | 913 // There are prefetched events which needs to be added before the |
| 613 // remaining stream. | 914 // remaining stream. |
| 614 var controller = new StreamController<T>(); | 915 var controller = new StreamController<T>(); |
| 615 for (var event in events) { | 916 for (var event in events) { |
| 616 event.addTo(controller); | 917 event.addTo(controller); |
| 617 } | 918 } |
| 618 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 919 controller |
| 619 .whenComplete(controller.close); | 920 .addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 921 .whenComplete(controller.close); |
| 620 _completer.setSourceStream(controller.stream); | 922 _completer.setSourceStream(controller.stream); |
| 621 } | 923 } |
| 622 return true; | 924 return true; |
| 623 } | 925 } |
| 624 } | 926 } |
| 625 | 927 |
| 626 /// Request for a [StreamQueue.hasNext] call. | 928 /// Request for a [StreamQueue.hasNext] call. |
| 627 /// | 929 /// |
| 628 /// Completes the [future] with `true` if it sees any event, | 930 /// Completes the [future] with `true` if it sees any event, |
| 629 /// but doesn't consume the event. | 931 /// but doesn't consume the event. |
| 630 /// If the request is closed without seeing an event, then | 932 /// If the request is closed without seeing an event, then |
| 631 /// the [future] is completed with `false`. | 933 /// the [future] is completed with `false`. |
| 632 class _HasNextRequest<T> implements _EventRequest { | 934 class _HasNextRequest<T> implements _EventRequest<T> { |
| 633 final Completer _completer = new Completer<bool>(); | 935 final _completer = new Completer<bool>(); |
| 634 | 936 |
| 635 Future<bool> get future => _completer.future; | 937 Future<bool> get future => _completer.future; |
| 636 | 938 |
| 637 bool update(Queue<Result> events, bool isDone) { | 939 bool update(QueueList<Result<T>> events, bool isDone) { |
| 638 if (events.isNotEmpty) { | 940 if (events.isNotEmpty) { |
| 639 _completer.complete(true); | 941 _completer.complete(true); |
| 640 return true; | 942 return true; |
| 641 } | 943 } |
| 642 if (isDone) { | 944 if (isDone) { |
| 643 _completer.complete(false); | 945 _completer.complete(false); |
| 644 return true; | 946 return true; |
| 645 } | 947 } |
| 646 return false; | 948 return false; |
| 647 } | 949 } |
| 648 } | 950 } |
| 951 |
| 952 /// Request for a [StreamQueue.startTransaction] call. |
| 953 /// |
| 954 /// This request isn't complete until the user calls |
| 955 /// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which |
| 956 /// point it manually removes itself from the request queue and calls |
| 957 /// [StreamQueue._updateRequests]. |
| 958 class _TransactionRequest<T> implements _EventRequest<T> { |
| 959 /// The transaction created by this request. |
| 960 StreamQueueTransaction<T> get transaction => _transaction; |
| 961 StreamQueueTransaction<T> _transaction; |
| 962 |
| 963 /// The controller that passes events to [transaction]. |
| 964 final _controller = new StreamController<T>(sync: true); |
| 965 |
| 966 /// The number of events passed to [_controller] so far. |
| 967 var _eventsSent = 0; |
| 968 |
| 969 _TransactionRequest(StreamQueue<T> parent) { |
| 970 _transaction = new StreamQueueTransaction._(parent, _controller.stream); |
| 971 } |
| 972 |
| 973 bool update(QueueList<Result<T>> events, bool isDone) { |
| 974 while (_eventsSent < events.length) { |
| 975 events[_eventsSent++].addTo(_controller); |
| 976 } |
| 977 if (isDone && !_controller.isClosed) _controller.close(); |
| 978 return false; |
| 979 } |
| 980 } |
| OLD | NEW |