Index: packages/watcher/lib/src/directory_watcher/linux.dart |
diff --git a/packages/watcher/lib/src/directory_watcher/linux.dart b/packages/watcher/lib/src/directory_watcher/linux.dart |
index a747839d2ba1b2afc960ff3c2a56ee0ab609bdf9..df1365c76bff19fa15f8a16b9d92eab430811b5f 100644 |
--- a/packages/watcher/lib/src/directory_watcher/linux.dart |
+++ b/packages/watcher/lib/src/directory_watcher/linux.dart |
@@ -2,12 +2,13 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library watcher.directory_watcher.linux; |
- |
import 'dart:async'; |
import 'dart:io'; |
+import 'package:async/async.dart'; |
+ |
import '../directory_watcher.dart'; |
+import '../path_set.dart'; |
import '../resubscribable.dart'; |
import '../utils.dart'; |
import '../watch_event.dart'; |
@@ -32,8 +33,8 @@ class LinuxDirectoryWatcher extends ResubscribableWatcher |
class _LinuxDirectoryWatcher |
implements DirectoryWatcher, ManuallyClosedWatcher { |
- String get directory => path; |
- final String path; |
+ String get directory => _files.root; |
+ String get path => _files.root; |
Stream<WatchEvent> get events => _eventsController.stream; |
final _eventsController = new StreamController<WatchEvent>.broadcast(); |
@@ -43,15 +44,17 @@ class _LinuxDirectoryWatcher |
Future get ready => _readyCompleter.future; |
final _readyCompleter = new Completer(); |
- /// The last known state for each entry in this directory. |
- /// |
- /// The keys in this map are the paths to the directory entries; the values |
- /// are [_EntryState]s indicating whether the entries are files or |
- /// directories. |
- final _entries = new Map<String, _EntryState>(); |
+ /// A stream group for the [Directory.watch] events of [path] and all its |
+ /// subdirectories. |
+ var _nativeEvents = new StreamGroup<FileSystemEvent>(); |
- /// The watchers for subdirectories of [directory]. |
- final _subWatchers = new Map<String, _LinuxDirectoryWatcher>(); |
+ /// All known files recursively within [path]. |
+ final PathSet _files; |
+ |
+ /// [Directory.watch] streams for [path]'s subdirectories, indexed by name. |
+ /// |
+ /// A stream is in this map if and only if it's also in [_nativeEvents]. |
+ final _subdirStreams = <String, Stream<FileSystemEvent>>{}; |
/// A set of all subscriptions that this watcher subscribes to. |
/// |
@@ -59,93 +62,51 @@ class _LinuxDirectoryWatcher |
/// watcher is closed. |
final _subscriptions = new Set<StreamSubscription>(); |
- _LinuxDirectoryWatcher(this.path) { |
+ _LinuxDirectoryWatcher(String path) |
+ : _files = new PathSet(path) { |
+ _nativeEvents.add(new Directory(path).watch().transform( |
+ new StreamTransformer.fromHandlers(handleDone: (sink) { |
+ // Handle the done event here rather than in the call to [_listen] because |
+ // [innerStream] won't close until we close the [StreamGroup]. However, if |
+ // we close the [StreamGroup] here, we run the risk of new-directory |
+ // events being fired after the group is closed, since batching delays |
+ // those events. See b/30768513. |
+ _onDone(); |
+ }))); |
+ |
// Batch the inotify changes together so that we can dedup events. |
- var innerStream = new Directory(path).watch() |
+ var innerStream = _nativeEvents.stream |
.transform(new BatchedStreamTransformer<FileSystemEvent>()); |
- _listen(innerStream, _onBatch, |
- onError: _eventsController.addError, |
- onDone: _onDone); |
- |
- _listen(new Directory(path).list(), (entity) { |
- _entries[entity.path] = new _EntryState(entity is Directory); |
- if (entity is! Directory) return; |
- _watchSubdir(entity.path); |
+ _listen(innerStream, _onBatch, onError: _eventsController.addError); |
+ |
+ _listen(new Directory(path).list(recursive: true), (entity) { |
+ if (entity is Directory) { |
+ _watchSubdir(entity.path); |
+ } else { |
+ _files.add(entity.path); |
+ } |
}, onError: (error, stackTrace) { |
_eventsController.addError(error, stackTrace); |
close(); |
}, onDone: () { |
- _waitUntilReady().then((_) => _readyCompleter.complete()); |
+ _readyCompleter.complete(); |
}, cancelOnError: true); |
} |
- /// Returns a [Future] that completes once all the subdirectory watchers are |
- /// fully initialized. |
- Future _waitUntilReady() { |
- return Future.wait(_subWatchers.values.map((watcher) => watcher.ready)) |
- .then((_) { |
- if (_subWatchers.values.every((watcher) => watcher.isReady)) return null; |
- return _waitUntilReady(); |
- }); |
- } |
- |
void close() { |
for (var subscription in _subscriptions) { |
subscription.cancel(); |
} |
- for (var watcher in _subWatchers.values) { |
- watcher.close(); |
- } |
- _subWatchers.clear(); |
_subscriptions.clear(); |
+ _subdirStreams.clear(); |
+ _files.clear(); |
+ _nativeEvents.close(); |
_eventsController.close(); |
} |
- /// Returns all files (not directories) that this watcher knows of are |
- /// recursively in the watched directory. |
- Set<String> get _allFiles { |
- var files = new Set<String>(); |
- _getAllFiles(files); |
- return files; |
- } |
- |
- /// Helper function for [_allFiles]. |
- /// |
- /// Adds all files that this watcher knows of to [files]. |
- void _getAllFiles(Set<String> files) { |
- files.addAll(_entries.keys |
- .where((path) => _entries[path] == _EntryState.FILE).toSet()); |
- for (var watcher in _subWatchers.values) { |
- watcher._getAllFiles(files); |
- } |
- } |
- |
/// Watch a subdirectory of [directory] for changes. |
- /// |
- /// If the subdirectory was added after [this] began emitting events, its |
- /// contents will be emitted as ADD events. |
void _watchSubdir(String path) { |
- if (_subWatchers.containsKey(path)) return; |
- var watcher = new _LinuxDirectoryWatcher(path); |
- _subWatchers[path] = watcher; |
- |
- // TODO(nweiz): Catch any errors here that indicate that the directory in |
- // question doesn't exist and silently stop watching it instead of |
- // propagating the errors. |
- _listen(watcher.events, (event) { |
- if (isReady) _eventsController.add(event); |
- }, onError: (error, stackTrace) { |
- _eventsController.addError(error, stackTrace); |
- close(); |
- }, onDone: () { |
- if (_subWatchers[path] == watcher) _subWatchers.remove(path); |
- |
- // It's possible that a directory was removed and recreated very quickly. |
- // If so, make sure we're still watching it. |
- if (new Directory(path).existsSync()) _watchSubdir(path); |
- }); |
- |
// TODO(nweiz): Right now it's possible for the watcher to emit an event for |
// a file before the directory list is complete. This could lead to the user |
// seeing a MODIFY or REMOVE event for a file before they see an ADD event, |
@@ -157,96 +118,110 @@ class _LinuxDirectoryWatcher |
// top-level clients such as barback as well, and could be implemented with |
// a wrapper similar to how listening/canceling works now. |
- // If a directory is added after we're finished with the initial scan, emit |
- // an event for each entry in it. This gives the user consistently gets an |
- // event for every new file. |
- watcher.ready.then((_) { |
- if (!isReady || _eventsController.isClosed) return; |
- _listen(new Directory(path).list(recursive: true), (entry) { |
- if (entry is Directory) return; |
- _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path)); |
- }, onError: (error, stackTrace) { |
- // Ignore an exception caused by the dir not existing. It's fine if it |
- // was added and then quickly removed. |
- if (error is FileSystemException) return; |
- |
- _eventsController.addError(error, stackTrace); |
- close(); |
- }, cancelOnError: true); |
- }); |
+ // TODO(nweiz): Catch any errors here that indicate that the directory in |
+ // question doesn't exist and silently stop watching it instead of |
+ // propagating the errors. |
+ var stream = new Directory(path).watch(); |
+ _subdirStreams[path] = stream; |
+ _nativeEvents.add(stream); |
} |
/// The callback that's run when a batch of changes comes in. |
void _onBatch(List<FileSystemEvent> batch) { |
- var changedEntries = new Set<String>(); |
- var oldEntries = new Map.from(_entries); |
+ var files = new Set<String>(); |
+ var dirs = new Set<String>(); |
+ var changed = new Set<String>(); |
// inotify event batches are ordered by occurrence, so we treat them as a |
- // log of what happened to a file. |
+ // log of what happened to a file. We only emit events based on the |
+ // difference between the state before the batch and the state after it, not |
+ // the intermediate state. |
for (var event in batch) { |
// If the watched directory is deleted or moved, we'll get a deletion |
// event for it. Ignore it; we handle closing [this] when the underlying |
// stream is closed. |
if (event.path == path) continue; |
- changedEntries.add(event.path); |
+ changed.add(event.path); |
if (event is FileSystemMoveEvent) { |
- changedEntries.add(event.destination); |
- _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory); |
- _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory); |
+ files.remove(event.path); |
+ dirs.remove(event.path); |
+ |
+ changed.add(event.destination); |
+ if (event.isDirectory) { |
+ files.remove(event.destination); |
+ dirs.add(event.destination); |
+ } else { |
+ files.add(event.destination); |
+ dirs.remove(event.destination); |
+ } |
+ } else if (event is FileSystemDeleteEvent) { |
+ files.remove(event.path); |
+ dirs.remove(event.path); |
+ } else if (event.isDirectory) { |
+ files.remove(event.path); |
+ dirs.add(event.path); |
} else { |
- _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory); |
+ files.add(event.path); |
+ dirs.remove(event.path); |
} |
} |
- for (var path in changedEntries) { |
- emitEvent(ChangeType type) { |
- if (isReady) _eventsController.add(new WatchEvent(type, path)); |
- } |
- |
- var oldState = oldEntries[path]; |
- var newState = _entries[path]; |
+ _applyChanges(files, dirs, changed); |
+ } |
- if (oldState != _EntryState.FILE && newState == _EntryState.FILE) { |
- emitEvent(ChangeType.ADD); |
- } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) { |
- emitEvent(ChangeType.MODIFY); |
- } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) { |
- emitEvent(ChangeType.REMOVE); |
+ /// Applies the net changes computed for a batch. |
+ /// |
+ /// The [files] and [dirs] sets contain the files and directories that now |
+ /// exist, respectively. The [changed] set contains all files and directories |
+ /// that have changed (including being removed), and so is a superset of |
+ /// [files] and [dirs]. |
+ void _applyChanges(Set<String> files, Set<String> dirs, Set<String> changed) { |
+ for (var path in changed) { |
+ var stream = _subdirStreams.remove(path); |
+ if (stream != null) _nativeEvents.add(stream); |
+ |
+ // Unless [path] was a file and still is, emit REMOVE events for it or its |
+ // contents, |
+ if (files.contains(path) && _files.contains(path)) continue; |
+ for (var file in _files.remove(path)) { |
+ _emit(ChangeType.REMOVE, file); |
} |
+ } |
- if (oldState == _EntryState.DIRECTORY) { |
- var watcher = _subWatchers.remove(path); |
- if (watcher == null) continue; |
- for (var path in watcher._allFiles) { |
- _eventsController.add(new WatchEvent(ChangeType.REMOVE, path)); |
- } |
- watcher.close(); |
+ for (var file in files) { |
+ if (_files.contains(file)) { |
+ _emit(ChangeType.MODIFY, file); |
+ } else { |
+ _emit(ChangeType.ADD, file); |
+ _files.add(file); |
} |
- |
- if (newState == _EntryState.DIRECTORY) _watchSubdir(path); |
} |
- } |
- /// Changes the known state of the entry at [path] based on [change] and |
- /// [isDir]. |
- void _changeEntryState(String path, ChangeType change, bool isDir) { |
- if (change == ChangeType.ADD || change == ChangeType.MODIFY) { |
- _entries[path] = new _EntryState(isDir); |
- } else { |
- assert(change == ChangeType.REMOVE); |
- _entries.remove(path); |
+ for (var dir in dirs) { |
+ _watchSubdir(dir); |
+ _addSubdir(dir); |
} |
} |
- /// Determines the [ChangeType] associated with [event]. |
- ChangeType _changeTypeFor(FileSystemEvent event) { |
- if (event is FileSystemDeleteEvent) return ChangeType.REMOVE; |
- if (event is FileSystemCreateEvent) return ChangeType.ADD; |
+ /// Emits [ChangeType.ADD] events for the recursive contents of [path]. |
+ void _addSubdir(String path) { |
+ _listen(new Directory(path).list(recursive: true), (entity) { |
+ if (entity is Directory) { |
+ _watchSubdir(entity.path); |
+ } else { |
+ _files.add(entity.path); |
+ _emit(ChangeType.ADD, entity.path); |
+ } |
+ }, onError: (error, stackTrace) { |
+ // Ignore an exception caused by the dir not existing. It's fine if it |
+ // was added and then quickly removed. |
+ if (error is FileSystemException) return; |
- assert(event is FileSystemModifyEvent); |
- return ChangeType.MODIFY; |
+ _eventsController.addError(error, stackTrace); |
+ close(); |
+ }, cancelOnError: true); |
} |
/// Handles the underlying event stream closing, indicating that the directory |
@@ -254,28 +229,23 @@ class _LinuxDirectoryWatcher |
void _onDone() { |
// Most of the time when a directory is removed, its contents will get |
// individual REMOVE events before the watch stream is closed -- in that |
- // case, [_entries] will be empty here. However, if the directory's removal |
- // is caused by a MOVE, we need to manually emit events. |
+ // case, [_files] will be empty here. However, if the directory's removal is |
+ // caused by a MOVE, we need to manually emit events. |
if (isReady) { |
- _entries.forEach((path, state) { |
- if (state == _EntryState.DIRECTORY) return; |
- _eventsController.add(new WatchEvent(ChangeType.REMOVE, path)); |
- }); |
+ for (var file in _files.paths) { |
+ _emit(ChangeType.REMOVE, file); |
+ } |
} |
- // The parent directory often gets a close event before the subdirectories |
- // are done emitting events. We wait for them to finish before we close |
- // [events] so that we can be sure to emit a remove event for every file |
- // that used to exist. |
- Future.wait(_subWatchers.values.map((watcher) { |
- try { |
- return watcher.events.toList(); |
- } on StateError catch (_) { |
- // It's possible that [watcher.events] is closed but the onDone event |
- // hasn't reached us yet. It's fine if so. |
- return new Future.value(); |
- } |
- })).then((_) => close()); |
+ close(); |
+ } |
+ |
+ /// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state |
+ /// to emit events. |
+ void _emit(ChangeType type, String path) { |
+ if (!isReady) return; |
+ if (_eventsController.isClosed) return; |
+ _eventsController.add(new WatchEvent(type, path)); |
} |
/// Like [Stream.listen], but automatically adds the subscription to |
@@ -290,22 +260,3 @@ class _LinuxDirectoryWatcher |
_subscriptions.add(subscription); |
} |
} |
- |
-/// An enum for the possible states of entries in a watched directory. |
-class _EntryState { |
- final String _name; |
- |
- /// The entry is a file. |
- static const FILE = const _EntryState._("file"); |
- |
- /// The entry is a directory. |
- static const DIRECTORY = const _EntryState._("directory"); |
- |
- const _EntryState._(this._name); |
- |
- /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise. |
- factory _EntryState(bool isDir) => |
- isDir ? _EntryState.DIRECTORY : _EntryState.FILE; |
- |
- String toString() => _name; |
-} |