| Index: mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
|
| diff --git a/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart b/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..b83493d733a977aa35e7341700c80b3b9981076a
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
|
| @@ -0,0 +1,73 @@
|
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library watcher.async_queue;
|
| +
|
| +import 'dart:async';
|
| +import 'dart:collection';
|
| +
|
| +typedef Future ItemProcessor<T>(T item);
|
| +
|
| +/// A queue of items that are sequentially, asynchronously processed.
|
| +///
|
| +/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
|
| +/// item returns a [Future], and it will not advance to the next item until the
|
| +/// current item is finished processing.
|
| +///
|
| +/// Items can be added at any point in time and processing will be started as
|
| +/// needed. When all items are processed, it stops processing until more items
|
| +/// are added.
|
| +class AsyncQueue<T> {
|
| + final _items = new Queue<T>();
|
| +
|
| + /// Whether or not the queue is currently waiting on a processing future to
|
| + /// complete.
|
| + bool _isProcessing = false;
|
| +
|
| + /// The callback to invoke on each queued item.
|
| + ///
|
| + /// The next item in the queue will not be processed until the [Future]
|
| + /// returned by this completes.
|
| + final ItemProcessor<T> _processor;
|
| +
|
| + /// The handler for errors thrown during processing.
|
| + ///
|
| + /// Used to avoid top-leveling asynchronous errors.
|
| + final Function _errorHandler;
|
| +
|
| + AsyncQueue(this._processor, {Function onError})
|
| + : _errorHandler = onError;
|
| +
|
| + /// Enqueues [item] to be processed and starts asynchronously processing it
|
| + /// if a process isn't already running.
|
| + void add(T item) {
|
| + _items.add(item);
|
| +
|
| + // Start up the asynchronous processing if not already running.
|
| + if (_isProcessing) return;
|
| + _isProcessing = true;
|
| +
|
| + _processNextItem().catchError(_errorHandler);
|
| + }
|
| +
|
| + /// Removes all remaining items to be processed.
|
| + void clear() {
|
| + _items.clear();
|
| + }
|
| +
|
| + /// Pulls the next item off [_items] and processes it.
|
| + ///
|
| + /// When complete, recursively calls itself to continue processing unless
|
| + /// the process was cancelled.
|
| + Future _processNextItem() {
|
| + var item = _items.removeFirst();
|
| + return _processor(item).then((_) {
|
| + if (_items.isNotEmpty) return _processNextItem();
|
| +
|
| + // We have drained the queue, stop processing and wait until something
|
| + // has been enqueued.
|
| + _isProcessing = false;
|
| + });
|
| + }
|
| +}
|
|
|