Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(756)

Unified Diff: mojo/public/dart/third_party/watcher/lib/src/async_queue.dart

Issue 1346773002: Stop running pub get at gclient sync time and fix build bugs (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
diff --git a/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart b/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
new file mode 100644
index 0000000000000000000000000000000000000000..b83493d733a977aa35e7341700c80b3b9981076a
--- /dev/null
+++ b/mojo/public/dart/third_party/watcher/lib/src/async_queue.dart
@@ -0,0 +1,73 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.async_queue;
+
+import 'dart:async';
+import 'dart:collection';
+
+typedef Future ItemProcessor<T>(T item);
+
+/// A queue of items that are sequentially, asynchronously processed.
+///
+/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
+/// item returns a [Future], and it will not advance to the next item until the
+/// current item is finished processing.
+///
+/// Items can be added at any point in time and processing will be started as
+/// needed. When all items are processed, it stops processing until more items
+/// are added.
+class AsyncQueue<T> {
+ final _items = new Queue<T>();
+
+ /// Whether or not the queue is currently waiting on a processing future to
+ /// complete.
+ bool _isProcessing = false;
+
+ /// The callback to invoke on each queued item.
+ ///
+ /// The next item in the queue will not be processed until the [Future]
+ /// returned by this completes.
+ final ItemProcessor<T> _processor;
+
+ /// The handler for errors thrown during processing.
+ ///
+ /// Used to avoid top-leveling asynchronous errors.
+ final Function _errorHandler;
+
+ AsyncQueue(this._processor, {Function onError})
+ : _errorHandler = onError;
+
+ /// Enqueues [item] to be processed and starts asynchronously processing it
+ /// if a process isn't already running.
+ void add(T item) {
+ _items.add(item);
+
+ // Start up the asynchronous processing if not already running.
+ if (_isProcessing) return;
+ _isProcessing = true;
+
+ _processNextItem().catchError(_errorHandler);
+ }
+
+ /// Removes all remaining items to be processed.
+ void clear() {
+ _items.clear();
+ }
+
+ /// Pulls the next item off [_items] and processes it.
+ ///
+ /// When complete, recursively calls itself to continue processing unless
+ /// the process was cancelled.
+ Future _processNextItem() {
+ var item = _items.removeFirst();
+ return _processor(item).then((_) {
+ if (_items.isNotEmpty) return _processNextItem();
+
+ // We have drained the queue, stop processing and wait until something
+ // has been enqueued.
+ _isProcessing = false;
+ });
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698