Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(649)

Unified Diff: pkg/watcher/lib/src/directory_watcher.dart

Issue 21628002: Re-implement directory polling. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | pkg/watcher/test/no_subscription_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/watcher/lib/src/directory_watcher.dart
diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher.dart
index 61bf6c52a2e77681160379813f7ecb4405d4c783..f08dc198fc635cd025f7ba1af92a1edae8e29391 100644
--- a/pkg/watcher/lib/src/directory_watcher.dart
+++ b/pkg/watcher/lib/src/directory_watcher.dart
@@ -5,6 +5,7 @@
library watcher.directory_watcher;
import 'dart:async';
+import 'dart:collection';
import 'dart:io';
import 'package:crypto/crypto.dart';
@@ -27,7 +28,7 @@ class DirectoryWatcher {
Stream<WatchEvent> get events => _events.stream;
StreamController<WatchEvent> _events;
- _WatchState _state = _WatchState.notWatching;
+ _WatchState _state = _WatchState.UNSUBSCRIBED;
/// A [Future] that completes when the watcher is initialized and watching
/// for file changes.
@@ -51,6 +52,30 @@ class DirectoryWatcher {
/// Used to tell which files have been modified.
final _statuses = new Map<String, _FileStatus>();
+ /// The subscription used while [directory] is being listed.
+ ///
+ /// Will be `null` if a list is not currently happening.
+ StreamSubscription<FileSystemEntity> _listSubscription;
+
+ /// This will be `true` if we are currently asynchronously processing files
+ /// from [_filesToCheck].
nweiz 2013/08/01 22:52:39 "_filesToCheck" -> "_filesToProcess"
Bob Nystrom 2013/08/02 17:26:29 Done.
+ bool _isProcessing = false;
+
+ /// The queue of files waiting to be processed to see if they have been
+ /// modified.
+ ///
+ /// Processing a file is asynchronous, as is listing the directory, so the
+ /// queue exists to let each of those proceed at their own rate. The lister
+ /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
+ /// and processed sequentially.
+ final _filesToProcess = new Queue<String>();
nweiz 2013/08/01 22:52:39 See how this looks with a FutureGroup instead of a
Bob Nystrom 2013/08/02 17:26:29 I started going in that direction but it looks lik
nweiz 2013/08/02 20:07:52 SGTM. Can you abstract out the notion of a sequent
Bob Nystrom 2013/08/02 21:45:11 Done.
+
+ /// The set of files that have been seen in the current directory listing.
+ ///
+ /// Used to tell which files have been removed: files that are in [_statuses]
+ /// but not in here when a poll completes have been removed.
+ final _polledFiles = new Set<String>();
+
/// Creates a new [DirectoryWatcher] monitoring [directory].
///
/// If [pollingDelay] is passed, it specifies the amount of time the watcher
@@ -60,77 +85,147 @@ class DirectoryWatcher {
DirectoryWatcher(this.directory, {Duration pollingDelay})
: pollingDelay = pollingDelay != null ? pollingDelay :
new Duration(seconds: 1) {
- _events = new StreamController<WatchEvent>.broadcast(onListen: () {
- _state = _state.listen(this);
- }, onCancel: () {
- _state = _state.cancel(this);
- });
+ _events = new StreamController<WatchEvent>.broadcast(
+ onListen: _watch, onCancel: _cancel);
}
- /// Starts the asynchronous polling process.
- ///
- /// Scans the contents of the directory and compares the results to the
- /// previous scan. Loops to continue monitoring as long as there are
- /// subscribers to the [events] stream.
- Future _watch() {
- var files = new Set<String>();
+ /// Scans to see which files were already present before the watcher was
+ /// subscribed to, and then starts watching the directory for changes.
+ void _watch() {
+ assert(_state == _WatchState.UNSUBSCRIBED);
+ _state = _WatchState.SCANNING;
+ _poll();
+ }
+
+ /// Stops watching the directory when there are no more subscribers.
+ void _cancel() {
+ assert(_state != _WatchState.UNSUBSCRIBED);
+ _state = _WatchState.UNSUBSCRIBED;
+
+ // If we're in the middle of listing the directory, stop.
+ if (_listSubscription != null) _listSubscription.cancel();
+
+ // Don't process any remaining files.
+ _filesToProcess.clear();
+ _polledFiles.clear();
nweiz 2013/08/01 22:52:39 Also clear _statuses.
Bob Nystrom 2013/08/02 17:26:29 Done.
+
+ _ready = new Completer();
+ }
+
+ /// Scans the contents of the directory once to see which files have been
+ /// added, removed, and modified.
+ void _poll() {
+ _filesToProcess.clear();
+ _polledFiles.clear();
var stream = new Directory(directory).list(recursive: true);
+ _listSubscription = stream.listen((entity) {
+ assert(_state != _WatchState.UNSUBSCRIBED);
+
+ if (entity is! File) return;
+ _enqueueFile(entity.path);
+ }, onDone: () {
+ assert(_state != _WatchState.UNSUBSCRIBED);
+ _listSubscription = null;
+
+ // Null tells the queue consumer that we're done listing.
+ _enqueueFile(null);
+ });
+ }
- return stream.map((entity) {
- if (entity is! File) return new Future.value();
- files.add(entity.path);
- // TODO(rnystrom): These all run as fast as possible and read the
- // contents of the files. That means there's a pretty big IO hit all at
- // once. Maybe these should be queued up and rate limited?
- return _refreshFile(entity.path);
- }).toList().then((futures) {
- // Once the listing is done, make sure to wait until each file is also
- // done.
- return Future.wait(futures);
- }).then((_) {
- var removedFiles = _statuses.keys.toSet().difference(files);
- for (var removed in removedFiles) {
- if (_state.shouldNotify) {
- _events.add(new WatchEvent(ChangeType.REMOVE, removed));
+ /// Add [path] to the queue of files whose status needs to be processed.
+ void _enqueueFile(String path) {
+ _filesToProcess.add(path);
+
+ // Start up the asynchronous processing if not already running.
+ if (_isProcessing) return;
+ _isProcessing = true;
+
+ refreshNextFile() {
nweiz 2013/08/01 22:52:39 It feels like this warrants being its own method.
Bob Nystrom 2013/08/02 17:26:29 Done.
+ var file = _filesToProcess.removeFirst();
+
+ // `null` is the sentinel which means the directory listing is complete.
+ if (file == null) {
+ _isProcessing = false;
+
+ // Any files that were not seen in the last poll but that we have a
+ // status for must have been removed.
+ var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
+ for (var removed in removedFiles) {
+ if (_state == _WatchState.WATCHING) {
+ _events.add(new WatchEvent(ChangeType.REMOVE, removed));
+ }
+ _statuses.remove(removed);
+ }
+
+ assert(_state != _WatchState.UNSUBSCRIBED);
nweiz 2013/08/01 22:52:39 This should be at the top of [refreshNextFile].
Bob Nystrom 2013/08/02 17:26:29 Done.
+ if (_state == _WatchState.SCANNING) {
+ _state = _WatchState.WATCHING;
+ print("done scanning, watching");
nweiz 2013/08/01 22:52:39 Left over from debugging?
Bob Nystrom 2013/08/02 17:26:29 Yup. Done.
+ _ready.complete();
}
- _statuses.remove(removed);
- }
- var previousState = _state;
- _state = _state.finish(this);
+ // Wait.
+ return new Future.delayed(pollingDelay).then((_) {
+ // Stop if we unsubscribed while waiting.
+ if (_state == _WatchState.UNSUBSCRIBED) return;
- // If we were already sending notifications, add a bit of delay before
- // restarting just so that we don't whale on the file system.
- // TODO(rnystrom): Tune this and/or make it tunable?
- if (_state.shouldNotify) {
- return new Future.delayed(pollingDelay);
+ // And then poll again.
+ _poll();
+ });
}
- }).then((_) {
- // Make sure we haven't transitioned to a non-watching state during the
- // delay.
- if (_state.shouldWatch) _watch();
+
+ return _processFile(file).then((_) {
+ // Stop if we unsubscribed while waiting.
+ if (_state == _WatchState.UNSUBSCRIBED) return;
+
+ // If we have drained the queue (i.e. we are processing faster than
+ // we are listing files), then stop processing and wait until something
+ // has been enqueued.
+ if (_filesToProcess.isEmpty) {
+ _isProcessing = false;
+ return;
+ }
+
+ return refreshNextFile();
+ });
+ }
+
+ // Pipe all errors to the notification stream.
+ refreshNextFile().catchError((error) {
+ _events.addError(error, getAttachedStackTrace(error));
nweiz 2013/08/01 22:52:39 You don't need to manually forward the stack trace
Bob Nystrom 2013/08/02 17:26:29 Done.
});
}
- /// Compares the current state of the file at [path] to the state it was in
- /// the last time it was scanned.
- Future _refreshFile(String path) {
+ Future _processFile(String path) {
return getModificationTime(path).then((modified) {
+ // Stop if we unsubscribed while waiting.
+ if (_state == _WatchState.UNSUBSCRIBED) return;
+
var lastStatus = _statuses[path];
// If it's modification time hasn't changed, assume the file is unchanged.
nweiz 2013/08/01 22:52:39 "it's" -> "its"
Bob Nystrom 2013/08/02 17:26:29 Done.
- if (lastStatus != null && lastStatus.modified == modified) return;
+ if (lastStatus != null && lastStatus.modified == modified) {
+ // The file is still here.
+ _polledFiles.add(path);
+ return;
+ }
return _hashFile(path).then((hash) {
+ // Stop if we unsubscribed while waiting.
+ if (_state == _WatchState.UNSUBSCRIBED) return;
+
var status = new _FileStatus(modified, hash);
_statuses[path] = status;
-
- // Only notify if the file contents changed.
- if (_state.shouldNotify &&
- (lastStatus == null || !_sameHash(lastStatus.hash, hash))) {
- var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
- _events.add(new WatchEvent(change, path));
+ _polledFiles.add(path);
+
+ // Only notify while in the watching state.
+ if (_state == _WatchState.WATCHING) {
nweiz 2013/08/01 22:52:39 Short-circuit here and below.
Bob Nystrom 2013/08/02 17:26:29 Done.
+ var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
+ if (changed) {
+ var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
+ _events.add(new WatchEvent(type, path));
+ }
}
});
});
@@ -159,71 +254,14 @@ class DirectoryWatcher {
}
}
-/// An "event" that is sent to the [_WatchState] FSM to trigger state
-/// transitions.
-typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher);
-
-/// The different states that the watcher can be in and the transitions between
-/// them.
-///
-/// This class defines a finite state machine for keeping track of what the
-/// asynchronous file polling is doing. Each instance of this is a state in the
-/// machine and its [listen], [cancel], and [finish] fields define the state
-/// transitions when those events occur.
class _WatchState {
- /// The watcher has no subscribers.
- static final notWatching = new _WatchState(
- listen: (watcher) {
- watcher._watch();
- return _WatchState.scanning;
- });
-
- /// The watcher has subscribers and is scanning for pre-existing files.
- static final scanning = new _WatchState(
- cancel: (watcher) {
- // No longer watching, so create a new incomplete ready future.
- watcher._ready = new Completer();
- return _WatchState.cancelling;
- }, finish: (watcher) {
- watcher._ready.complete();
- return _WatchState.watching;
- }, shouldWatch: true);
-
- /// The watcher was unsubscribed while polling and we're waiting for the poll
- /// to finish.
- static final cancelling = new _WatchState(
- listen: (_) => _WatchState.scanning,
- finish: (_) => _WatchState.notWatching);
-
- /// The watcher has subscribers, we have scanned for pre-existing files and
- /// now we're polling for changes.
- static final watching = new _WatchState(
- cancel: (watcher) {
- // No longer watching, so create a new incomplete ready future.
- watcher._ready = new Completer();
- return _WatchState.cancelling;
- }, finish: (_) => _WatchState.watching,
- shouldWatch: true, shouldNotify: true);
-
- /// Called when the first subscriber to the watcher has been added.
- final _WatchStateEvent listen;
-
- /// Called when all subscriptions on the watcher have been cancelled.
- final _WatchStateEvent cancel;
-
- /// Called when a poll loop has finished.
- final _WatchStateEvent finish;
-
- /// If the directory watcher should be watching the file system while in
- /// this state.
- final bool shouldWatch;
-
- /// If a change event should be sent for a file modification while in this
- /// state.
- final bool shouldNotify;
-
- _WatchState({this.listen, this.cancel, this.finish,
- this.shouldWatch: false, this.shouldNotify: false});
+ static const UNSUBSCRIBED = const _WatchState("unsubscribed");
+ static const SCANNING = const _WatchState("scanning");
+ static const WATCHING = const _WatchState("watching");
nweiz 2013/08/01 22:52:39 Add some docs about what each state means.
Bob Nystrom 2013/08/02 17:26:29 Done.
+
+ final String name;
+
+ const _WatchState(this.name);
}
class _FileStatus {
« no previous file with comments | « no previous file | pkg/watcher/test/no_subscription_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698