Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Side by Side Diff: pkg/watcher/lib/src/async_queue.dart

Issue 21628002: Re-implement directory polling. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Split out async queue. Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library watcher.async_queue;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 typedef Future ItemProcessor<T>(T item);
11 typedef void ErrorHandler(error);
12
13 /// A queue of items that are sequentially, asynchronously processed.
14 ///
15 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
16 /// item returns a [Future], and it will not advance to the next item until the
17 /// current item is finished processing.
18 ///
19 /// Items can be added at any point in time and processing will be started as
20 /// needed. When all items are processed, it stops processing until more items
21 /// are added.
22 class AsyncQueue<T> {
23 final _items = new Queue<T>();
24
25 /// Whether or not the queue is currently waiting on a processing future to
26 /// complete.
27 bool _isProcessing = false;
28
29 /// The callback to invoke on each queued item.
30 ///
31 /// The next item in the queue will not be processed until the [Future]
32 /// returned by this completes.
33 final ItemProcessor<T> _processor;
34
35 /// The handler for errors thrown during processing.
36 ///
37 /// Used to avoid top-leveling asynchronous errors.
38 final ErrorHandler _errorHandler;
39
40 AsyncQueue(this._processor, {ErrorHandler onError})
41 : _errorHandler = onError;
42
43 /// Enqueues [item] to be processed and starts asynchronously processing it
44 /// if a process isn't already running.
45 void add(T item) {
46 _items.add(item);
47
48 // Start up the asynchronous processing if not already running.
49 if (_isProcessing) return;
50 _isProcessing = true;
51
52 _processNextItem().catchError(_errorHandler);
53 }
54
55 /// Removes all remaining items to be processed.
56 void clear() {
57 _items.clear();
58 }
59
60 /// Pulls the next item off [_items] and processes it.
61 ///
62 /// When complete, recursively calls itself to continue processing unless
63 /// the process was cancelled.
64 Future _processNextItem() {
65 var item = _items.removeFirst();
66 return _processor(item).then((_) {
67 if (_items.isNotEmpty) return _processNextItem();
68
69 // We have drained the queue, stop processing and wait until something
70 // has been enqueued.
71 _isProcessing = false;
72 });
73 }
74 }
OLDNEW
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698