Index: pkg/watcher/lib/src/directory_watcher.dart |
diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher.dart |
index 61bf6c52a2e77681160379813f7ecb4405d4c783..913101b60be9f508099e35f5121fe0a9121ac0bf 100644 |
--- a/pkg/watcher/lib/src/directory_watcher.dart |
+++ b/pkg/watcher/lib/src/directory_watcher.dart |
@@ -5,10 +5,12 @@ |
library watcher.directory_watcher; |
import 'dart:async'; |
+import 'dart:collection'; |
import 'dart:io'; |
import 'package:crypto/crypto.dart'; |
+import 'async_queue.dart'; |
import 'stat.dart'; |
import 'watch_event.dart'; |
@@ -27,7 +29,7 @@ class DirectoryWatcher { |
Stream<WatchEvent> get events => _events.stream; |
StreamController<WatchEvent> _events; |
- _WatchState _state = _WatchState.notWatching; |
+ _WatchState _state = _WatchState.UNSUBSCRIBED; |
/// A [Future] that completes when the watcher is initialized and watching |
/// for file changes. |
@@ -51,6 +53,26 @@ class DirectoryWatcher { |
/// Used to tell which files have been modified. |
final _statuses = new Map<String, _FileStatus>(); |
+ /// The subscription used while [directory] is being listed. |
+ /// |
+ /// Will be `null` if a list is not currently happening. |
+ StreamSubscription<FileSystemEntity> _listSubscription; |
+ |
+ /// The queue of files waiting to be processed to see if they have been |
+ /// modified. |
+ /// |
+ /// Processing a file is asynchronous, as is listing the directory, so the |
+ /// queue exists to let each of those proceed at their own rate. The lister |
+ /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
+ /// and processed sequentially. |
+ AsyncQueue<String> _filesToProcess; |
+ |
+ /// The set of files that have been seen in the current directory listing. |
+ /// |
+ /// Used to tell which files have been removed: files that are in [_statuses] |
+ /// but not in here when a poll completes have been removed. |
+ final _polledFiles = new Set<String>(); |
+ |
/// Creates a new [DirectoryWatcher] monitoring [directory]. |
/// |
/// If [pollingDelay] is passed, it specifies the amount of time the watcher |
@@ -60,82 +82,133 @@ class DirectoryWatcher { |
DirectoryWatcher(this.directory, {Duration pollingDelay}) |
: pollingDelay = pollingDelay != null ? pollingDelay : |
new Duration(seconds: 1) { |
- _events = new StreamController<WatchEvent>.broadcast(onListen: () { |
- _state = _state.listen(this); |
- }, onCancel: () { |
- _state = _state.cancel(this); |
- }); |
+ _events = new StreamController<WatchEvent>.broadcast( |
+ onListen: _watch, onCancel: _cancel); |
+ |
+ _filesToProcess = new AsyncQueue<String>(_processFile, |
+ onError: _events.addError); |
} |
- /// Starts the asynchronous polling process. |
- /// |
- /// Scans the contents of the directory and compares the results to the |
- /// previous scan. Loops to continue monitoring as long as there are |
- /// subscribers to the [events] stream. |
- Future _watch() { |
- var files = new Set<String>(); |
+ /// Scans to see which files were already present before the watcher was |
+ /// subscribed to, and then starts watching the directory for changes. |
+ void _watch() { |
+ assert(_state == _WatchState.UNSUBSCRIBED); |
+ _state = _WatchState.SCANNING; |
+ _poll(); |
+ } |
- var stream = new Directory(directory).list(recursive: true); |
+ /// Stops watching the directory when there are no more subscribers. |
+ void _cancel() { |
+ assert(_state != _WatchState.UNSUBSCRIBED); |
+ _state = _WatchState.UNSUBSCRIBED; |
- return stream.map((entity) { |
- if (entity is! File) return new Future.value(); |
- files.add(entity.path); |
- // TODO(rnystrom): These all run as fast as possible and read the |
- // contents of the files. That means there's a pretty big IO hit all at |
- // once. Maybe these should be queued up and rate limited? |
- return _refreshFile(entity.path); |
- }).toList().then((futures) { |
- // Once the listing is done, make sure to wait until each file is also |
- // done. |
- return Future.wait(futures); |
- }).then((_) { |
- var removedFiles = _statuses.keys.toSet().difference(files); |
- for (var removed in removedFiles) { |
- if (_state.shouldNotify) { |
- _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
- } |
- _statuses.remove(removed); |
- } |
+ // If we're in the middle of listing the directory, stop. |
+ if (_listSubscription != null) _listSubscription.cancel(); |
- var previousState = _state; |
- _state = _state.finish(this); |
+ // Don't process any remaining files. |
+ _filesToProcess.clear(); |
+ _polledFiles.clear(); |
+ _statuses.clear(); |
- // If we were already sending notifications, add a bit of delay before |
- // restarting just so that we don't whale on the file system. |
- // TODO(rnystrom): Tune this and/or make it tunable? |
- if (_state.shouldNotify) { |
- return new Future.delayed(pollingDelay); |
- } |
- }).then((_) { |
- // Make sure we haven't transitioned to a non-watching state during the |
- // delay. |
- if (_state.shouldWatch) _watch(); |
+ _ready = new Completer(); |
+ } |
+ |
+ /// Scans the contents of the directory once to see which files have been |
+ /// added, removed, and modified. |
+ void _poll() { |
+ _filesToProcess.clear(); |
+ _polledFiles.clear(); |
+ |
+ var stream = new Directory(directory).list(recursive: true); |
+ _listSubscription = stream.listen((entity) { |
+ assert(_state != _WatchState.UNSUBSCRIBED); |
+ |
+ if (entity is! File) return; |
+ _filesToProcess.add(entity.path); |
+ }, onDone: () { |
+ assert(_state != _WatchState.UNSUBSCRIBED); |
+ _listSubscription = null; |
+ |
+ // Null tells the queue consumer that we're done listing. |
+ _filesToProcess.add(null); |
}); |
} |
- /// Compares the current state of the file at [path] to the state it was in |
- /// the last time it was scanned. |
- Future _refreshFile(String path) { |
- return getModificationTime(path).then((modified) { |
- var lastStatus = _statuses[path]; |
+ /// Processes [file] to determine if it has been modified since the last |
+ /// time it was scanned. |
+ Future _processFile(String file) { |
+ assert(_state != _WatchState.UNSUBSCRIBED); |
+ |
+ // `null` is the sentinel which means the directory listing is complete. |
+ if (file == null) return _completePoll(); |
+ |
+ return getModificationTime(file).then((modified) { |
+ if (_checkForCancel()) return; |
+ |
+ var lastStatus = _statuses[file]; |
- // If it's modification time hasn't changed, assume the file is unchanged. |
- if (lastStatus != null && lastStatus.modified == modified) return; |
+ // If its modification time hasn't changed, assume the file is unchanged. |
+ if (lastStatus != null && lastStatus.modified == modified) { |
+ // The file is still here. |
+ _polledFiles.add(file); |
+ return; |
+ } |
+ |
+ return _hashFile(file).then((hash) { |
+ if (_checkForCancel()) return; |
- return _hashFile(path).then((hash) { |
var status = new _FileStatus(modified, hash); |
- _statuses[path] = status; |
- |
- // Only notify if the file contents changed. |
- if (_state.shouldNotify && |
- (lastStatus == null || !_sameHash(lastStatus.hash, hash))) { |
- var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
- _events.add(new WatchEvent(change, path)); |
- } |
+ _statuses[file] = status; |
+ _polledFiles.add(file); |
+ |
+ // Only notify while in the watching state. |
+ if (_state != _WatchState.WATCHING) return; |
+ |
+ // And the file is different. |
+ var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
+ if (!changed) return; |
+ |
+ var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
+ _events.add(new WatchEvent(type, file)); |
}); |
}); |
} |
+ /// After the directory listing is complete, this determines which files were |
+ /// removed and then restarts the next poll. |
+ Future _completePoll() { |
+ // Any files that were not seen in the last poll but that we have a |
+ // status for must have been removed. |
+ var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
+ for (var removed in removedFiles) { |
+ if (_state == _WatchState.WATCHING) { |
+ _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
+ } |
+ _statuses.remove(removed); |
+ } |
+ |
+ if (_state == _WatchState.SCANNING) { |
+ _state = _WatchState.WATCHING; |
+ _ready.complete(); |
+ } |
+ |
+ // Wait and then poll again. |
+ return new Future.delayed(pollingDelay).then((_) { |
+ if (_checkForCancel()) return; |
+ _poll(); |
+ }); |
+ } |
+ |
+ /// Returns `true` and clears the processing queue if the watcher has been |
+ /// unsubscribed. |
+ bool _checkForCancel() { |
+ if (_state != _WatchState.UNSUBSCRIBED) return false; |
+ |
+ // Don't process any more files. |
+ _filesToProcess.clear(); |
+ return true; |
+ } |
+ |
/// Calculates the SHA-1 hash of the file at [path]. |
Future<List<int>> _hashFile(String path) { |
return new File(path).readAsBytes().then((bytes) { |
@@ -159,71 +232,29 @@ class DirectoryWatcher { |
} |
} |
-/// An "event" that is sent to the [_WatchState] FSM to trigger state |
-/// transitions. |
-typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher); |
- |
-/// The different states that the watcher can be in and the transitions between |
-/// them. |
-/// |
-/// This class defines a finite state machine for keeping track of what the |
-/// asynchronous file polling is doing. Each instance of this is a state in the |
-/// machine and its [listen], [cancel], and [finish] fields define the state |
-/// transitions when those events occur. |
+/// Enum class for the states that the [DirectoryWatcher] can be in. |
class _WatchState { |
- /// The watcher has no subscribers. |
- static final notWatching = new _WatchState( |
- listen: (watcher) { |
- watcher._watch(); |
- return _WatchState.scanning; |
- }); |
- |
- /// The watcher has subscribers and is scanning for pre-existing files. |
- static final scanning = new _WatchState( |
- cancel: (watcher) { |
- // No longer watching, so create a new incomplete ready future. |
- watcher._ready = new Completer(); |
- return _WatchState.cancelling; |
- }, finish: (watcher) { |
- watcher._ready.complete(); |
- return _WatchState.watching; |
- }, shouldWatch: true); |
- |
- /// The watcher was unsubscribed while polling and we're waiting for the poll |
- /// to finish. |
- static final cancelling = new _WatchState( |
- listen: (_) => _WatchState.scanning, |
- finish: (_) => _WatchState.notWatching); |
- |
- /// The watcher has subscribers, we have scanned for pre-existing files and |
- /// now we're polling for changes. |
- static final watching = new _WatchState( |
- cancel: (watcher) { |
- // No longer watching, so create a new incomplete ready future. |
- watcher._ready = new Completer(); |
- return _WatchState.cancelling; |
- }, finish: (_) => _WatchState.watching, |
- shouldWatch: true, shouldNotify: true); |
- |
- /// Called when the first subscriber to the watcher has been added. |
- final _WatchStateEvent listen; |
- |
- /// Called when all subscriptions on the watcher have been cancelled. |
- final _WatchStateEvent cancel; |
- |
- /// Called when a poll loop has finished. |
- final _WatchStateEvent finish; |
- |
- /// If the directory watcher should be watching the file system while in |
- /// this state. |
- final bool shouldWatch; |
- |
- /// If a change event should be sent for a file modification while in this |
- /// state. |
- final bool shouldNotify; |
- |
- _WatchState({this.listen, this.cancel, this.finish, |
- this.shouldWatch: false, this.shouldNotify: false}); |
+ /// There are no subscribers to the watcher's event stream and no watching |
+ /// is going on. |
+ static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
+ |
+ /// There are subscribers and the watcher is doing an initial scan of the |
+ /// directory to see which files were already present before watching started. |
+ /// |
+ /// The watcher does not send notifications for changes that occurred while |
+ /// there were no subscribers, or for files already present before watching. |
+ /// The initial scan is used to determine what "before watching" state of |
+ /// the file system was. |
+ static const SCANNING = const _WatchState("scanning"); |
+ |
+ /// There are subscribers and the watcher is polling the directory to look |
+ /// for changes. |
+ static const WATCHING = const _WatchState("watching"); |
+ |
+ /// The name of the state. |
+ final String name; |
+ |
+ const _WatchState(this.name); |
} |
class _FileStatus { |