| Index: pkg/watcher/lib/src/directory_watcher.dart
|
| diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher.dart
|
| index 61bf6c52a2e77681160379813f7ecb4405d4c783..913101b60be9f508099e35f5121fe0a9121ac0bf 100644
|
| --- a/pkg/watcher/lib/src/directory_watcher.dart
|
| +++ b/pkg/watcher/lib/src/directory_watcher.dart
|
| @@ -5,10 +5,12 @@
|
| library watcher.directory_watcher;
|
|
|
| import 'dart:async';
|
| +import 'dart:collection';
|
| import 'dart:io';
|
|
|
| import 'package:crypto/crypto.dart';
|
|
|
| +import 'async_queue.dart';
|
| import 'stat.dart';
|
| import 'watch_event.dart';
|
|
|
| @@ -27,7 +29,7 @@ class DirectoryWatcher {
|
| Stream<WatchEvent> get events => _events.stream;
|
| StreamController<WatchEvent> _events;
|
|
|
| - _WatchState _state = _WatchState.notWatching;
|
| + _WatchState _state = _WatchState.UNSUBSCRIBED;
|
|
|
| /// A [Future] that completes when the watcher is initialized and watching
|
| /// for file changes.
|
| @@ -51,6 +53,26 @@ class DirectoryWatcher {
|
| /// Used to tell which files have been modified.
|
| final _statuses = new Map<String, _FileStatus>();
|
|
|
| + /// The subscription used while [directory] is being listed.
|
| + ///
|
| + /// Will be `null` if a list is not currently happening.
|
| + StreamSubscription<FileSystemEntity> _listSubscription;
|
| +
|
| + /// The queue of files waiting to be processed to see if they have been
|
| + /// modified.
|
| + ///
|
| + /// Processing a file is asynchronous, as is listing the directory, so the
|
| + /// queue exists to let each of those proceed at their own rate. The lister
|
| + /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
|
| + /// and processed sequentially.
|
| + AsyncQueue<String> _filesToProcess;
|
| +
|
| + /// The set of files that have been seen in the current directory listing.
|
| + ///
|
| + /// Used to tell which files have been removed: files that are in [_statuses]
|
| + /// but not in here when a poll completes have been removed.
|
| + final _polledFiles = new Set<String>();
|
| +
|
| /// Creates a new [DirectoryWatcher] monitoring [directory].
|
| ///
|
| /// If [pollingDelay] is passed, it specifies the amount of time the watcher
|
| @@ -60,82 +82,133 @@ class DirectoryWatcher {
|
| DirectoryWatcher(this.directory, {Duration pollingDelay})
|
| : pollingDelay = pollingDelay != null ? pollingDelay :
|
| new Duration(seconds: 1) {
|
| - _events = new StreamController<WatchEvent>.broadcast(onListen: () {
|
| - _state = _state.listen(this);
|
| - }, onCancel: () {
|
| - _state = _state.cancel(this);
|
| - });
|
| + _events = new StreamController<WatchEvent>.broadcast(
|
| + onListen: _watch, onCancel: _cancel);
|
| +
|
| + _filesToProcess = new AsyncQueue<String>(_processFile,
|
| + onError: _events.addError);
|
| }
|
|
|
| - /// Starts the asynchronous polling process.
|
| - ///
|
| - /// Scans the contents of the directory and compares the results to the
|
| - /// previous scan. Loops to continue monitoring as long as there are
|
| - /// subscribers to the [events] stream.
|
| - Future _watch() {
|
| - var files = new Set<String>();
|
| + /// Scans to see which files were already present before the watcher was
|
| + /// subscribed to, and then starts watching the directory for changes.
|
| + void _watch() {
|
| + assert(_state == _WatchState.UNSUBSCRIBED);
|
| + _state = _WatchState.SCANNING;
|
| + _poll();
|
| + }
|
|
|
| - var stream = new Directory(directory).list(recursive: true);
|
| + /// Stops watching the directory when there are no more subscribers.
|
| + void _cancel() {
|
| + assert(_state != _WatchState.UNSUBSCRIBED);
|
| + _state = _WatchState.UNSUBSCRIBED;
|
|
|
| - return stream.map((entity) {
|
| - if (entity is! File) return new Future.value();
|
| - files.add(entity.path);
|
| - // TODO(rnystrom): These all run as fast as possible and read the
|
| - // contents of the files. That means there's a pretty big IO hit all at
|
| - // once. Maybe these should be queued up and rate limited?
|
| - return _refreshFile(entity.path);
|
| - }).toList().then((futures) {
|
| - // Once the listing is done, make sure to wait until each file is also
|
| - // done.
|
| - return Future.wait(futures);
|
| - }).then((_) {
|
| - var removedFiles = _statuses.keys.toSet().difference(files);
|
| - for (var removed in removedFiles) {
|
| - if (_state.shouldNotify) {
|
| - _events.add(new WatchEvent(ChangeType.REMOVE, removed));
|
| - }
|
| - _statuses.remove(removed);
|
| - }
|
| + // If we're in the middle of listing the directory, stop.
|
| + if (_listSubscription != null) _listSubscription.cancel();
|
|
|
| - var previousState = _state;
|
| - _state = _state.finish(this);
|
| + // Don't process any remaining files.
|
| + _filesToProcess.clear();
|
| + _polledFiles.clear();
|
| + _statuses.clear();
|
|
|
| - // If we were already sending notifications, add a bit of delay before
|
| - // restarting just so that we don't whale on the file system.
|
| - // TODO(rnystrom): Tune this and/or make it tunable?
|
| - if (_state.shouldNotify) {
|
| - return new Future.delayed(pollingDelay);
|
| - }
|
| - }).then((_) {
|
| - // Make sure we haven't transitioned to a non-watching state during the
|
| - // delay.
|
| - if (_state.shouldWatch) _watch();
|
| + _ready = new Completer();
|
| + }
|
| +
|
| + /// Scans the contents of the directory once to see which files have been
|
| + /// added, removed, and modified.
|
| + void _poll() {
|
| + _filesToProcess.clear();
|
| + _polledFiles.clear();
|
| +
|
| + var stream = new Directory(directory).list(recursive: true);
|
| + _listSubscription = stream.listen((entity) {
|
| + assert(_state != _WatchState.UNSUBSCRIBED);
|
| +
|
| + if (entity is! File) return;
|
| + _filesToProcess.add(entity.path);
|
| + }, onDone: () {
|
| + assert(_state != _WatchState.UNSUBSCRIBED);
|
| + _listSubscription = null;
|
| +
|
| + // Null tells the queue consumer that we're done listing.
|
| + _filesToProcess.add(null);
|
| });
|
| }
|
|
|
| - /// Compares the current state of the file at [path] to the state it was in
|
| - /// the last time it was scanned.
|
| - Future _refreshFile(String path) {
|
| - return getModificationTime(path).then((modified) {
|
| - var lastStatus = _statuses[path];
|
| + /// Processes [file] to determine if it has been modified since the last
|
| + /// time it was scanned.
|
| + Future _processFile(String file) {
|
| + assert(_state != _WatchState.UNSUBSCRIBED);
|
| +
|
| + // `null` is the sentinel which means the directory listing is complete.
|
| + if (file == null) return _completePoll();
|
| +
|
| + return getModificationTime(file).then((modified) {
|
| + if (_checkForCancel()) return;
|
| +
|
| + var lastStatus = _statuses[file];
|
|
|
| - // If it's modification time hasn't changed, assume the file is unchanged.
|
| - if (lastStatus != null && lastStatus.modified == modified) return;
|
| + // If its modification time hasn't changed, assume the file is unchanged.
|
| + if (lastStatus != null && lastStatus.modified == modified) {
|
| + // The file is still here.
|
| + _polledFiles.add(file);
|
| + return;
|
| + }
|
| +
|
| + return _hashFile(file).then((hash) {
|
| + if (_checkForCancel()) return;
|
|
|
| - return _hashFile(path).then((hash) {
|
| var status = new _FileStatus(modified, hash);
|
| - _statuses[path] = status;
|
| -
|
| - // Only notify if the file contents changed.
|
| - if (_state.shouldNotify &&
|
| - (lastStatus == null || !_sameHash(lastStatus.hash, hash))) {
|
| - var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
|
| - _events.add(new WatchEvent(change, path));
|
| - }
|
| + _statuses[file] = status;
|
| + _polledFiles.add(file);
|
| +
|
| + // Only notify while in the watching state.
|
| + if (_state != _WatchState.WATCHING) return;
|
| +
|
| + // And the file is different.
|
| + var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
|
| + if (!changed) return;
|
| +
|
| + var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
|
| + _events.add(new WatchEvent(type, file));
|
| });
|
| });
|
| }
|
|
|
| + /// After the directory listing is complete, this determines which files were
|
| + /// removed and then restarts the next poll.
|
| + Future _completePoll() {
|
| + // Any files that were not seen in the last poll but that we have a
|
| + // status for must have been removed.
|
| + var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
|
| + for (var removed in removedFiles) {
|
| + if (_state == _WatchState.WATCHING) {
|
| + _events.add(new WatchEvent(ChangeType.REMOVE, removed));
|
| + }
|
| + _statuses.remove(removed);
|
| + }
|
| +
|
| + if (_state == _WatchState.SCANNING) {
|
| + _state = _WatchState.WATCHING;
|
| + _ready.complete();
|
| + }
|
| +
|
| + // Wait and then poll again.
|
| + return new Future.delayed(pollingDelay).then((_) {
|
| + if (_checkForCancel()) return;
|
| + _poll();
|
| + });
|
| + }
|
| +
|
| + /// Returns `true` and clears the processing queue if the watcher has been
|
| + /// unsubscribed.
|
| + bool _checkForCancel() {
|
| + if (_state != _WatchState.UNSUBSCRIBED) return false;
|
| +
|
| + // Don't process any more files.
|
| + _filesToProcess.clear();
|
| + return true;
|
| + }
|
| +
|
| /// Calculates the SHA-1 hash of the file at [path].
|
| Future<List<int>> _hashFile(String path) {
|
| return new File(path).readAsBytes().then((bytes) {
|
| @@ -159,71 +232,29 @@ class DirectoryWatcher {
|
| }
|
| }
|
|
|
| -/// An "event" that is sent to the [_WatchState] FSM to trigger state
|
| -/// transitions.
|
| -typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher);
|
| -
|
| -/// The different states that the watcher can be in and the transitions between
|
| -/// them.
|
| -///
|
| -/// This class defines a finite state machine for keeping track of what the
|
| -/// asynchronous file polling is doing. Each instance of this is a state in the
|
| -/// machine and its [listen], [cancel], and [finish] fields define the state
|
| -/// transitions when those events occur.
|
| +/// Enum class for the states that the [DirectoryWatcher] can be in.
|
| class _WatchState {
|
| - /// The watcher has no subscribers.
|
| - static final notWatching = new _WatchState(
|
| - listen: (watcher) {
|
| - watcher._watch();
|
| - return _WatchState.scanning;
|
| - });
|
| -
|
| - /// The watcher has subscribers and is scanning for pre-existing files.
|
| - static final scanning = new _WatchState(
|
| - cancel: (watcher) {
|
| - // No longer watching, so create a new incomplete ready future.
|
| - watcher._ready = new Completer();
|
| - return _WatchState.cancelling;
|
| - }, finish: (watcher) {
|
| - watcher._ready.complete();
|
| - return _WatchState.watching;
|
| - }, shouldWatch: true);
|
| -
|
| - /// The watcher was unsubscribed while polling and we're waiting for the poll
|
| - /// to finish.
|
| - static final cancelling = new _WatchState(
|
| - listen: (_) => _WatchState.scanning,
|
| - finish: (_) => _WatchState.notWatching);
|
| -
|
| - /// The watcher has subscribers, we have scanned for pre-existing files and
|
| - /// now we're polling for changes.
|
| - static final watching = new _WatchState(
|
| - cancel: (watcher) {
|
| - // No longer watching, so create a new incomplete ready future.
|
| - watcher._ready = new Completer();
|
| - return _WatchState.cancelling;
|
| - }, finish: (_) => _WatchState.watching,
|
| - shouldWatch: true, shouldNotify: true);
|
| -
|
| - /// Called when the first subscriber to the watcher has been added.
|
| - final _WatchStateEvent listen;
|
| -
|
| - /// Called when all subscriptions on the watcher have been cancelled.
|
| - final _WatchStateEvent cancel;
|
| -
|
| - /// Called when a poll loop has finished.
|
| - final _WatchStateEvent finish;
|
| -
|
| - /// If the directory watcher should be watching the file system while in
|
| - /// this state.
|
| - final bool shouldWatch;
|
| -
|
| - /// If a change event should be sent for a file modification while in this
|
| - /// state.
|
| - final bool shouldNotify;
|
| -
|
| - _WatchState({this.listen, this.cancel, this.finish,
|
| - this.shouldWatch: false, this.shouldNotify: false});
|
| + /// There are no subscribers to the watcher's event stream and no watching
|
| + /// is going on.
|
| + static const UNSUBSCRIBED = const _WatchState("unsubscribed");
|
| +
|
| + /// There are subscribers and the watcher is doing an initial scan of the
|
| + /// directory to see which files were already present before watching started.
|
| + ///
|
| + /// The watcher does not send notifications for changes that occurred while
|
| + /// there were no subscribers, or for files already present before watching.
|
| + /// The initial scan is used to determine what "before watching" state of
|
| + /// the file system was.
|
| + static const SCANNING = const _WatchState("scanning");
|
| +
|
| + /// There are subscribers and the watcher is polling the directory to look
|
| + /// for changes.
|
| + static const WATCHING = const _WatchState("watching");
|
| +
|
| + /// The name of the state.
|
| + final String name;
|
| +
|
| + const _WatchState(this.name);
|
| }
|
|
|
| class _FileStatus {
|
|
|