Chromium Code Reviews| Index: pkg/watcher/lib/src/directory_watcher.dart |
| diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher.dart |
| index 61bf6c52a2e77681160379813f7ecb4405d4c783..f08dc198fc635cd025f7ba1af92a1edae8e29391 100644 |
| --- a/pkg/watcher/lib/src/directory_watcher.dart |
| +++ b/pkg/watcher/lib/src/directory_watcher.dart |
| @@ -5,6 +5,7 @@ |
| library watcher.directory_watcher; |
| import 'dart:async'; |
| +import 'dart:collection'; |
| import 'dart:io'; |
| import 'package:crypto/crypto.dart'; |
| @@ -27,7 +28,7 @@ class DirectoryWatcher { |
| Stream<WatchEvent> get events => _events.stream; |
| StreamController<WatchEvent> _events; |
| - _WatchState _state = _WatchState.notWatching; |
| + _WatchState _state = _WatchState.UNSUBSCRIBED; |
| /// A [Future] that completes when the watcher is initialized and watching |
| /// for file changes. |
| @@ -51,6 +52,30 @@ class DirectoryWatcher { |
| /// Used to tell which files have been modified. |
| final _statuses = new Map<String, _FileStatus>(); |
| + /// The subscription used while [directory] is being listed. |
| + /// |
| + /// Will be `null` if a list is not currently happening. |
| + StreamSubscription<FileSystemEntity> _listSubscription; |
| + |
| + /// This will be `true` if we are currently asynchronously processing files |
| + /// from [_filesToCheck]. |
|
nweiz
2013/08/01 22:52:39
"_filesToCheck" -> "_filesToProcess"
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + bool _isProcessing = false; |
| + |
| + /// The queue of files waiting to be processed to see if they have been |
| + /// modified. |
| + /// |
| + /// Processing a file is asynchronous, as is listing the directory, so the |
| + /// queue exists to let each of those proceed at their own rate. The lister |
| + /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
| + /// and processed sequentially. |
| + final _filesToProcess = new Queue<String>(); |
|
nweiz
2013/08/01 22:52:39
See how this looks with a FutureGroup instead of a
Bob Nystrom
2013/08/02 17:26:29
I started going in that direction but it looks lik
nweiz
2013/08/02 20:07:52
SGTM. Can you abstract out the notion of a sequent
Bob Nystrom
2013/08/02 21:45:11
Done.
|
| + |
| + /// The set of files that have been seen in the current directory listing. |
| + /// |
| + /// Used to tell which files have been removed: files that are in [_statuses] |
| + /// but not in here when a poll completes have been removed. |
| + final _polledFiles = new Set<String>(); |
| + |
| /// Creates a new [DirectoryWatcher] monitoring [directory]. |
| /// |
| /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
| @@ -60,77 +85,147 @@ class DirectoryWatcher { |
| DirectoryWatcher(this.directory, {Duration pollingDelay}) |
| : pollingDelay = pollingDelay != null ? pollingDelay : |
| new Duration(seconds: 1) { |
| - _events = new StreamController<WatchEvent>.broadcast(onListen: () { |
| - _state = _state.listen(this); |
| - }, onCancel: () { |
| - _state = _state.cancel(this); |
| - }); |
| + _events = new StreamController<WatchEvent>.broadcast( |
| + onListen: _watch, onCancel: _cancel); |
| } |
| - /// Starts the asynchronous polling process. |
| - /// |
| - /// Scans the contents of the directory and compares the results to the |
| - /// previous scan. Loops to continue monitoring as long as there are |
| - /// subscribers to the [events] stream. |
| - Future _watch() { |
| - var files = new Set<String>(); |
| + /// Scans to see which files were already present before the watcher was |
| + /// subscribed to, and then starts watching the directory for changes. |
| + void _watch() { |
| + assert(_state == _WatchState.UNSUBSCRIBED); |
| + _state = _WatchState.SCANNING; |
| + _poll(); |
| + } |
| + |
| + /// Stops watching the directory when there are no more subscribers. |
| + void _cancel() { |
| + assert(_state != _WatchState.UNSUBSCRIBED); |
| + _state = _WatchState.UNSUBSCRIBED; |
| + |
| + // If we're in the middle of listing the directory, stop. |
| + if (_listSubscription != null) _listSubscription.cancel(); |
| + |
| + // Don't process any remaining files. |
| + _filesToProcess.clear(); |
| + _polledFiles.clear(); |
|
nweiz
2013/08/01 22:52:39
Also clear _statuses.
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + |
| + _ready = new Completer(); |
| + } |
| + |
| + /// Scans the contents of the directory once to see which files have been |
| + /// added, removed, and modified. |
| + void _poll() { |
| + _filesToProcess.clear(); |
| + _polledFiles.clear(); |
| var stream = new Directory(directory).list(recursive: true); |
| + _listSubscription = stream.listen((entity) { |
| + assert(_state != _WatchState.UNSUBSCRIBED); |
| + |
| + if (entity is! File) return; |
| + _enqueueFile(entity.path); |
| + }, onDone: () { |
| + assert(_state != _WatchState.UNSUBSCRIBED); |
| + _listSubscription = null; |
| + |
| + // Null tells the queue consumer that we're done listing. |
| + _enqueueFile(null); |
| + }); |
| + } |
| - return stream.map((entity) { |
| - if (entity is! File) return new Future.value(); |
| - files.add(entity.path); |
| - // TODO(rnystrom): These all run as fast as possible and read the |
| - // contents of the files. That means there's a pretty big IO hit all at |
| - // once. Maybe these should be queued up and rate limited? |
| - return _refreshFile(entity.path); |
| - }).toList().then((futures) { |
| - // Once the listing is done, make sure to wait until each file is also |
| - // done. |
| - return Future.wait(futures); |
| - }).then((_) { |
| - var removedFiles = _statuses.keys.toSet().difference(files); |
| - for (var removed in removedFiles) { |
| - if (_state.shouldNotify) { |
| - _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
| + /// Add [path] to the queue of files whose status needs to be processed. |
| + void _enqueueFile(String path) { |
| + _filesToProcess.add(path); |
| + |
| + // Start up the asynchronous processing if not already running. |
| + if (_isProcessing) return; |
| + _isProcessing = true; |
| + |
| + refreshNextFile() { |
|
nweiz
2013/08/01 22:52:39
It feels like this warrants being its own method.
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + var file = _filesToProcess.removeFirst(); |
| + |
| + // `null` is the sentinel which means the directory listing is complete. |
| + if (file == null) { |
| + _isProcessing = false; |
| + |
| + // Any files that were not seen in the last poll but that we have a |
| + // status for must have been removed. |
| + var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
| + for (var removed in removedFiles) { |
| + if (_state == _WatchState.WATCHING) { |
| + _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
| + } |
| + _statuses.remove(removed); |
| + } |
| + |
| + assert(_state != _WatchState.UNSUBSCRIBED); |
|
nweiz
2013/08/01 22:52:39
This should be at the top of [refreshNextFile].
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + if (_state == _WatchState.SCANNING) { |
| + _state = _WatchState.WATCHING; |
| + print("done scanning, watching"); |
|
nweiz
2013/08/01 22:52:39
Left over from debugging?
Bob Nystrom
2013/08/02 17:26:29
Yup. Done.
|
| + _ready.complete(); |
| } |
| - _statuses.remove(removed); |
| - } |
| - var previousState = _state; |
| - _state = _state.finish(this); |
| + // Wait. |
| + return new Future.delayed(pollingDelay).then((_) { |
| + // Stop if we unsubscribed while waiting. |
| + if (_state == _WatchState.UNSUBSCRIBED) return; |
| - // If we were already sending notifications, add a bit of delay before |
| - // restarting just so that we don't whale on the file system. |
| - // TODO(rnystrom): Tune this and/or make it tunable? |
| - if (_state.shouldNotify) { |
| - return new Future.delayed(pollingDelay); |
| + // And then poll again. |
| + _poll(); |
| + }); |
| } |
| - }).then((_) { |
| - // Make sure we haven't transitioned to a non-watching state during the |
| - // delay. |
| - if (_state.shouldWatch) _watch(); |
| + |
| + return _processFile(file).then((_) { |
| + // Stop if we unsubscribed while waiting. |
| + if (_state == _WatchState.UNSUBSCRIBED) return; |
| + |
| + // If we have drained the queue (i.e. we are processing faster than |
| + // we are listing files), then stop processing and wait until something |
| + // has been enqueued. |
| + if (_filesToProcess.isEmpty) { |
| + _isProcessing = false; |
| + return; |
| + } |
| + |
| + return refreshNextFile(); |
| + }); |
| + } |
| + |
| + // Pipe all errors to the notification stream. |
| + refreshNextFile().catchError((error) { |
| + _events.addError(error, getAttachedStackTrace(error)); |
|
nweiz
2013/08/01 22:52:39
You don't need to manually forward the stack trace
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| }); |
| } |
| - /// Compares the current state of the file at [path] to the state it was in |
| - /// the last time it was scanned. |
| - Future _refreshFile(String path) { |
| + Future _processFile(String path) { |
| return getModificationTime(path).then((modified) { |
| + // Stop if we unsubscribed while waiting. |
| + if (_state == _WatchState.UNSUBSCRIBED) return; |
| + |
| var lastStatus = _statuses[path]; |
| // If it's modification time hasn't changed, assume the file is unchanged. |
|
nweiz
2013/08/01 22:52:39
"it's" -> "its"
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| - if (lastStatus != null && lastStatus.modified == modified) return; |
| + if (lastStatus != null && lastStatus.modified == modified) { |
| + // The file is still here. |
| + _polledFiles.add(path); |
| + return; |
| + } |
| return _hashFile(path).then((hash) { |
| + // Stop if we unsubscribed while waiting. |
| + if (_state == _WatchState.UNSUBSCRIBED) return; |
| + |
| var status = new _FileStatus(modified, hash); |
| _statuses[path] = status; |
| - |
| - // Only notify if the file contents changed. |
| - if (_state.shouldNotify && |
| - (lastStatus == null || !_sameHash(lastStatus.hash, hash))) { |
| - var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
| - _events.add(new WatchEvent(change, path)); |
| + _polledFiles.add(path); |
| + |
| + // Only notify while in the watching state. |
| + if (_state == _WatchState.WATCHING) { |
|
nweiz
2013/08/01 22:52:39
Short-circuit here and below.
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
| + if (changed) { |
| + var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
| + _events.add(new WatchEvent(type, path)); |
| + } |
| } |
| }); |
| }); |
| @@ -159,71 +254,14 @@ class DirectoryWatcher { |
| } |
| } |
| -/// An "event" that is sent to the [_WatchState] FSM to trigger state |
| -/// transitions. |
| -typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher); |
| - |
| -/// The different states that the watcher can be in and the transitions between |
| -/// them. |
| -/// |
| -/// This class defines a finite state machine for keeping track of what the |
| -/// asynchronous file polling is doing. Each instance of this is a state in the |
| -/// machine and its [listen], [cancel], and [finish] fields define the state |
| -/// transitions when those events occur. |
| class _WatchState { |
| - /// The watcher has no subscribers. |
| - static final notWatching = new _WatchState( |
| - listen: (watcher) { |
| - watcher._watch(); |
| - return _WatchState.scanning; |
| - }); |
| - |
| - /// The watcher has subscribers and is scanning for pre-existing files. |
| - static final scanning = new _WatchState( |
| - cancel: (watcher) { |
| - // No longer watching, so create a new incomplete ready future. |
| - watcher._ready = new Completer(); |
| - return _WatchState.cancelling; |
| - }, finish: (watcher) { |
| - watcher._ready.complete(); |
| - return _WatchState.watching; |
| - }, shouldWatch: true); |
| - |
| - /// The watcher was unsubscribed while polling and we're waiting for the poll |
| - /// to finish. |
| - static final cancelling = new _WatchState( |
| - listen: (_) => _WatchState.scanning, |
| - finish: (_) => _WatchState.notWatching); |
| - |
| - /// The watcher has subscribers, we have scanned for pre-existing files and |
| - /// now we're polling for changes. |
| - static final watching = new _WatchState( |
| - cancel: (watcher) { |
| - // No longer watching, so create a new incomplete ready future. |
| - watcher._ready = new Completer(); |
| - return _WatchState.cancelling; |
| - }, finish: (_) => _WatchState.watching, |
| - shouldWatch: true, shouldNotify: true); |
| - |
| - /// Called when the first subscriber to the watcher has been added. |
| - final _WatchStateEvent listen; |
| - |
| - /// Called when all subscriptions on the watcher have been cancelled. |
| - final _WatchStateEvent cancel; |
| - |
| - /// Called when a poll loop has finished. |
| - final _WatchStateEvent finish; |
| - |
| - /// If the directory watcher should be watching the file system while in |
| - /// this state. |
| - final bool shouldWatch; |
| - |
| - /// If a change event should be sent for a file modification while in this |
| - /// state. |
| - final bool shouldNotify; |
| - |
| - _WatchState({this.listen, this.cancel, this.finish, |
| - this.shouldWatch: false, this.shouldNotify: false}); |
| + static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
| + static const SCANNING = const _WatchState("scanning"); |
| + static const WATCHING = const _WatchState("watching"); |
|
nweiz
2013/08/01 22:52:39
Add some docs about what each state means.
Bob Nystrom
2013/08/02 17:26:29
Done.
|
| + |
| + final String name; |
| + |
| + const _WatchState(this.name); |
| } |
| class _FileStatus { |