Index: pkg/watcher/lib/src/async_queue.dart |
diff --git a/pkg/watcher/lib/src/async_queue.dart b/pkg/watcher/lib/src/async_queue.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9456631af12107687e6ca899a47a4d732210581f |
--- /dev/null |
+++ b/pkg/watcher/lib/src/async_queue.dart |
@@ -0,0 +1,74 @@ |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library watcher.async_queue; |
+ |
+import 'dart:async'; |
+import 'dart:collection'; |
+ |
+typedef Future ItemProcessor<T>(T item); |
+typedef void ErrorHandler(error); |
+ |
+/// A queue of items that are sequentially, asynchronously processed. |
+/// |
+/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each |
+/// item returns a [Future], and it will not advance to the next item until the |
+/// current item is finished processing. |
+/// |
+/// Items can be added at any point in time and processing will be started as |
+/// needed. When all items are processed, it stops processing until more items |
+/// are added. |
+class AsyncQueue<T> { |
+ final _items = new Queue<T>(); |
+ |
+ /// Whether or not the queue is currently waiting on a processing future to |
+ /// complete. |
+ bool _isProcessing = false; |
+ |
+ /// The callback to invoke on each queued item. |
+ /// |
+ /// The next item in the queue will not be processed until the [Future] |
+ /// returned by this completes. |
+ final ItemProcessor<T> _processor; |
+ |
+ /// The handler for errors thrown during processing. |
+ /// |
+ /// Used to avoid top-leveling asynchronous errors. |
+ final ErrorHandler _errorHandler; |
+ |
+ AsyncQueue(this._processor, {ErrorHandler onError}) |
+ : _errorHandler = onError; |
+ |
+ /// Enqueues [item] to be processed and starts asynchronously processing it |
+ /// if a process isn't already running. |
+ void add(T item) { |
+ _items.add(item); |
+ |
+ // Start up the asynchronous processing if not already running. |
+ if (_isProcessing) return; |
+ _isProcessing = true; |
+ |
+ _processNextItem().catchError(_errorHandler); |
+ } |
+ |
+ /// Removes all remaining items to be processed. |
+ void clear() { |
+ _items.clear(); |
+ } |
+ |
+ /// Pulls the next item off [_items] and processes it. |
+ /// |
+ /// When complete, recursively calls itself to continue processing unless |
+ /// the process was cancelled. |
+ Future _processNextItem() { |
+ var item = _items.removeFirst(); |
+ return _processor(item).then((_) { |
+ if (_items.isNotEmpty) return _processNextItem(); |
+ |
+ // We have drained the queue, stop processing and wait until something |
+ // has been enqueued. |
+ _isProcessing = false; |
+ }); |
+ } |
+} |