Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(122)

Side by Side Diff: watcher/lib/src/async_queue.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « utf/pubspec.yaml ('k') | watcher/lib/src/constructable_file_system_event.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library watcher.async_queue;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 typedef Future ItemProcessor<T>(T item);
11
12 /// A queue of items that are sequentially, asynchronously processed.
13 ///
14 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
15 /// item returns a [Future], and it will not advance to the next item until the
16 /// current item is finished processing.
17 ///
18 /// Items can be added at any point in time and processing will be started as
19 /// needed. When all items are processed, it stops processing until more items
20 /// are added.
21 class AsyncQueue<T> {
22 final _items = new Queue<T>();
23
24 /// Whether or not the queue is currently waiting on a processing future to
25 /// complete.
26 bool _isProcessing = false;
27
28 /// The callback to invoke on each queued item.
29 ///
30 /// The next item in the queue will not be processed until the [Future]
31 /// returned by this completes.
32 final ItemProcessor<T> _processor;
33
34 /// The handler for errors thrown during processing.
35 ///
36 /// Used to avoid top-leveling asynchronous errors.
37 final Function _errorHandler;
38
39 AsyncQueue(this._processor, {Function onError})
40 : _errorHandler = onError;
41
42 /// Enqueues [item] to be processed and starts asynchronously processing it
43 /// if a process isn't already running.
44 void add(T item) {
45 _items.add(item);
46
47 // Start up the asynchronous processing if not already running.
48 if (_isProcessing) return;
49 _isProcessing = true;
50
51 _processNextItem().catchError(_errorHandler);
52 }
53
54 /// Removes all remaining items to be processed.
55 void clear() {
56 _items.clear();
57 }
58
59 /// Pulls the next item off [_items] and processes it.
60 ///
61 /// When complete, recursively calls itself to continue processing unless
62 /// the process was cancelled.
63 Future _processNextItem() {
64 var item = _items.removeFirst();
65 return _processor(item).then((_) {
66 if (_items.isNotEmpty) return _processNextItem();
67
68 // We have drained the queue, stop processing and wait until something
69 // has been enqueued.
70 _isProcessing = false;
71 });
72 }
73 }
OLDNEW
« no previous file with comments | « utf/pubspec.yaml ('k') | watcher/lib/src/constructable_file_system_event.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698