OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library watcher.async_queue; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 9 |
| 10 typedef Future ItemProcessor<T>(T item); |
| 11 typedef void ErrorHandler(error); |
| 12 |
| 13 /// A queue of items that are sequentially, asynchronously processed. |
| 14 /// |
| 15 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each |
| 16 /// item returns a [Future], and it will not advance to the next item until the |
| 17 /// current item is finished processing. |
| 18 /// |
| 19 /// Items can be added at any point in time and processing will be started as |
| 20 /// needed. When all items are processed, it stops processing until more items |
| 21 /// are added. |
| 22 class AsyncQueue<T> { |
| 23 final _items = new Queue<T>(); |
| 24 |
| 25 /// Whether or not the queue is currently waiting on a processing future to |
| 26 /// complete. |
| 27 bool _isProcessing = false; |
| 28 |
| 29 /// The callback to invoke on each queued item. |
| 30 /// |
| 31 /// The next item in the queue will not be processed until the [Future] |
| 32 /// returned by this completes. |
| 33 final ItemProcessor<T> _processor; |
| 34 |
| 35 /// The handler for errors thrown during processing. |
| 36 /// |
| 37 /// Used to avoid top-leveling asynchronous errors. |
| 38 final ErrorHandler _errorHandler; |
| 39 |
| 40 AsyncQueue(this._processor, {ErrorHandler onError}) |
| 41 : _errorHandler = onError; |
| 42 |
| 43 /// Enqueues [item] to be processed and starts asynchronously processing it |
| 44 /// if a process isn't already running. |
| 45 void add(T item) { |
| 46 _items.add(item); |
| 47 |
| 48 // Start up the asynchronous processing if not already running. |
| 49 if (_isProcessing) return; |
| 50 _isProcessing = true; |
| 51 |
| 52 _processNextItem().catchError(_errorHandler); |
| 53 } |
| 54 |
| 55 /// Removes all remaining items to be processed. |
| 56 void clear() { |
| 57 _items.clear(); |
| 58 } |
| 59 |
| 60 /// Pulls the next item off [_items] and processes it. |
| 61 /// |
| 62 /// When complete, recursively calls itself to continue processing unless |
| 63 /// the process was cancelled. |
| 64 Future _processNextItem() { |
| 65 var item = _items.removeFirst(); |
| 66 return _processor(item).then((_) { |
| 67 if (_items.isNotEmpty) return _processNextItem(); |
| 68 |
| 69 // We have drained the queue, stop processing and wait until something |
| 70 // has been enqueued. |
| 71 _isProcessing = false; |
| 72 }); |
| 73 } |
| 74 } |
OLD | NEW |