| Index: watcher/lib/src/async_queue.dart
|
| diff --git a/watcher/lib/src/async_queue.dart b/watcher/lib/src/async_queue.dart
|
| deleted file mode 100644
|
| index b83493d733a977aa35e7341700c80b3b9981076a..0000000000000000000000000000000000000000
|
| --- a/watcher/lib/src/async_queue.dart
|
| +++ /dev/null
|
| @@ -1,73 +0,0 @@
|
| -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE file.
|
| -
|
| -library watcher.async_queue;
|
| -
|
| -import 'dart:async';
|
| -import 'dart:collection';
|
| -
|
| -typedef Future ItemProcessor<T>(T item);
|
| -
|
| -/// A queue of items that are sequentially, asynchronously processed.
|
| -///
|
| -/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
|
| -/// item returns a [Future], and it will not advance to the next item until the
|
| -/// current item is finished processing.
|
| -///
|
| -/// Items can be added at any point in time and processing will be started as
|
| -/// needed. When all items are processed, it stops processing until more items
|
| -/// are added.
|
| -class AsyncQueue<T> {
|
| - final _items = new Queue<T>();
|
| -
|
| - /// Whether or not the queue is currently waiting on a processing future to
|
| - /// complete.
|
| - bool _isProcessing = false;
|
| -
|
| - /// The callback to invoke on each queued item.
|
| - ///
|
| - /// The next item in the queue will not be processed until the [Future]
|
| - /// returned by this completes.
|
| - final ItemProcessor<T> _processor;
|
| -
|
| - /// The handler for errors thrown during processing.
|
| - ///
|
| - /// Used to avoid top-leveling asynchronous errors.
|
| - final Function _errorHandler;
|
| -
|
| - AsyncQueue(this._processor, {Function onError})
|
| - : _errorHandler = onError;
|
| -
|
| - /// Enqueues [item] to be processed and starts asynchronously processing it
|
| - /// if a process isn't already running.
|
| - void add(T item) {
|
| - _items.add(item);
|
| -
|
| - // Start up the asynchronous processing if not already running.
|
| - if (_isProcessing) return;
|
| - _isProcessing = true;
|
| -
|
| - _processNextItem().catchError(_errorHandler);
|
| - }
|
| -
|
| - /// Removes all remaining items to be processed.
|
| - void clear() {
|
| - _items.clear();
|
| - }
|
| -
|
| - /// Pulls the next item off [_items] and processes it.
|
| - ///
|
| - /// When complete, recursively calls itself to continue processing unless
|
| - /// the process was cancelled.
|
| - Future _processNextItem() {
|
| - var item = _items.removeFirst();
|
| - return _processor(item).then((_) {
|
| - if (_items.isNotEmpty) return _processNextItem();
|
| -
|
| - // We have drained the queue, stop processing and wait until something
|
| - // has been enqueued.
|
| - _isProcessing = false;
|
| - });
|
| - }
|
| -}
|
|
|