Index: pkg/watcher/lib/src/directory_watcher/polling.dart |
diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher/polling.dart |
similarity index 55% |
copy from pkg/watcher/lib/src/directory_watcher.dart |
copy to pkg/watcher/lib/src/directory_watcher/polling.dart |
index 679e227e8d181cefb7124a272e416824d5b5bc8b..91ca005f31fb2a9cf33c99f8bb4bc452c5399c58 100644 |
--- a/pkg/watcher/lib/src/directory_watcher.dart |
+++ b/pkg/watcher/lib/src/directory_watcher/polling.dart |
@@ -2,51 +2,49 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library watcher.directory_watcher; |
+library watcher.directory_watcher.polling; |
import 'dart:async'; |
import 'dart:io'; |
import 'package:crypto/crypto.dart'; |
-import 'async_queue.dart'; |
-import 'stat.dart'; |
-import 'utils.dart'; |
-import 'watch_event.dart'; |
+import '../async_queue.dart'; |
+import '../directory_watcher.dart'; |
+import '../stat.dart'; |
+import '../utils.dart'; |
+import '../watch_event.dart'; |
+import 'resubscribable.dart'; |
-/// Watches the contents of a directory and emits [WatchEvent]s when something |
-/// in the directory has changed. |
-class DirectoryWatcher { |
- /// The directory whose contents are being monitored. |
+/// Periodically polls a directory for changes. |
+class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher { |
+ /// Creates a new polling watcher monitoring [directory]. |
+ /// |
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
+ /// will pause between successive polls of the directory contents. Making this |
+ /// shorter will give more immediate feedback at the expense of doing more IO |
+ /// and higher CPU usage. Defaults to one second. |
+ PollingDirectoryWatcher(String directory, {Duration pollingDelay}) |
+ : super(directory, () { |
+ return new _PollingDirectoryWatcher(directory, |
+ pollingDelay != null ? pollingDelay : new Duration(seconds: 1)); |
+ }); |
+} |
+ |
+class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher { |
final String directory; |
- /// The broadcast [Stream] of events that have occurred to files in |
- /// [directory]. |
- /// |
- /// Changes will only be monitored while this stream has subscribers. Any |
- /// file changes that occur during periods when there are no subscribers |
- /// will not be reported the next time a subscriber is added. |
Stream<WatchEvent> get events => _events.stream; |
- StreamController<WatchEvent> _events; |
+ final _events = new StreamController<WatchEvent>.broadcast(); |
- _WatchState _state = _WatchState.UNSUBSCRIBED; |
+ bool get isReady => _ready.isCompleted; |
- /// A [Future] that completes when the watcher is initialized and watching |
- /// for file changes. |
- /// |
- /// If the watcher is not currently monitoring the directory (because there |
- /// are no subscribers to [events]), this returns a future that isn't |
- /// complete yet. It will complete when a subscriber starts listening and |
- /// the watcher finishes any initialization work it needs to do. |
- /// |
- /// If the watcher is already monitoring, this returns an already complete |
- /// future. |
Future get ready => _ready.future; |
- Completer _ready = new Completer(); |
+ final _ready = new Completer(); |
/// The amount of time the watcher pauses between successive polls of the |
/// directory contents. |
- final Duration pollingDelay; |
+ final Duration _pollingDelay; |
/// The previous status of the files in the directory. |
/// |
@@ -73,34 +71,15 @@ class DirectoryWatcher { |
/// but not in here when a poll completes have been removed. |
final _polledFiles = new Set<String>(); |
- /// Creates a new [DirectoryWatcher] monitoring [directory]. |
- /// |
- /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
- /// will pause between successive polls of the directory contents. Making |
- /// this shorter will give more immediate feedback at the expense of doing |
- /// more IO and higher CPU usage. Defaults to one second. |
- DirectoryWatcher(this.directory, {Duration pollingDelay}) |
- : pollingDelay = pollingDelay != null ? pollingDelay : |
- new Duration(seconds: 1) { |
- _events = new StreamController<WatchEvent>.broadcast( |
- onListen: _watch, onCancel: _cancel); |
- |
+ _PollingDirectoryWatcher(this.directory, this._pollingDelay) { |
_filesToProcess = new AsyncQueue<String>(_processFile, |
onError: _events.addError); |
- } |
- /// Scans to see which files were already present before the watcher was |
- /// subscribed to, and then starts watching the directory for changes. |
- void _watch() { |
- assert(_state == _WatchState.UNSUBSCRIBED); |
- _state = _WatchState.SCANNING; |
_poll(); |
} |
- /// Stops watching the directory when there are no more subscribers. |
- void _cancel() { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- _state = _WatchState.UNSUBSCRIBED; |
+ void close() { |
+ _events.close(); |
// If we're in the middle of listing the directory, stop. |
if (_listSubscription != null) _listSubscription.cancel(); |
@@ -109,8 +88,6 @@ class DirectoryWatcher { |
_filesToProcess.clear(); |
_polledFiles.clear(); |
_statuses.clear(); |
- |
- _ready = new Completer(); |
} |
/// Scans the contents of the directory once to see which files have been |
@@ -120,7 +97,7 @@ class DirectoryWatcher { |
_polledFiles.clear(); |
endListing() { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
+ assert(!_events.isClosed); |
_listSubscription = null; |
// Null tells the queue consumer that we're done listing. |
@@ -129,11 +106,11 @@ class DirectoryWatcher { |
var stream = new Directory(directory).list(recursive: true); |
_listSubscription = stream.listen((entity) { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
+ assert(!_events.isClosed); |
if (entity is! File) return; |
_filesToProcess.add(entity.path); |
- }, onError: (error, StackTrace stackTrace) { |
+ }, onError: (error, stackTrace) { |
if (!isDirectoryNotFoundException(error)) { |
// It's some unknown error. Pipe it over to the event stream so the |
// user can see it. |
@@ -150,13 +127,11 @@ class DirectoryWatcher { |
/// Processes [file] to determine if it has been modified since the last |
/// time it was scanned. |
Future _processFile(String file) { |
- assert(_state != _WatchState.UNSUBSCRIBED); |
- |
// `null` is the sentinel which means the directory listing is complete. |
if (file == null) return _completePoll(); |
return getModificationTime(file).then((modified) { |
- if (_checkForCancel()) return null; |
+ if (_events.isClosed) return null; |
var lastStatus = _statuses[file]; |
@@ -168,14 +143,14 @@ class DirectoryWatcher { |
} |
return _hashFile(file).then((hash) { |
- if (_checkForCancel()) return; |
+ if (_events.isClosed) return; |
var status = new _FileStatus(modified, hash); |
_statuses[file] = status; |
_polledFiles.add(file); |
- // Only notify while in the watching state. |
- if (_state != _WatchState.WATCHING) return; |
+ // Only notify if we're ready to emit events. |
+ if (!isReady) return; |
// And the file is different. |
var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
@@ -194,34 +169,19 @@ class DirectoryWatcher { |
// status for must have been removed. |
var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
for (var removed in removedFiles) { |
- if (_state == _WatchState.WATCHING) { |
- _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
- } |
+ if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
_statuses.remove(removed); |
} |
- if (_state == _WatchState.SCANNING) { |
- _state = _WatchState.WATCHING; |
- _ready.complete(); |
- } |
+ if (!isReady) _ready.complete(); |
// Wait and then poll again. |
- return new Future.delayed(pollingDelay).then((_) { |
- if (_checkForCancel()) return; |
+ return new Future.delayed(_pollingDelay).then((_) { |
+ if (_events.isClosed) return; |
_poll(); |
}); |
} |
- /// Returns `true` and clears the processing queue if the watcher has been |
- /// unsubscribed. |
- bool _checkForCancel() { |
- if (_state != _WatchState.UNSUBSCRIBED) return false; |
- |
- // Don't process any more files. |
- _filesToProcess.clear(); |
- return true; |
- } |
- |
/// Calculates the SHA-1 hash of the file at [path]. |
Future<List<int>> _hashFile(String path) { |
return new File(path).readAsBytes().then((bytes) { |
@@ -245,31 +205,6 @@ class DirectoryWatcher { |
} |
} |
-/// Enum class for the states that the [DirectoryWatcher] can be in. |
-class _WatchState { |
- /// There are no subscribers to the watcher's event stream and no watching |
- /// is going on. |
- static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
- |
- /// There are subscribers and the watcher is doing an initial scan of the |
- /// directory to see which files were already present before watching started. |
- /// |
- /// The watcher does not send notifications for changes that occurred while |
- /// there were no subscribers, or for files already present before watching. |
- /// The initial scan is used to determine what "before watching" state of |
- /// the file system was. |
- static const SCANNING = const _WatchState("scanning"); |
- |
- /// There are subscribers and the watcher is polling the directory to look |
- /// for changes. |
- static const WATCHING = const _WatchState("watching"); |
- |
- /// The name of the state. |
- final String name; |
- |
- const _WatchState(this.name); |
-} |
- |
class _FileStatus { |
/// The last time the file was modified. |
DateTime modified; |
@@ -278,4 +213,5 @@ class _FileStatus { |
List<int> hash; |
_FileStatus(this.modified, this.hash); |
-} |
+} |
+ |