OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 import 'dart:collection'; | 6 import 'dart:collection'; |
9 | 7 |
| 8 import 'package:collection/collection.dart'; |
| 9 |
| 10 import "cancelable_operation.dart"; |
| 11 import "result.dart"; |
10 import "subscription_stream.dart"; | 12 import "subscription_stream.dart"; |
11 import "stream_completer.dart"; | 13 import "stream_completer.dart"; |
12 import "../result.dart"; | 14 import "stream_splitter.dart"; |
13 | 15 |
14 /// An asynchronous pull-based interface for accessing stream events. | 16 /// An asynchronous pull-based interface for accessing stream events. |
15 /// | 17 /// |
16 /// Wraps a stream and makes individual events available on request. | 18 /// Wraps a stream and makes individual events available on request. |
17 /// | 19 /// |
18 /// You can request (and reserve) one or more events from the stream, | 20 /// You can request (and reserve) one or more events from the stream, |
19 /// and after all previous requests have been fulfilled, stream events | 21 /// and after all previous requests have been fulfilled, stream events |
20 /// go towards fulfilling your request. | 22 /// go towards fulfilling your request. |
21 /// | 23 /// |
22 /// For example, if you ask for [next] two times, the returned futures | 24 /// For example, if you ask for [next] two times, the returned futures |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
80 // by the content of the fifth event. | 82 // by the content of the fifth event. |
81 | 83 |
82 /// Whether the event source is done. | 84 /// Whether the event source is done. |
83 bool _isDone = false; | 85 bool _isDone = false; |
84 | 86 |
85 /// Whether a closing operation has been performed on the stream queue. | 87 /// Whether a closing operation has been performed on the stream queue. |
86 /// | 88 /// |
87 /// Closing operations are [cancel] and [rest]. | 89 /// Closing operations are [cancel] and [rest]. |
88 bool _isClosed = false; | 90 bool _isClosed = false; |
89 | 91 |
| 92 /// The number of events dispatched by this queue. |
| 93 /// |
| 94 /// This counts error events. It doesn't count done events, or events |
| 95 /// dispatched to a stream returned by [rest]. |
| 96 int get eventsDispatched => _eventsReceived - _eventQueue.length; |
| 97 |
| 98 /// The number of events received by this queue. |
| 99 var _eventsReceived = 0; |
| 100 |
90 /// Queue of events not used by a request yet. | 101 /// Queue of events not used by a request yet. |
91 final Queue<Result> _eventQueue = new Queue(); | 102 final QueueList<Result> _eventQueue = new QueueList(); |
92 | 103 |
93 /// Queue of pending requests. | 104 /// Queue of pending requests. |
94 /// | 105 /// |
95 /// Access through methods below to ensure consistency. | 106 /// Access through methods below to ensure consistency. |
96 final Queue<_EventRequest> _requestQueue = new Queue(); | 107 final Queue<_EventRequest> _requestQueue = new Queue(); |
97 | 108 |
98 /// Create a `StreamQueue` of the events of [source]. | 109 /// Create a `StreamQueue` of the events of [source]. |
99 factory StreamQueue(Stream source) = _StreamQueue<T>; | 110 factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
100 | 111 |
101 StreamQueue._(); | 112 StreamQueue._(); |
102 | 113 |
103 /// Asks if the stream has any more events. | 114 /// Asks if the stream has any more events. |
104 /// | 115 /// |
105 /// Returns a future that completes with `true` if the stream has any | 116 /// Returns a future that completes with `true` if the stream has any |
106 /// more events, whether data or error. | 117 /// more events, whether data or error. |
107 /// If the stream closes without producing any more events, the returned | 118 /// If the stream closes without producing any more events, the returned |
108 /// future completes with `false`. | 119 /// future completes with `false`. |
109 /// | 120 /// |
110 /// Can be used before using [next] to avoid getting an error in the | 121 /// Can be used before using [next] to avoid getting an error in the |
111 /// future returned by `next` in the case where there are no more events. | 122 /// future returned by `next` in the case where there are no more events. |
112 /// Another alternative is to use `take(1)` which returns either zero or | 123 /// Another alternative is to use `take(1)` which returns either zero or |
113 /// one events. | 124 /// one events. |
114 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
115 if (!_isClosed) { | 126 if (!_isClosed) { |
116 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
117 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
118 return hasNextRequest.future; | 129 return hasNextRequest.future; |
119 } | 130 } |
120 throw _failClosed(); | 131 throw _failClosed(); |
121 } | 132 } |
122 | 133 |
| 134 /// Look at the next [count] data events without consuming them. |
| 135 /// |
| 136 /// Works like [take] except that the events are left in the queue. |
| 137 /// If one of the next [count] events is an error, the returned future |
| 138 /// completes with this error, and the error is still left in the queue. |
| 139 Future<List<T>> lookAhead(int count) { |
| 140 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 141 if (!_isClosed) { |
| 142 var request = new _LookAheadRequest<T>(count); |
| 143 _addRequest(request); |
| 144 return request.future; |
| 145 } |
| 146 throw _failClosed(); |
| 147 } |
| 148 |
123 /// Requests the next (yet unrequested) event from the stream. | 149 /// Requests the next (yet unrequested) event from the stream. |
124 /// | 150 /// |
125 /// When the requested event arrives, the returned future is completed with | 151 /// When the requested event arrives, the returned future is completed with |
126 /// the event. | 152 /// the event. |
127 /// If the event is a data event, the returned future completes | 153 /// If the event is a data event, the returned future completes |
128 /// with its value. | 154 /// with its value. |
129 /// If the event is an error event, the returned future completes with | 155 /// If the event is an error event, the returned future completes with |
130 /// its error and stack trace. | 156 /// its error and stack trace. |
131 /// If the stream closes before an event arrives, the returned future | 157 /// If the stream closes before an event arrives, the returned future |
132 /// completes with a [StateError]. | 158 /// completes with a [StateError]. |
133 /// | 159 /// |
134 /// It's possible to have several pending [next] calls (or other requests), | 160 /// It's possible to have several pending [next] calls (or other requests), |
135 /// and they will be completed in the order they were requested, by the | 161 /// and they will be completed in the order they were requested, by the |
136 /// first events that were not consumed by previous requeusts. | 162 /// first events that were not consumed by previous requeusts. |
137 Future<T> get next { | 163 Future<T> get next { |
138 if (!_isClosed) { | 164 if (!_isClosed) { |
139 var nextRequest = new _NextRequest<T>(); | 165 var nextRequest = new _NextRequest<T>(); |
140 _addRequest(nextRequest); | 166 _addRequest(nextRequest); |
141 return nextRequest.future; | 167 return nextRequest.future; |
142 } | 168 } |
143 throw _failClosed(); | 169 throw _failClosed(); |
144 } | 170 } |
145 | 171 |
| 172 /// Looks at the next (yet unrequested) event from the stream. |
| 173 /// |
| 174 /// Like [next] except that the event is not consumed. |
| 175 /// If the next event is an error event, it stays in the queue. |
| 176 Future<T> get peek { |
| 177 if (!_isClosed) { |
| 178 var nextRequest = new _PeekRequest<T>(); |
| 179 _addRequest(nextRequest); |
| 180 return nextRequest.future; |
| 181 } |
| 182 throw _failClosed(); |
| 183 } |
| 184 |
146 /// Returns a stream of all the remaning events of the source stream. | 185 /// Returns a stream of all the remaning events of the source stream. |
147 /// | 186 /// |
148 /// All requested [next], [skip] or [take] operations are completed | 187 /// All requested [next], [skip] or [take] operations are completed |
149 /// first, and then any remaining events are provided as events of | 188 /// first, and then any remaining events are provided as events of |
150 /// the returned stream. | 189 /// the returned stream. |
151 /// | 190 /// |
152 /// Using `rest` closes this stream queue. After getting the | 191 /// Using `rest` closes this stream queue. After getting the |
153 /// `rest` the caller may no longer request other events, like | 192 /// `rest` the caller may no longer request other events, like |
154 /// after calling [cancel]. | 193 /// after calling [cancel]. |
155 Stream<T> get rest { | 194 Stream<T> get rest { |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
205 Future<List<T>> take(int count) { | 244 Future<List<T>> take(int count) { |
206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 245 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
207 if (!_isClosed) { | 246 if (!_isClosed) { |
208 var request = new _TakeRequest<T>(count); | 247 var request = new _TakeRequest<T>(count); |
209 _addRequest(request); | 248 _addRequest(request); |
210 return request.future; | 249 return request.future; |
211 } | 250 } |
212 throw _failClosed(); | 251 throw _failClosed(); |
213 } | 252 } |
214 | 253 |
| 254 /// Requests a transaction that can conditionally consume events. |
| 255 /// |
| 256 /// The transaction can create copies of this queue at the current position |
| 257 /// using [StreamQueueTransaction.newQueue]. Each of these queues is |
| 258 /// independent of one another and of the parent queue. The transaction |
| 259 /// finishes when one of two methods is called: |
| 260 /// |
| 261 /// * [StreamQueueTransaction.commit] updates the parent queue's position to |
| 262 /// match that of one of the copies. |
| 263 /// |
| 264 /// * [StreamQueueTransaction.reject] causes the parent queue to continue as |
| 265 /// though [startTransaction] hadn't been called. |
| 266 /// |
| 267 /// Until the transaction finishes, this queue won't emit any events. |
| 268 /// |
| 269 /// See also [withTransaction] and [cancelable]. |
| 270 /// |
| 271 /// ```dart |
| 272 /// /// Consumes all empty lines from the beginning of [lines]. |
| 273 /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
| 274 /// while (await lines.hasNext) { |
| 275 /// var transaction = lines.startTransaction(); |
| 276 /// var queue = transaction.newQueue(); |
| 277 /// if ((await queue.next).isNotEmpty) { |
| 278 /// transaction.reject(); |
| 279 /// return; |
| 280 /// } else { |
| 281 /// transaction.commit(queue); |
| 282 /// } |
| 283 /// } |
| 284 /// } |
| 285 /// ``` |
| 286 StreamQueueTransaction<T> startTransaction() { |
| 287 if (_isClosed) throw _failClosed(); |
| 288 |
| 289 var request = new _TransactionRequest(this); |
| 290 _addRequest(request); |
| 291 return request.transaction; |
| 292 } |
| 293 |
| 294 /// Passes a copy of this queue to [callback], and updates this queue to match |
| 295 /// the copy's position if [callback] returns `true`. |
| 296 /// |
| 297 /// This queue won't emit any events until [callback] returns. If it returns |
| 298 /// `false`, this queue continues as though [withTransaction] hadn't been |
| 299 /// called. If it throws an error, this updates this queue to match the copy's |
| 300 /// position and throws the error from the returned `Future`. |
| 301 /// |
| 302 /// Returns the same value as [callback]. |
| 303 /// |
| 304 /// See also [startTransaction] and [cancelable]. |
| 305 /// |
| 306 /// ```dart |
| 307 /// /// Consumes all empty lines from the beginning of [lines]. |
| 308 /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
| 309 /// while (await lines.hasNext) { |
| 310 /// // Consume a line if it's empty, otherwise return. |
| 311 /// if (!await lines.withTransaction( |
| 312 /// (queue) async => (await queue.next).isEmpty)) { |
| 313 /// return; |
| 314 /// } |
| 315 /// } |
| 316 /// } |
| 317 /// ``` |
| 318 Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) { |
| 319 var transaction = startTransaction(); |
| 320 |
| 321 /// Avoid async/await to ensure that [startTransaction] is called |
| 322 /// synchronously and so ends up in the right place in the request queue. |
| 323 var queue = transaction.newQueue(); |
| 324 return callback(queue).then((result) { |
| 325 if (result) { |
| 326 transaction.commit(queue); |
| 327 } else { |
| 328 transaction.reject(); |
| 329 } |
| 330 return result; |
| 331 }, onError: (error) { |
| 332 transaction.commit(queue); |
| 333 throw error; |
| 334 }); |
| 335 } |
| 336 |
| 337 /// Passes a copy of this queue to [callback], and updates this queue to match |
| 338 /// the copy's position once [callback] completes. |
| 339 /// |
| 340 /// If the returned [CancelableOperation] is canceled, this queue instead |
| 341 /// continues as though [cancelable] hadn't been called. Otherwise, it emits |
| 342 /// the same value or error as [callback]. |
| 343 /// |
| 344 /// See also [startTransaction] and [withTransaction]. |
| 345 /// |
| 346 /// ```dart |
| 347 /// final _stdinQueue = new StreamQueue(stdin); |
| 348 /// |
| 349 /// /// Returns an operation that completes when the user sends a line to |
| 350 /// /// standard input. |
| 351 /// /// |
| 352 /// /// If the operation is canceled, stops waiting for user input. |
| 353 /// CancelableOperation<String> nextStdinLine() => |
| 354 /// _stdinQueue.cancelable((queue) => queue.next); |
| 355 /// ``` |
| 356 CancelableOperation<S> cancelable<S>( |
| 357 Future<S> callback(StreamQueue<T> queue)) { |
| 358 var transaction = startTransaction(); |
| 359 var completer = new CancelableCompleter<S>(onCancel: () { |
| 360 transaction.reject(); |
| 361 }); |
| 362 |
| 363 var queue = transaction.newQueue(); |
| 364 completer.complete(callback(queue).whenComplete(() { |
| 365 if (!completer.isCanceled) transaction.commit(queue); |
| 366 })); |
| 367 |
| 368 return completer.operation; |
| 369 } |
| 370 |
215 /// Cancels the underlying event source. | 371 /// Cancels the underlying event source. |
216 /// | 372 /// |
217 /// If [immediate] is `false` (the default), the cancel operation waits until | 373 /// If [immediate] is `false` (the default), the cancel operation waits until |
218 /// all previously requested events have been processed, then it cancels the | 374 /// all previously requested events have been processed, then it cancels the |
219 /// subscription providing the events. | 375 /// subscription providing the events. |
220 /// | 376 /// |
221 /// If [immediate] is `true`, the source is instead canceled | 377 /// If [immediate] is `true`, the source is instead canceled |
222 /// immediately. Any pending events are completed as though the underlying | 378 /// immediately. Any pending events are completed as though the underlying |
223 /// stream had closed. | 379 /// stream had closed. |
224 /// | 380 /// |
225 /// The returned future completes with the result of calling | 381 /// The returned future completes with the result of calling |
226 /// `cancel`. | 382 /// `cancel`. |
227 /// | 383 /// |
228 /// After calling `cancel`, no further events can be requested. | 384 /// After calling `cancel`, no further events can be requested. |
229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 385 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
230 /// called again. | 386 /// may be called again. |
231 Future cancel({bool immediate: false}) { | 387 Future cancel({bool immediate: false}) { |
232 if (_isClosed) throw _failClosed(); | 388 if (_isClosed) throw _failClosed(); |
233 _isClosed = true; | 389 _isClosed = true; |
234 | 390 |
235 if (!immediate) { | 391 if (!immediate) { |
236 var request = new _CancelRequest(this); | 392 var request = new _CancelRequest(this); |
237 _addRequest(request); | 393 _addRequest(request); |
238 return request.future; | 394 return request.future; |
239 } | 395 } |
240 | 396 |
(...skipping 28 matching lines...) Expand all Loading... |
269 _pause(); | 425 _pause(); |
270 } | 426 } |
271 } | 427 } |
272 | 428 |
273 /// Extracts a stream from the event source and makes this stream queue | 429 /// Extracts a stream from the event source and makes this stream queue |
274 /// unusable. | 430 /// unusable. |
275 /// | 431 /// |
276 /// Can only be used by the very last request (the stream queue must | 432 /// Can only be used by the very last request (the stream queue must |
277 /// be closed by that request). | 433 /// be closed by that request). |
278 /// Only used by [rest]. | 434 /// Only used by [rest]. |
279 Stream _extractStream(); | 435 Stream<T> _extractStream(); |
280 | 436 |
281 /// Requests that the event source pauses events. | 437 /// Requests that the event source pauses events. |
282 /// | 438 /// |
283 /// This is called automatically when the request queue is empty. | 439 /// This is called automatically when the request queue is empty. |
284 /// | 440 /// |
285 /// The event source is restarted by the next call to [_ensureListening]. | 441 /// The event source is restarted by the next call to [_ensureListening]. |
286 void _pause(); | 442 void _pause(); |
287 | 443 |
288 /// Ensures that we are listening on events from the event source. | 444 /// Ensures that we are listening on events from the event source. |
289 /// | 445 /// |
290 /// Starts listening for the first time or resumes after a [_pause]. | 446 /// Starts listening for the first time or resumes after a [_pause]. |
291 /// | 447 /// |
292 /// Is called automatically if a request requires more events. | 448 /// Is called automatically if a request requires more events. |
293 void _ensureListening(); | 449 void _ensureListening(); |
294 | 450 |
295 /// Cancels the underlying event source. | 451 /// Cancels the underlying event source. |
296 Future _cancel(); | 452 Future _cancel(); |
297 | 453 |
298 // ------------------------------------------------------------------ | 454 // ------------------------------------------------------------------ |
299 // Methods called by the event source to add events or say that it's | 455 // Methods called by the event source to add events or say that it's |
300 // done. | 456 // done. |
301 | 457 |
302 /// Called when the event source adds a new data or error event. | 458 /// Called when the event source adds a new data or error event. |
303 /// Always calls [_updateRequests] after adding. | 459 /// Always calls [_updateRequests] after adding. |
304 void _addResult(Result result) { | 460 void _addResult(Result result) { |
| 461 _eventsReceived++; |
305 _eventQueue.add(result); | 462 _eventQueue.add(result); |
306 _updateRequests(); | 463 _updateRequests(); |
307 } | 464 } |
308 | 465 |
309 /// Called when the event source is done. | 466 /// Called when the event source is done. |
310 /// Always calls [_updateRequests] after adding. | 467 /// Always calls [_updateRequests] after adding. |
311 void _close() { | 468 void _close() { |
312 _isDone = true; | 469 _isDone = true; |
313 _updateRequests(); | 470 _updateRequests(); |
314 } | 471 } |
(...skipping 15 matching lines...) Expand all Loading... |
330 /// immediately, it skips the queue. | 487 /// immediately, it skips the queue. |
331 void _addRequest(_EventRequest request) { | 488 void _addRequest(_EventRequest request) { |
332 if (_requestQueue.isEmpty) { | 489 if (_requestQueue.isEmpty) { |
333 if (request.update(_eventQueue, _isDone)) return; | 490 if (request.update(_eventQueue, _isDone)) return; |
334 _ensureListening(); | 491 _ensureListening(); |
335 } | 492 } |
336 _requestQueue.add(request); | 493 _requestQueue.add(request); |
337 } | 494 } |
338 } | 495 } |
339 | 496 |
340 | |
341 /// The default implementation of [StreamQueue]. | 497 /// The default implementation of [StreamQueue]. |
342 /// | 498 /// |
343 /// This queue gets its events from a stream which is listened | 499 /// This queue gets its events from a stream which is listened |
344 /// to when a request needs events. | 500 /// to when a request needs events. |
345 class _StreamQueue<T> extends StreamQueue<T> { | 501 class _StreamQueue<T> extends StreamQueue<T> { |
346 /// Source of events. | 502 /// Source of events. |
347 final Stream _sourceStream; | 503 final Stream<T> _sourceStream; |
348 | 504 |
349 /// Subscription on [_sourceStream] while listening for events. | 505 /// Subscription on [_sourceStream] while listening for events. |
350 /// | 506 /// |
351 /// Set to subscription when listening, and set to `null` when the | 507 /// Set to subscription when listening, and set to `null` when the |
352 /// subscription is done (and [_isDone] is set to true). | 508 /// subscription is done (and [_isDone] is set to true). |
353 StreamSubscription _subscription; | 509 StreamSubscription<T> _subscription; |
354 | 510 |
355 _StreamQueue(this._sourceStream) : super._(); | 511 _StreamQueue(this._sourceStream) : super._(); |
356 | 512 |
357 Future _cancel() { | 513 Future _cancel() { |
358 if (_isDone) return null; | 514 if (_isDone) return null; |
359 if (_subscription == null) _subscription = _sourceStream.listen(null); | 515 if (_subscription == null) _subscription = _sourceStream.listen(null); |
360 var future = _subscription.cancel(); | 516 var future = _subscription.cancel(); |
361 _close(); | 517 _close(); |
362 return future; | 518 return future; |
363 } | 519 } |
364 | 520 |
365 void _ensureListening() { | 521 void _ensureListening() { |
366 assert(!_isDone); | 522 if (_isDone) return; |
367 if (_subscription == null) { | 523 if (_subscription == null) { |
368 _subscription = | 524 _subscription = _sourceStream.listen((data) { |
369 _sourceStream.listen( | 525 _addResult(new Result.value(data)); |
370 (data) { | 526 }, onError: (error, StackTrace stackTrace) { |
371 _addResult(new Result.value(data)); | 527 _addResult(new Result.error(error, stackTrace)); |
372 }, | 528 }, onDone: () { |
373 onError: (error, StackTrace stackTrace) { | 529 _subscription = null; |
374 _addResult(new Result.error(error, stackTrace)); | 530 this._close(); |
375 }, | 531 }); |
376 onDone: () { | |
377 _subscription = null; | |
378 this._close(); | |
379 }); | |
380 } else { | 532 } else { |
381 _subscription.resume(); | 533 _subscription.resume(); |
382 } | 534 } |
383 } | 535 } |
384 | 536 |
385 void _pause() { | 537 void _pause() { |
386 _subscription.pause(); | 538 _subscription.pause(); |
387 } | 539 } |
388 | 540 |
389 Stream<T> _extractStream() { | 541 Stream<T> _extractStream() { |
390 assert(_isClosed); | 542 assert(_isClosed); |
391 if (_isDone) { | 543 if (_isDone) { |
392 return new Stream<T>.empty(); | 544 return new Stream<T>.empty(); |
393 } | 545 } |
| 546 _isDone = true; |
394 | 547 |
395 if (_subscription == null) { | 548 if (_subscription == null) { |
396 return _sourceStream; | 549 return _sourceStream; |
397 } | 550 } |
398 | 551 |
399 var subscription = _subscription; | 552 var subscription = _subscription; |
400 _subscription = null; | 553 _subscription = null; |
401 _isDone = true; | |
402 | 554 |
403 var wasPaused = subscription.isPaused; | 555 var wasPaused = subscription.isPaused; |
404 var result = new SubscriptionStream<T>(subscription); | 556 var result = new SubscriptionStream<T>(subscription); |
405 // Resume after creating stream because that pauses the subscription too. | 557 // Resume after creating stream because that pauses the subscription too. |
406 // This way there won't be a short resumption in the middle. | 558 // This way there won't be a short resumption in the middle. |
407 if (wasPaused) subscription.resume(); | 559 if (wasPaused) subscription.resume(); |
408 return result; | 560 return result; |
409 } | 561 } |
410 } | 562 } |
411 | 563 |
| 564 /// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction]. |
| 565 /// |
| 566 /// Copies of the parent queue may be created using [newQueue]. Calling [commit] |
| 567 /// moves the parent queue to a copy's position, and calling [reject] causes it |
| 568 /// to continue as though [StreamQueue.startTransaction] was never called. |
| 569 class StreamQueueTransaction<T> { |
| 570 /// The parent queue on which this transaction is active. |
| 571 final StreamQueue<T> _parent; |
| 572 |
| 573 /// The splitter that produces copies of the parent queue's stream. |
| 574 final StreamSplitter<T> _splitter; |
| 575 |
| 576 /// Queues created using [newQueue]. |
| 577 final _queues = new Set<StreamQueue>(); |
| 578 |
| 579 /// Whether [commit] has been called. |
| 580 var _committed = false; |
| 581 |
| 582 /// Whether [reject] has been called. |
| 583 var _rejected = false; |
| 584 |
| 585 StreamQueueTransaction._(this._parent, Stream<T> source) |
| 586 : _splitter = new StreamSplitter(source); |
| 587 |
| 588 /// Creates a new copy of the parent queue. |
| 589 /// |
| 590 /// This copy starts at the parent queue's position when |
| 591 /// [StreamQueue.startTransaction] was called. Its position can be committed |
| 592 /// to the parent queue using [commit]. |
| 593 StreamQueue<T> newQueue() { |
| 594 var queue = new StreamQueue(_splitter.split()); |
| 595 _queues.add(queue); |
| 596 return queue; |
| 597 } |
| 598 |
| 599 /// Commits a queue created using [newQueue]. |
| 600 /// |
| 601 /// The parent queue's position is updated to be the same as [queue]'s. |
| 602 /// Further requests on all queues created by this transaction, including |
| 603 /// [queue], will complete as though [cancel] were called with `immediate: |
| 604 /// true`. |
| 605 /// |
| 606 /// Throws a [StateError] if [commit] or [reject] have already been called, or |
| 607 /// if there are pending requests on [queue]. |
| 608 void commit(StreamQueue<T> queue) { |
| 609 _assertActive(); |
| 610 if (!_queues.contains(queue)) { |
| 611 throw new ArgumentError("Queue doesn't belong to this transaction."); |
| 612 } else if (queue._requestQueue.isNotEmpty) { |
| 613 throw new StateError("A queue with pending requests can't be committed."); |
| 614 } |
| 615 _committed = true; |
| 616 |
| 617 // Remove all events from the parent queue that were consumed by the |
| 618 // child queue. |
| 619 for (var j = 0; j < queue.eventsDispatched; j++) { |
| 620 _parent._eventQueue.removeFirst(); |
| 621 } |
| 622 |
| 623 _done(); |
| 624 } |
| 625 |
| 626 /// Rejects this transaction without updating the parent queue. |
| 627 /// |
| 628 /// The parent will continue as though [StreamQueue.startTransaction] hadn't |
| 629 /// been called. Further requests on all queues created by this transaction |
| 630 /// will complete as though [cancel] were called with `immediate: true`. |
| 631 /// |
| 632 /// Throws a [StateError] if [commit] or [reject] have already been called. |
| 633 void reject() { |
| 634 _assertActive(); |
| 635 _rejected = true; |
| 636 _done(); |
| 637 } |
| 638 |
| 639 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s |
| 640 // request queue, and runs the next request. |
| 641 void _done() { |
| 642 _splitter.close(); |
| 643 for (var queue in _queues) { |
| 644 queue._cancel(); |
| 645 } |
| 646 |
| 647 assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
| 648 this); |
| 649 _parent._requestQueue.removeFirst(); |
| 650 _parent._updateRequests(); |
| 651 } |
| 652 |
| 653 /// Throws a [StateError] if [accept] or [reject] has already been called. |
| 654 void _assertActive() { |
| 655 if (_committed) { |
| 656 throw new StateError("This transaction has already been accepted."); |
| 657 } else if (_rejected) { |
| 658 throw new StateError("This transaction has already been rejected."); |
| 659 } |
| 660 } |
| 661 } |
412 | 662 |
413 /// Request object that receives events when they arrive, until fulfilled. | 663 /// Request object that receives events when they arrive, until fulfilled. |
414 /// | 664 /// |
415 /// Each request that cannot be fulfilled immediately is represented by | 665 /// Each request that cannot be fulfilled immediately is represented by |
416 /// an `_EventRequest` object in the request queue. | 666 /// an `_EventRequest` object in the request queue. |
417 /// | 667 /// |
418 /// Events from the source stream are sent to the first request in the | 668 /// Events from the source stream are sent to the first request in the |
419 /// queue until it reports itself as [isComplete]. | 669 /// queue until it reports itself as [isComplete]. |
420 /// | 670 /// |
421 /// When the first request in the queue `isComplete`, either when becoming | 671 /// When the first request in the queue `isComplete`, either when becoming |
422 /// the first request or after receiving an event, its [close] methods is | 672 /// the first request or after receiving an event, its [close] methods is |
423 /// called. | 673 /// called. |
424 /// | 674 /// |
425 /// The [close] method is also called immediately when the source stream | 675 /// The [close] method is also called immediately when the source stream |
426 /// is done. | 676 /// is done. |
427 abstract class _EventRequest { | 677 abstract class _EventRequest<T> { |
428 /// Handle available events. | 678 /// Handle available events. |
429 /// | 679 /// |
430 /// The available events are provided as a queue. The `update` function | 680 /// The available events are provided as a queue. The `update` function |
431 /// should only remove events from the front of the event queue, e.g., | 681 /// should only remove events from the front of the event queue, e.g., |
432 /// using [removeFirst]. | 682 /// using [removeFirst]. |
433 /// | 683 /// |
434 /// Returns `true` if the request is completed, or `false` if it needs | 684 /// Returns `true` if the request is completed, or `false` if it needs |
435 /// more events. | 685 /// more events. |
436 /// The call may keep events in the queue until the requeust is complete, | 686 /// The call may keep events in the queue until the requeust is complete, |
437 /// or it may remove them immediately. | 687 /// or it may remove them immediately. |
438 /// | 688 /// |
439 /// If the method returns true, the request is considered fulfilled, and | 689 /// If the method returns true, the request is considered fulfilled, and |
440 /// will never be called again. | 690 /// will never be called again. |
441 /// | 691 /// |
442 /// This method is called when a request reaches the front of the request | 692 /// This method is called when a request reaches the front of the request |
443 /// queue, and if it returns `false`, it's called again every time a new event | 693 /// queue, and if it returns `false`, it's called again every time a new event |
444 /// becomes available, or when the stream closes. | 694 /// becomes available, or when the stream closes. |
445 /// If the function returns `false` when the stream has already closed | 695 /// If the function returns `false` when the stream has already closed |
446 /// ([isDone] is true), then the request must call | 696 /// ([isDone] is true), then the request must call |
447 /// [StreamQueue._updateRequests] itself when it's ready to continue. | 697 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
448 bool update(Queue<Result> events, bool isDone); | 698 bool update(QueueList<Result<T>> events, bool isDone); |
449 } | 699 } |
450 | 700 |
451 /// Request for a [StreamQueue.next] call. | 701 /// Request for a [StreamQueue.next] call. |
452 /// | 702 /// |
453 /// Completes the returned future when receiving the first event, | 703 /// Completes the returned future when receiving the first event, |
454 /// and is then complete. | 704 /// and is then complete. |
455 class _NextRequest<T> implements _EventRequest { | 705 class _NextRequest<T> implements _EventRequest<T> { |
456 /// Completer for the future returned by [StreamQueue.next]. | 706 /// Completer for the future returned by [StreamQueue.next]. |
457 final Completer _completer; | 707 final _completer = new Completer<T>(); |
458 | 708 |
459 _NextRequest() : _completer = new Completer<T>(); | 709 _NextRequest(); |
460 | 710 |
461 Future<T> get future => _completer.future; | 711 Future<T> get future => _completer.future; |
462 | 712 |
463 bool update(Queue<Result> events, bool isDone) { | 713 bool update(QueueList<Result<T>> events, bool isDone) { |
464 if (events.isNotEmpty) { | 714 if (events.isNotEmpty) { |
465 events.removeFirst().complete(_completer); | 715 events.removeFirst().complete(_completer); |
466 return true; | 716 return true; |
467 } | 717 } |
468 if (isDone) { | 718 if (isDone) { |
469 var errorFuture = | 719 _completer.completeError( |
470 new Future.sync(() => throw new StateError("No elements")); | 720 new StateError("No elements"), StackTrace.current); |
471 _completer.complete(errorFuture); | |
472 return true; | 721 return true; |
473 } | 722 } |
474 return false; | 723 return false; |
| 724 } |
| 725 } |
| 726 |
| 727 /// Request for a [StreamQueue.peek] call. |
| 728 /// |
| 729 /// Completes the returned future when receiving the first event, |
| 730 /// and is then complete, but doesn't consume the event. |
| 731 class _PeekRequest<T> implements _EventRequest<T> { |
| 732 /// Completer for the future returned by [StreamQueue.next]. |
| 733 final _completer = new Completer<T>(); |
| 734 |
| 735 _PeekRequest(); |
| 736 |
| 737 Future<T> get future => _completer.future; |
| 738 |
| 739 bool update(QueueList<Result<T>> events, bool isDone) { |
| 740 if (events.isNotEmpty) { |
| 741 events.first.complete(_completer); |
| 742 return true; |
| 743 } |
| 744 if (isDone) { |
| 745 _completer.completeError( |
| 746 new StateError("No elements"), StackTrace.current); |
| 747 return true; |
| 748 } |
| 749 return false; |
475 } | 750 } |
476 } | 751 } |
477 | 752 |
478 /// Request for a [StreamQueue.skip] call. | 753 /// Request for a [StreamQueue.skip] call. |
479 class _SkipRequest implements _EventRequest { | 754 class _SkipRequest<T> implements _EventRequest<T> { |
480 /// Completer for the future returned by the skip call. | 755 /// Completer for the future returned by the skip call. |
481 final Completer _completer = new Completer<int>(); | 756 final _completer = new Completer<int>(); |
482 | 757 |
483 /// Number of remaining events to skip. | 758 /// Number of remaining events to skip. |
484 /// | 759 /// |
485 /// The request [isComplete] when the values reaches zero. | 760 /// The request [isComplete] when the values reaches zero. |
486 /// | 761 /// |
487 /// Decremented when an event is seen. | 762 /// Decremented when an event is seen. |
488 /// Set to zero when an error is seen since errors abort the skip request. | 763 /// Set to zero when an error is seen since errors abort the skip request. |
489 int _eventsToSkip; | 764 int _eventsToSkip; |
490 | 765 |
491 _SkipRequest(this._eventsToSkip); | 766 _SkipRequest(this._eventsToSkip); |
492 | 767 |
493 /// The future completed when the correct number of events have been skipped. | 768 /// The future completed when the correct number of events have been skipped. |
494 Future get future => _completer.future; | 769 Future<int> get future => _completer.future; |
495 | 770 |
496 bool update(Queue<Result> events, bool isDone) { | 771 bool update(QueueList<Result<T>> events, bool isDone) { |
497 while (_eventsToSkip > 0) { | 772 while (_eventsToSkip > 0) { |
498 if (events.isEmpty) { | 773 if (events.isEmpty) { |
499 if (isDone) break; | 774 if (isDone) break; |
500 return false; | 775 return false; |
501 } | 776 } |
502 _eventsToSkip--; | 777 _eventsToSkip--; |
503 | 778 |
504 var event = events.removeFirst(); | 779 var event = events.removeFirst(); |
505 if (event.isError) { | 780 if (event.isError) { |
506 event.complete(_completer); | 781 _completer.completeError(event.asError.error, event.asError.stackTrace); |
507 return true; | 782 return true; |
508 } | 783 } |
509 } | 784 } |
510 _completer.complete(_eventsToSkip); | 785 _completer.complete(_eventsToSkip); |
511 return true; | 786 return true; |
512 } | 787 } |
513 } | 788 } |
514 | 789 |
515 /// Request for a [StreamQueue.take] call. | 790 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
516 class _TakeRequest<T> implements _EventRequest { | 791 abstract class _ListRequest<T> implements _EventRequest<T> { |
517 /// Completer for the future returned by the take call. | 792 /// Completer for the future returned by the take call. |
518 final Completer _completer; | 793 final _completer = new Completer<List<T>>(); |
519 | 794 |
520 /// List collecting events until enough have been seen. | 795 /// List collecting events until enough have been seen. |
521 final List _list = <T>[]; | 796 final _list = <T>[]; |
522 | 797 |
523 /// Number of events to capture. | 798 /// Number of events to capture. |
524 /// | 799 /// |
525 /// The request [isComplete] when the length of [_list] reaches | 800 /// The request [isComplete] when the length of [_list] reaches |
526 /// this value. | 801 /// this value. |
527 final int _eventsToTake; | 802 final int _eventsToTake; |
528 | 803 |
529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 804 _ListRequest(this._eventsToTake); |
530 | 805 |
531 /// The future completed when the correct number of events have been captured. | 806 /// The future completed when the correct number of events have been captured. |
532 Future get future => _completer.future; | 807 Future<List<T>> get future => _completer.future; |
| 808 } |
533 | 809 |
534 bool update(Queue<Result> events, bool isDone) { | 810 /// Request for a [StreamQueue.take] call. |
| 811 class _TakeRequest<T> extends _ListRequest<T> { |
| 812 _TakeRequest(int eventsToTake) : super(eventsToTake); |
| 813 |
| 814 bool update(QueueList<Result<T>> events, bool isDone) { |
535 while (_list.length < _eventsToTake) { | 815 while (_list.length < _eventsToTake) { |
536 if (events.isEmpty) { | 816 if (events.isEmpty) { |
537 if (isDone) break; | 817 if (isDone) break; |
538 return false; | 818 return false; |
539 } | 819 } |
540 | 820 |
541 var result = events.removeFirst(); | 821 var event = events.removeFirst(); |
542 if (result.isError) { | 822 if (event.isError) { |
543 result.complete(_completer); | 823 event.asError.complete(_completer); |
544 return true; | 824 return true; |
545 } | 825 } |
546 _list.add(result.asValue.value); | 826 _list.add(event.asValue.value); |
547 } | 827 } |
548 _completer.complete(_list); | 828 _completer.complete(_list); |
549 return true; | 829 return true; |
| 830 } |
| 831 } |
| 832 |
| 833 /// Request for a [StreamQueue.lookAhead] call. |
| 834 class _LookAheadRequest<T> extends _ListRequest<T> { |
| 835 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| 836 |
| 837 bool update(QueueList<Result<T>> events, bool isDone) { |
| 838 while (_list.length < _eventsToTake) { |
| 839 if (events.length == _list.length) { |
| 840 if (isDone) break; |
| 841 return false; |
| 842 } |
| 843 var event = events.elementAt(_list.length); |
| 844 if (event.isError) { |
| 845 event.asError.complete(_completer); |
| 846 return true; |
| 847 } |
| 848 _list.add(event.asValue.value); |
| 849 } |
| 850 _completer.complete(_list); |
| 851 return true; |
550 } | 852 } |
551 } | 853 } |
552 | 854 |
553 /// Request for a [StreamQueue.cancel] call. | 855 /// Request for a [StreamQueue.cancel] call. |
554 /// | 856 /// |
555 /// The request needs no events, it just waits in the request queue | 857 /// The request needs no events, it just waits in the request queue |
556 /// until all previous events are fulfilled, then it cancels the stream queue | 858 /// until all previous events are fulfilled, then it cancels the stream queue |
557 /// source subscription. | 859 /// source subscription. |
558 class _CancelRequest implements _EventRequest { | 860 class _CancelRequest<T> implements _EventRequest<T> { |
559 /// Completer for the future returned by the `cancel` call. | 861 /// Completer for the future returned by the `cancel` call. |
560 final Completer _completer = new Completer(); | 862 final _completer = new Completer(); |
561 | 863 |
562 /// The [StreamQueue] object that has this request queued. | |
563 /// | 864 /// |
564 /// When the event is completed, it needs to cancel the active subscription | 865 /// When the event is completed, it needs to cancel the active subscription |
565 /// of the `StreamQueue` object, if any. | 866 /// of the `StreamQueue` object, if any. |
566 final StreamQueue _streamQueue; | 867 final StreamQueue _streamQueue; |
567 | 868 |
568 _CancelRequest(this._streamQueue); | 869 _CancelRequest(this._streamQueue); |
569 | 870 |
570 /// The future completed when the cancel request is completed. | 871 /// The future completed when the cancel request is completed. |
571 Future get future => _completer.future; | 872 Future get future => _completer.future; |
572 | 873 |
573 bool update(Queue<Result> events, bool isDone) { | 874 bool update(QueueList<Result<T>> events, bool isDone) { |
574 if (_streamQueue._isDone) { | 875 if (_streamQueue._isDone) { |
575 _completer.complete(); | 876 _completer.complete(); |
576 } else { | 877 } else { |
577 _streamQueue._ensureListening(); | 878 _streamQueue._ensureListening(); |
578 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); | 879 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
579 } | 880 } |
580 return true; | 881 return true; |
581 } | 882 } |
582 } | 883 } |
583 | 884 |
584 /// Request for a [StreamQueue.rest] call. | 885 /// Request for a [StreamQueue.rest] call. |
585 /// | 886 /// |
586 /// The request is always complete, it just waits in the request queue | 887 /// The request is always complete, it just waits in the request queue |
587 /// until all previous events are fulfilled, then it takes over the | 888 /// until all previous events are fulfilled, then it takes over the |
588 /// stream events subscription and creates a stream from it. | 889 /// stream events subscription and creates a stream from it. |
589 class _RestRequest<T> implements _EventRequest { | 890 class _RestRequest<T> implements _EventRequest<T> { |
590 /// Completer for the stream returned by the `rest` call. | 891 /// Completer for the stream returned by the `rest` call. |
591 final StreamCompleter _completer = new StreamCompleter<T>(); | 892 final _completer = new StreamCompleter<T>(); |
592 | 893 |
593 /// The [StreamQueue] object that has this request queued. | 894 /// The [StreamQueue] object that has this request queued. |
594 /// | 895 /// |
595 /// When the event is completed, it needs to cancel the active subscription | 896 /// When the event is completed, it needs to cancel the active subscription |
596 /// of the `StreamQueue` object, if any. | 897 /// of the `StreamQueue` object, if any. |
597 final StreamQueue _streamQueue; | 898 final StreamQueue<T> _streamQueue; |
598 | 899 |
599 _RestRequest(this._streamQueue); | 900 _RestRequest(this._streamQueue); |
600 | 901 |
601 /// The stream which will contain the remaining events of [_streamQueue]. | 902 /// The stream which will contain the remaining events of [_streamQueue]. |
602 Stream<T> get stream => _completer.stream; | 903 Stream<T> get stream => _completer.stream; |
603 | 904 |
604 bool update(Queue<Result> events, bool isDone) { | 905 bool update(QueueList<Result<T>> events, bool isDone) { |
605 if (events.isEmpty) { | 906 if (events.isEmpty) { |
606 if (_streamQueue._isDone) { | 907 if (_streamQueue._isDone) { |
607 _completer.setEmpty(); | 908 _completer.setEmpty(); |
608 } else { | 909 } else { |
609 _completer.setSourceStream(_streamQueue._extractStream()); | 910 _completer.setSourceStream(_streamQueue._extractStream()); |
610 } | 911 } |
611 } else { | 912 } else { |
612 // There are prefetched events which needs to be added before the | 913 // There are prefetched events which needs to be added before the |
613 // remaining stream. | 914 // remaining stream. |
614 var controller = new StreamController<T>(); | 915 var controller = new StreamController<T>(); |
615 for (var event in events) { | 916 for (var event in events) { |
616 event.addTo(controller); | 917 event.addTo(controller); |
617 } | 918 } |
618 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 919 controller |
619 .whenComplete(controller.close); | 920 .addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 921 .whenComplete(controller.close); |
620 _completer.setSourceStream(controller.stream); | 922 _completer.setSourceStream(controller.stream); |
621 } | 923 } |
622 return true; | 924 return true; |
623 } | 925 } |
624 } | 926 } |
625 | 927 |
626 /// Request for a [StreamQueue.hasNext] call. | 928 /// Request for a [StreamQueue.hasNext] call. |
627 /// | 929 /// |
628 /// Completes the [future] with `true` if it sees any event, | 930 /// Completes the [future] with `true` if it sees any event, |
629 /// but doesn't consume the event. | 931 /// but doesn't consume the event. |
630 /// If the request is closed without seeing an event, then | 932 /// If the request is closed without seeing an event, then |
631 /// the [future] is completed with `false`. | 933 /// the [future] is completed with `false`. |
632 class _HasNextRequest<T> implements _EventRequest { | 934 class _HasNextRequest<T> implements _EventRequest<T> { |
633 final Completer _completer = new Completer<bool>(); | 935 final _completer = new Completer<bool>(); |
634 | 936 |
635 Future<bool> get future => _completer.future; | 937 Future<bool> get future => _completer.future; |
636 | 938 |
637 bool update(Queue<Result> events, bool isDone) { | 939 bool update(QueueList<Result<T>> events, bool isDone) { |
638 if (events.isNotEmpty) { | 940 if (events.isNotEmpty) { |
639 _completer.complete(true); | 941 _completer.complete(true); |
640 return true; | 942 return true; |
641 } | 943 } |
642 if (isDone) { | 944 if (isDone) { |
643 _completer.complete(false); | 945 _completer.complete(false); |
644 return true; | 946 return true; |
645 } | 947 } |
646 return false; | 948 return false; |
647 } | 949 } |
648 } | 950 } |
| 951 |
| 952 /// Request for a [StreamQueue.startTransaction] call. |
| 953 /// |
| 954 /// This request isn't complete until the user calls |
| 955 /// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which |
| 956 /// point it manually removes itself from the request queue and calls |
| 957 /// [StreamQueue._updateRequests]. |
| 958 class _TransactionRequest<T> implements _EventRequest<T> { |
| 959 /// The transaction created by this request. |
| 960 StreamQueueTransaction<T> get transaction => _transaction; |
| 961 StreamQueueTransaction<T> _transaction; |
| 962 |
| 963 /// The controller that passes events to [transaction]. |
| 964 final _controller = new StreamController<T>(sync: true); |
| 965 |
| 966 /// The number of events passed to [_controller] so far. |
| 967 var _eventsSent = 0; |
| 968 |
| 969 _TransactionRequest(StreamQueue<T> parent) { |
| 970 _transaction = new StreamQueueTransaction._(parent, _controller.stream); |
| 971 } |
| 972 |
| 973 bool update(QueueList<Result<T>> events, bool isDone) { |
| 974 while (_eventsSent < events.length) { |
| 975 events[_eventsSent++].addTo(_controller); |
| 976 } |
| 977 if (isDone && !_controller.isClosed) _controller.close(); |
| 978 return false; |
| 979 } |
| 980 } |
OLD | NEW |