Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(353)

Side by Side Diff: pkg/watcher/lib/src/async_queue.dart

Issue 68253010: Be sure we don't forward an error to a closed event stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Un-mark the test as flaky Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher/polling.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library watcher.async_queue; 5 library watcher.async_queue;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 typedef Future ItemProcessor<T>(T item); 10 typedef Future ItemProcessor<T>(T item);
11 typedef void ErrorHandler(error);
Bob Nystrom 2013/11/12 21:31:07 Why remove this typedef?
nweiz 2013/11/12 21:54:29 dart:async now accepts either fn(error) or fn(erro
12 11
13 /// A queue of items that are sequentially, asynchronously processed. 12 /// A queue of items that are sequentially, asynchronously processed.
14 /// 13 ///
15 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each 14 /// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
16 /// item returns a [Future], and it will not advance to the next item until the 15 /// item returns a [Future], and it will not advance to the next item until the
17 /// current item is finished processing. 16 /// current item is finished processing.
18 /// 17 ///
19 /// Items can be added at any point in time and processing will be started as 18 /// Items can be added at any point in time and processing will be started as
20 /// needed. When all items are processed, it stops processing until more items 19 /// needed. When all items are processed, it stops processing until more items
21 /// are added. 20 /// are added.
22 class AsyncQueue<T> { 21 class AsyncQueue<T> {
23 final _items = new Queue<T>(); 22 final _items = new Queue<T>();
24 23
25 /// Whether or not the queue is currently waiting on a processing future to 24 /// Whether or not the queue is currently waiting on a processing future to
26 /// complete. 25 /// complete.
27 bool _isProcessing = false; 26 bool _isProcessing = false;
28 27
29 /// The callback to invoke on each queued item. 28 /// The callback to invoke on each queued item.
30 /// 29 ///
31 /// The next item in the queue will not be processed until the [Future] 30 /// The next item in the queue will not be processed until the [Future]
32 /// returned by this completes. 31 /// returned by this completes.
33 final ItemProcessor<T> _processor; 32 final ItemProcessor<T> _processor;
34 33
35 /// The handler for errors thrown during processing. 34 /// The handler for errors thrown during processing.
36 /// 35 ///
37 /// Used to avoid top-leveling asynchronous errors. 36 /// Used to avoid top-leveling asynchronous errors.
38 final ErrorHandler _errorHandler; 37 final Function _errorHandler;
39 38
40 AsyncQueue(this._processor, {ErrorHandler onError}) 39 AsyncQueue(this._processor, {Function onError})
41 : _errorHandler = onError; 40 : _errorHandler = onError;
42 41
43 /// Enqueues [item] to be processed and starts asynchronously processing it 42 /// Enqueues [item] to be processed and starts asynchronously processing it
44 /// if a process isn't already running. 43 /// if a process isn't already running.
45 void add(T item) { 44 void add(T item) {
46 _items.add(item); 45 _items.add(item);
47 46
48 // Start up the asynchronous processing if not already running. 47 // Start up the asynchronous processing if not already running.
49 if (_isProcessing) return; 48 if (_isProcessing) return;
50 _isProcessing = true; 49 _isProcessing = true;
(...skipping 14 matching lines...) Expand all
65 var item = _items.removeFirst(); 64 var item = _items.removeFirst();
66 return _processor(item).then((_) { 65 return _processor(item).then((_) {
67 if (_items.isNotEmpty) return _processNextItem(); 66 if (_items.isNotEmpty) return _processNextItem();
68 67
69 // We have drained the queue, stop processing and wait until something 68 // We have drained the queue, stop processing and wait until something
70 // has been enqueued. 69 // has been enqueued.
71 _isProcessing = false; 70 _isProcessing = false;
72 }); 71 });
73 } 72 }
74 } 73 }
OLDNEW
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher/polling.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698