Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(426)

Unified Diff: pkg/watcher/lib/src/async_queue.dart

Issue 21628002: Re-implement directory polling. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Split out async queue. Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/watcher/lib/src/async_queue.dart
diff --git a/pkg/watcher/lib/src/async_queue.dart b/pkg/watcher/lib/src/async_queue.dart
new file mode 100644
index 0000000000000000000000000000000000000000..9456631af12107687e6ca899a47a4d732210581f
--- /dev/null
+++ b/pkg/watcher/lib/src/async_queue.dart
@@ -0,0 +1,74 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.async_queue;
+
+import 'dart:async';
+import 'dart:collection';
+
+typedef Future ItemProcessor<T>(T item);
+typedef void ErrorHandler(error);
+
+/// A queue of items that are sequentially, asynchronously processed.
+///
+/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
+/// item returns a [Future], and it will not advance to the next item until the
+/// current item is finished processing.
+///
+/// Items can be added at any point in time and processing will be started as
+/// needed. When all items are processed, it stops processing until more items
+/// are added.
+class AsyncQueue<T> {
+ final _items = new Queue<T>();
+
+ /// Whether or not the queue is currently waiting on a processing future to
+ /// complete.
+ bool _isProcessing = false;
+
+ /// The callback to invoke on each queued item.
+ ///
+ /// The next item in the queue will not be processed until the [Future]
+ /// returned by this completes.
+ final ItemProcessor<T> _processor;
+
+ /// The handler for errors thrown during processing.
+ ///
+ /// Used to avoid top-leveling asynchronous errors.
+ final ErrorHandler _errorHandler;
+
+ AsyncQueue(this._processor, {ErrorHandler onError})
+ : _errorHandler = onError;
+
+ /// Enqueues [item] to be processed and starts asynchronously processing it
+ /// if a process isn't already running.
+ void add(T item) {
+ _items.add(item);
+
+ // Start up the asynchronous processing if not already running.
+ if (_isProcessing) return;
+ _isProcessing = true;
+
+ _processNextItem().catchError(_errorHandler);
+ }
+
+ /// Removes all remaining items to be processed.
+ void clear() {
+ _items.clear();
+ }
+
+ /// Pulls the next item off [_items] and processes it.
+ ///
+ /// When complete, recursively calls itself to continue processing unless
+ /// the process was cancelled.
+ Future _processNextItem() {
+ var item = _items.removeFirst();
+ return _processor(item).then((_) {
+ if (_items.isNotEmpty) return _processNextItem();
+
+ // We have drained the queue, stop processing and wait until something
+ // has been enqueued.
+ _isProcessing = false;
+ });
+ }
+}
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698