OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library watcher.async_queue; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 typedef Future ItemProcessor<T>(T item); | |
11 | |
12 /// A queue of items that are sequentially, asynchronously processed. | |
13 /// | |
14 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each | |
15 /// item returns a [Future], and it will not advance to the next item until the | |
16 /// current item is finished processing. | |
17 /// | |
18 /// Items can be added at any point in time and processing will be started as | |
19 /// needed. When all items are processed, it stops processing until more items | |
20 /// are added. | |
21 class AsyncQueue<T> { | |
22 final _items = new Queue<T>(); | |
23 | |
24 /// Whether or not the queue is currently waiting on a processing future to | |
25 /// complete. | |
26 bool _isProcessing = false; | |
27 | |
28 /// The callback to invoke on each queued item. | |
29 /// | |
30 /// The next item in the queue will not be processed until the [Future] | |
31 /// returned by this completes. | |
32 final ItemProcessor<T> _processor; | |
33 | |
34 /// The handler for errors thrown during processing. | |
35 /// | |
36 /// Used to avoid top-leveling asynchronous errors. | |
37 final Function _errorHandler; | |
38 | |
39 AsyncQueue(this._processor, {Function onError}) | |
40 : _errorHandler = onError; | |
41 | |
42 /// Enqueues [item] to be processed and starts asynchronously processing it | |
43 /// if a process isn't already running. | |
44 void add(T item) { | |
45 _items.add(item); | |
46 | |
47 // Start up the asynchronous processing if not already running. | |
48 if (_isProcessing) return; | |
49 _isProcessing = true; | |
50 | |
51 _processNextItem().catchError(_errorHandler); | |
52 } | |
53 | |
54 /// Removes all remaining items to be processed. | |
55 void clear() { | |
56 _items.clear(); | |
57 } | |
58 | |
59 /// Pulls the next item off [_items] and processes it. | |
60 /// | |
61 /// When complete, recursively calls itself to continue processing unless | |
62 /// the process was cancelled. | |
63 Future _processNextItem() { | |
64 var item = _items.removeFirst(); | |
65 return _processor(item).then((_) { | |
66 if (_items.isNotEmpty) return _processNextItem(); | |
67 | |
68 // We have drained the queue, stop processing and wait until something | |
69 // has been enqueued. | |
70 _isProcessing = false; | |
71 }); | |
72 } | |
73 } | |
OLD | NEW |