Index: pkg/watcher/lib/src/directory_watcher.dart |
diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher.dart |
index 679e227e8d181cefb7124a272e416824d5b5bc8b..4484c2b77bf194254f18cd14d4e499b07ef0b097 100644 |
--- a/pkg/watcher/lib/src/directory_watcher.dart |
+++ b/pkg/watcher/lib/src/directory_watcher.dart |
@@ -7,18 +7,15 @@ library watcher.directory_watcher; |
import 'dart:async'; |
import 'dart:io'; |
-import 'package:crypto/crypto.dart'; |
- |
-import 'async_queue.dart'; |
-import 'stat.dart'; |
-import 'utils.dart'; |
import 'watch_event.dart'; |
+import 'directory_watcher/linux.dart'; |
+import 'directory_watcher/polling.dart'; |
/// Watches the contents of a directory and emits [WatchEvent]s when something |
/// in the directory has changed. |
-class DirectoryWatcher { |
+abstract class DirectoryWatcher { |
/// The directory whose contents are being monitored. |
- final String directory; |
+ String get directory; |
/// The broadcast [Stream] of events that have occurred to files in |
/// [directory]. |
@@ -26,10 +23,12 @@ class DirectoryWatcher { |
/// Changes will only be monitored while this stream has subscribers. Any |
/// file changes that occur during periods when there are no subscribers |
/// will not be reported the next time a subscriber is added. |
- Stream<WatchEvent> get events => _events.stream; |
- StreamController<WatchEvent> _events; |
+ Stream<WatchEvent> get events; |
- _WatchState _state = _WatchState.UNSUBSCRIBED; |
+ /// Whether the watcher is initialized and watching for file changes. |
+ /// |
+ /// This is true if and only if [ready] is complete. |
+ bool get isReady; |
/// A [Future] that completes when the watcher is initialized and watching |
/// for file changes. |
@@ -41,241 +40,20 @@ class DirectoryWatcher { |
/// |
/// If the watcher is already monitoring, this returns an already complete |
/// future. |
- Future get ready => _ready.future; |
- Completer _ready = new Completer(); |
- |
- /// The amount of time the watcher pauses between successive polls of the |
- /// directory contents. |
- final Duration pollingDelay; |
- |
- /// The previous status of the files in the directory. |
- /// |
- /// Used to tell which files have been modified. |
- final _statuses = new Map<String, _FileStatus>(); |
- |
- /// The subscription used while [directory] is being listed. |
- /// |
- /// Will be `null` if a list is not currently happening. |
- StreamSubscription<FileSystemEntity> _listSubscription; |
- |
- /// The queue of files waiting to be processed to see if they have been |
- /// modified. |
- /// |
- /// Processing a file is asynchronous, as is listing the directory, so the |
- /// queue exists to let each of those proceed at their own rate. The lister |
- /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
- /// and processed sequentially. |
- AsyncQueue<String> _filesToProcess; |
- |
- /// The set of files that have been seen in the current directory listing. |
- /// |
- /// Used to tell which files have been removed: files that are in [_statuses] |
- /// but not in here when a poll completes have been removed. |
- final _polledFiles = new Set<String>(); |
+ Future get ready; |
/// Creates a new [DirectoryWatcher] monitoring [directory]. |
/// |
- /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
- /// will pause between successive polls of the directory contents. Making |
- /// this shorter will give more immediate feedback at the expense of doing |
- /// more IO and higher CPU usage. Defaults to one second. |
- DirectoryWatcher(this.directory, {Duration pollingDelay}) |
- : pollingDelay = pollingDelay != null ? pollingDelay : |
- new Duration(seconds: 1) { |
- _events = new StreamController<WatchEvent>.broadcast( |
- onListen: _watch, onCancel: _cancel); |
- |
- _filesToProcess = new AsyncQueue<String>(_processFile, |
- onError: _events.addError); |
- } |
- |
- /// Scans to see which files were already present before the watcher was |
- /// subscribed to, and then starts watching the directory for changes. |
- void _watch() { |
- assert(_state == _WatchState.UNSUBSCRIBED); |
- _state = _WatchState.SCANNING; |
- _poll(); |
- } |
- |
- /// Stops watching the directory when there are no more subscribers. |
- void _cancel() { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- _state = _WatchState.UNSUBSCRIBED; |
- |
- // If we're in the middle of listing the directory, stop. |
- if (_listSubscription != null) _listSubscription.cancel(); |
- |
- // Don't process any remaining files. |
- _filesToProcess.clear(); |
- _polledFiles.clear(); |
- _statuses.clear(); |
- |
- _ready = new Completer(); |
- } |
- |
- /// Scans the contents of the directory once to see which files have been |
- /// added, removed, and modified. |
- void _poll() { |
- _filesToProcess.clear(); |
- _polledFiles.clear(); |
- |
- endListing() { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- _listSubscription = null; |
- |
- // Null tells the queue consumer that we're done listing. |
- _filesToProcess.add(null); |
- } |
- |
- var stream = new Directory(directory).list(recursive: true); |
- _listSubscription = stream.listen((entity) { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- |
- if (entity is! File) return; |
- _filesToProcess.add(entity.path); |
- }, onError: (error, StackTrace stackTrace) { |
- if (!isDirectoryNotFoundException(error)) { |
- // It's some unknown error. Pipe it over to the event stream so the |
- // user can see it. |
- _events.addError(error, stackTrace); |
- } |
- |
- // When an error occurs, we end the listing normally, which has the |
- // desired effect of marking all files that were in the directory as |
- // being removed. |
- endListing(); |
- }, onDone: endListing, cancelOnError: true); |
- } |
- |
- /// Processes [file] to determine if it has been modified since the last |
- /// time it was scanned. |
- Future _processFile(String file) { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- |
- // `null` is the sentinel which means the directory listing is complete. |
- if (file == null) return _completePoll(); |
- |
- return getModificationTime(file).then((modified) { |
- if (_checkForCancel()) return null; |
- |
- var lastStatus = _statuses[file]; |
- |
- // If its modification time hasn't changed, assume the file is unchanged. |
- if (lastStatus != null && lastStatus.modified == modified) { |
- // The file is still here. |
- _polledFiles.add(file); |
- return null; |
- } |
- |
- return _hashFile(file).then((hash) { |
- if (_checkForCancel()) return; |
- |
- var status = new _FileStatus(modified, hash); |
- _statuses[file] = status; |
- _polledFiles.add(file); |
- |
- // Only notify while in the watching state. |
- if (_state != _WatchState.WATCHING) return; |
- |
- // And the file is different. |
- var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
- if (!changed) return; |
- |
- var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
- _events.add(new WatchEvent(type, file)); |
- }); |
- }); |
- } |
- |
- /// After the directory listing is complete, this determines which files were |
- /// removed and then restarts the next poll. |
- Future _completePoll() { |
- // Any files that were not seen in the last poll but that we have a |
- // status for must have been removed. |
- var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
- for (var removed in removedFiles) { |
- if (_state == _WatchState.WATCHING) { |
- _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
- } |
- _statuses.remove(removed); |
- } |
- |
- if (_state == _WatchState.SCANNING) { |
- _state = _WatchState.WATCHING; |
- _ready.complete(); |
- } |
- |
- // Wait and then poll again. |
- return new Future.delayed(pollingDelay).then((_) { |
- if (_checkForCancel()) return; |
- _poll(); |
- }); |
- } |
- |
- /// Returns `true` and clears the processing queue if the watcher has been |
- /// unsubscribed. |
- bool _checkForCancel() { |
- if (_state != _WatchState.UNSUBSCRIBED) return false; |
- |
- // Don't process any more files. |
- _filesToProcess.clear(); |
- return true; |
- } |
- |
- /// Calculates the SHA-1 hash of the file at [path]. |
- Future<List<int>> _hashFile(String path) { |
- return new File(path).readAsBytes().then((bytes) { |
- var sha1 = new SHA1(); |
- sha1.add(bytes); |
- return sha1.close(); |
- }); |
- } |
- |
- /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
- /// series of byte values. |
- bool _sameHash(List<int> a, List<int> b) { |
- // Hashes should always be the same size. |
- assert(a.length == b.length); |
- |
- for (var i = 0; i < a.length; i++) { |
- if (a[i] != b[i]) return false; |
- } |
- |
- return true; |
- } |
-} |
- |
-/// Enum class for the states that the [DirectoryWatcher] can be in. |
-class _WatchState { |
- /// There are no subscribers to the watcher's event stream and no watching |
- /// is going on. |
- static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
- |
- /// There are subscribers and the watcher is doing an initial scan of the |
- /// directory to see which files were already present before watching started. |
+ /// If a native directory watcher is available for this platform, this will |
+ /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher]. |
/// |
- /// The watcher does not send notifications for changes that occurred while |
- /// there were no subscribers, or for files already present before watching. |
- /// The initial scan is used to determine what "before watching" state of |
- /// the file system was. |
- static const SCANNING = const _WatchState("scanning"); |
- |
- /// There are subscribers and the watcher is polling the directory to look |
- /// for changes. |
- static const WATCHING = const _WatchState("watching"); |
- |
- /// The name of the state. |
- final String name; |
- |
- const _WatchState(this.name); |
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
+ /// will pause between successive polls of the directory contents. Making this |
+ /// shorter will give more immediate feedback at the expense of doing more IO |
+ /// and higher CPU usage. Defaults to one second. Ignored for non-polling |
+ /// watchers. |
+ factory DirectoryWatcher(String directory, {Duration pollingDelay}) { |
+ if (Platform.isLinux) return new LinuxDirectoryWatcher(directory); |
+ return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay); |
+ } |
} |
- |
-class _FileStatus { |
- /// The last time the file was modified. |
- DateTime modified; |
- |
- /// The SHA-1 hash of the contents of the file. |
- List<int> hash; |
- |
- _FileStatus(this.modified, this.hash); |
-} |