OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library watcher.async_queue; | 5 library watcher.async_queue; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 typedef Future ItemProcessor<T>(T item); | 10 typedef Future ItemProcessor<T>(T item); |
11 typedef void ErrorHandler(error); | |
Bob Nystrom
2013/11/12 21:31:07
Why remove this typedef?
nweiz
2013/11/12 21:54:29
dart:async now accepts either fn(error) or fn(erro
| |
12 | 11 |
13 /// A queue of items that are sequentially, asynchronously processed. | 12 /// A queue of items that are sequentially, asynchronously processed. |
14 /// | 13 /// |
15 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each | 14 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each |
16 /// item returns a [Future], and it will not advance to the next item until the | 15 /// item returns a [Future], and it will not advance to the next item until the |
17 /// current item is finished processing. | 16 /// current item is finished processing. |
18 /// | 17 /// |
19 /// Items can be added at any point in time and processing will be started as | 18 /// Items can be added at any point in time and processing will be started as |
20 /// needed. When all items are processed, it stops processing until more items | 19 /// needed. When all items are processed, it stops processing until more items |
21 /// are added. | 20 /// are added. |
22 class AsyncQueue<T> { | 21 class AsyncQueue<T> { |
23 final _items = new Queue<T>(); | 22 final _items = new Queue<T>(); |
24 | 23 |
25 /// Whether or not the queue is currently waiting on a processing future to | 24 /// Whether or not the queue is currently waiting on a processing future to |
26 /// complete. | 25 /// complete. |
27 bool _isProcessing = false; | 26 bool _isProcessing = false; |
28 | 27 |
29 /// The callback to invoke on each queued item. | 28 /// The callback to invoke on each queued item. |
30 /// | 29 /// |
31 /// The next item in the queue will not be processed until the [Future] | 30 /// The next item in the queue will not be processed until the [Future] |
32 /// returned by this completes. | 31 /// returned by this completes. |
33 final ItemProcessor<T> _processor; | 32 final ItemProcessor<T> _processor; |
34 | 33 |
35 /// The handler for errors thrown during processing. | 34 /// The handler for errors thrown during processing. |
36 /// | 35 /// |
37 /// Used to avoid top-leveling asynchronous errors. | 36 /// Used to avoid top-leveling asynchronous errors. |
38 final ErrorHandler _errorHandler; | 37 final Function _errorHandler; |
39 | 38 |
40 AsyncQueue(this._processor, {ErrorHandler onError}) | 39 AsyncQueue(this._processor, {Function onError}) |
41 : _errorHandler = onError; | 40 : _errorHandler = onError; |
42 | 41 |
43 /// Enqueues [item] to be processed and starts asynchronously processing it | 42 /// Enqueues [item] to be processed and starts asynchronously processing it |
44 /// if a process isn't already running. | 43 /// if a process isn't already running. |
45 void add(T item) { | 44 void add(T item) { |
46 _items.add(item); | 45 _items.add(item); |
47 | 46 |
48 // Start up the asynchronous processing if not already running. | 47 // Start up the asynchronous processing if not already running. |
49 if (_isProcessing) return; | 48 if (_isProcessing) return; |
50 _isProcessing = true; | 49 _isProcessing = true; |
(...skipping 14 matching lines...) Expand all Loading... | |
65 var item = _items.removeFirst(); | 64 var item = _items.removeFirst(); |
66 return _processor(item).then((_) { | 65 return _processor(item).then((_) { |
67 if (_items.isNotEmpty) return _processNextItem(); | 66 if (_items.isNotEmpty) return _processNextItem(); |
68 | 67 |
69 // We have drained the queue, stop processing and wait until something | 68 // We have drained the queue, stop processing and wait until something |
70 // has been enqueued. | 69 // has been enqueued. |
71 _isProcessing = false; | 70 _isProcessing = false; |
72 }); | 71 }); |
73 } | 72 } |
74 } | 73 } |
OLD | NEW |