| Index: packages/watcher/lib/src/directory_watcher/linux.dart
|
| diff --git a/packages/watcher/lib/src/directory_watcher/linux.dart b/packages/watcher/lib/src/directory_watcher/linux.dart
|
| index a747839d2ba1b2afc960ff3c2a56ee0ab609bdf9..df1365c76bff19fa15f8a16b9d92eab430811b5f 100644
|
| --- a/packages/watcher/lib/src/directory_watcher/linux.dart
|
| +++ b/packages/watcher/lib/src/directory_watcher/linux.dart
|
| @@ -2,12 +2,13 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -library watcher.directory_watcher.linux;
|
| -
|
| import 'dart:async';
|
| import 'dart:io';
|
|
|
| +import 'package:async/async.dart';
|
| +
|
| import '../directory_watcher.dart';
|
| +import '../path_set.dart';
|
| import '../resubscribable.dart';
|
| import '../utils.dart';
|
| import '../watch_event.dart';
|
| @@ -32,8 +33,8 @@ class LinuxDirectoryWatcher extends ResubscribableWatcher
|
|
|
| class _LinuxDirectoryWatcher
|
| implements DirectoryWatcher, ManuallyClosedWatcher {
|
| - String get directory => path;
|
| - final String path;
|
| + String get directory => _files.root;
|
| + String get path => _files.root;
|
|
|
| Stream<WatchEvent> get events => _eventsController.stream;
|
| final _eventsController = new StreamController<WatchEvent>.broadcast();
|
| @@ -43,15 +44,17 @@ class _LinuxDirectoryWatcher
|
| Future get ready => _readyCompleter.future;
|
| final _readyCompleter = new Completer();
|
|
|
| - /// The last known state for each entry in this directory.
|
| - ///
|
| - /// The keys in this map are the paths to the directory entries; the values
|
| - /// are [_EntryState]s indicating whether the entries are files or
|
| - /// directories.
|
| - final _entries = new Map<String, _EntryState>();
|
| + /// A stream group for the [Directory.watch] events of [path] and all its
|
| + /// subdirectories.
|
| + var _nativeEvents = new StreamGroup<FileSystemEvent>();
|
|
|
| - /// The watchers for subdirectories of [directory].
|
| - final _subWatchers = new Map<String, _LinuxDirectoryWatcher>();
|
| + /// All known files recursively within [path].
|
| + final PathSet _files;
|
| +
|
| + /// [Directory.watch] streams for [path]'s subdirectories, indexed by name.
|
| + ///
|
| + /// A stream is in this map if and only if it's also in [_nativeEvents].
|
| + final _subdirStreams = <String, Stream<FileSystemEvent>>{};
|
|
|
| /// A set of all subscriptions that this watcher subscribes to.
|
| ///
|
| @@ -59,93 +62,51 @@ class _LinuxDirectoryWatcher
|
| /// watcher is closed.
|
| final _subscriptions = new Set<StreamSubscription>();
|
|
|
| - _LinuxDirectoryWatcher(this.path) {
|
| + _LinuxDirectoryWatcher(String path)
|
| + : _files = new PathSet(path) {
|
| + _nativeEvents.add(new Directory(path).watch().transform(
|
| + new StreamTransformer.fromHandlers(handleDone: (sink) {
|
| + // Handle the done event here rather than in the call to [_listen] because
|
| + // [innerStream] won't close until we close the [StreamGroup]. However, if
|
| + // we close the [StreamGroup] here, we run the risk of new-directory
|
| + // events being fired after the group is closed, since batching delays
|
| + // those events. See b/30768513.
|
| + _onDone();
|
| + })));
|
| +
|
| // Batch the inotify changes together so that we can dedup events.
|
| - var innerStream = new Directory(path).watch()
|
| + var innerStream = _nativeEvents.stream
|
| .transform(new BatchedStreamTransformer<FileSystemEvent>());
|
| - _listen(innerStream, _onBatch,
|
| - onError: _eventsController.addError,
|
| - onDone: _onDone);
|
| -
|
| - _listen(new Directory(path).list(), (entity) {
|
| - _entries[entity.path] = new _EntryState(entity is Directory);
|
| - if (entity is! Directory) return;
|
| - _watchSubdir(entity.path);
|
| + _listen(innerStream, _onBatch, onError: _eventsController.addError);
|
| +
|
| + _listen(new Directory(path).list(recursive: true), (entity) {
|
| + if (entity is Directory) {
|
| + _watchSubdir(entity.path);
|
| + } else {
|
| + _files.add(entity.path);
|
| + }
|
| }, onError: (error, stackTrace) {
|
| _eventsController.addError(error, stackTrace);
|
| close();
|
| }, onDone: () {
|
| - _waitUntilReady().then((_) => _readyCompleter.complete());
|
| + _readyCompleter.complete();
|
| }, cancelOnError: true);
|
| }
|
|
|
| - /// Returns a [Future] that completes once all the subdirectory watchers are
|
| - /// fully initialized.
|
| - Future _waitUntilReady() {
|
| - return Future.wait(_subWatchers.values.map((watcher) => watcher.ready))
|
| - .then((_) {
|
| - if (_subWatchers.values.every((watcher) => watcher.isReady)) return null;
|
| - return _waitUntilReady();
|
| - });
|
| - }
|
| -
|
| void close() {
|
| for (var subscription in _subscriptions) {
|
| subscription.cancel();
|
| }
|
| - for (var watcher in _subWatchers.values) {
|
| - watcher.close();
|
| - }
|
|
|
| - _subWatchers.clear();
|
| _subscriptions.clear();
|
| + _subdirStreams.clear();
|
| + _files.clear();
|
| + _nativeEvents.close();
|
| _eventsController.close();
|
| }
|
|
|
| - /// Returns all files (not directories) that this watcher knows of are
|
| - /// recursively in the watched directory.
|
| - Set<String> get _allFiles {
|
| - var files = new Set<String>();
|
| - _getAllFiles(files);
|
| - return files;
|
| - }
|
| -
|
| - /// Helper function for [_allFiles].
|
| - ///
|
| - /// Adds all files that this watcher knows of to [files].
|
| - void _getAllFiles(Set<String> files) {
|
| - files.addAll(_entries.keys
|
| - .where((path) => _entries[path] == _EntryState.FILE).toSet());
|
| - for (var watcher in _subWatchers.values) {
|
| - watcher._getAllFiles(files);
|
| - }
|
| - }
|
| -
|
| /// Watch a subdirectory of [directory] for changes.
|
| - ///
|
| - /// If the subdirectory was added after [this] began emitting events, its
|
| - /// contents will be emitted as ADD events.
|
| void _watchSubdir(String path) {
|
| - if (_subWatchers.containsKey(path)) return;
|
| - var watcher = new _LinuxDirectoryWatcher(path);
|
| - _subWatchers[path] = watcher;
|
| -
|
| - // TODO(nweiz): Catch any errors here that indicate that the directory in
|
| - // question doesn't exist and silently stop watching it instead of
|
| - // propagating the errors.
|
| - _listen(watcher.events, (event) {
|
| - if (isReady) _eventsController.add(event);
|
| - }, onError: (error, stackTrace) {
|
| - _eventsController.addError(error, stackTrace);
|
| - close();
|
| - }, onDone: () {
|
| - if (_subWatchers[path] == watcher) _subWatchers.remove(path);
|
| -
|
| - // It's possible that a directory was removed and recreated very quickly.
|
| - // If so, make sure we're still watching it.
|
| - if (new Directory(path).existsSync()) _watchSubdir(path);
|
| - });
|
| -
|
| // TODO(nweiz): Right now it's possible for the watcher to emit an event for
|
| // a file before the directory list is complete. This could lead to the user
|
| // seeing a MODIFY or REMOVE event for a file before they see an ADD event,
|
| @@ -157,96 +118,110 @@ class _LinuxDirectoryWatcher
|
| // top-level clients such as barback as well, and could be implemented with
|
| // a wrapper similar to how listening/canceling works now.
|
|
|
| - // If a directory is added after we're finished with the initial scan, emit
|
| - // an event for each entry in it. This gives the user consistently gets an
|
| - // event for every new file.
|
| - watcher.ready.then((_) {
|
| - if (!isReady || _eventsController.isClosed) return;
|
| - _listen(new Directory(path).list(recursive: true), (entry) {
|
| - if (entry is Directory) return;
|
| - _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path));
|
| - }, onError: (error, stackTrace) {
|
| - // Ignore an exception caused by the dir not existing. It's fine if it
|
| - // was added and then quickly removed.
|
| - if (error is FileSystemException) return;
|
| -
|
| - _eventsController.addError(error, stackTrace);
|
| - close();
|
| - }, cancelOnError: true);
|
| - });
|
| + // TODO(nweiz): Catch any errors here that indicate that the directory in
|
| + // question doesn't exist and silently stop watching it instead of
|
| + // propagating the errors.
|
| + var stream = new Directory(path).watch();
|
| + _subdirStreams[path] = stream;
|
| + _nativeEvents.add(stream);
|
| }
|
|
|
| /// The callback that's run when a batch of changes comes in.
|
| void _onBatch(List<FileSystemEvent> batch) {
|
| - var changedEntries = new Set<String>();
|
| - var oldEntries = new Map.from(_entries);
|
| + var files = new Set<String>();
|
| + var dirs = new Set<String>();
|
| + var changed = new Set<String>();
|
|
|
| // inotify event batches are ordered by occurrence, so we treat them as a
|
| - // log of what happened to a file.
|
| + // log of what happened to a file. We only emit events based on the
|
| + // difference between the state before the batch and the state after it, not
|
| + // the intermediate state.
|
| for (var event in batch) {
|
| // If the watched directory is deleted or moved, we'll get a deletion
|
| // event for it. Ignore it; we handle closing [this] when the underlying
|
| // stream is closed.
|
| if (event.path == path) continue;
|
|
|
| - changedEntries.add(event.path);
|
| + changed.add(event.path);
|
|
|
| if (event is FileSystemMoveEvent) {
|
| - changedEntries.add(event.destination);
|
| - _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory);
|
| - _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory);
|
| + files.remove(event.path);
|
| + dirs.remove(event.path);
|
| +
|
| + changed.add(event.destination);
|
| + if (event.isDirectory) {
|
| + files.remove(event.destination);
|
| + dirs.add(event.destination);
|
| + } else {
|
| + files.add(event.destination);
|
| + dirs.remove(event.destination);
|
| + }
|
| + } else if (event is FileSystemDeleteEvent) {
|
| + files.remove(event.path);
|
| + dirs.remove(event.path);
|
| + } else if (event.isDirectory) {
|
| + files.remove(event.path);
|
| + dirs.add(event.path);
|
| } else {
|
| - _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory);
|
| + files.add(event.path);
|
| + dirs.remove(event.path);
|
| }
|
| }
|
|
|
| - for (var path in changedEntries) {
|
| - emitEvent(ChangeType type) {
|
| - if (isReady) _eventsController.add(new WatchEvent(type, path));
|
| - }
|
| -
|
| - var oldState = oldEntries[path];
|
| - var newState = _entries[path];
|
| + _applyChanges(files, dirs, changed);
|
| + }
|
|
|
| - if (oldState != _EntryState.FILE && newState == _EntryState.FILE) {
|
| - emitEvent(ChangeType.ADD);
|
| - } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) {
|
| - emitEvent(ChangeType.MODIFY);
|
| - } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) {
|
| - emitEvent(ChangeType.REMOVE);
|
| + /// Applies the net changes computed for a batch.
|
| + ///
|
| + /// The [files] and [dirs] sets contain the files and directories that now
|
| + /// exist, respectively. The [changed] set contains all files and directories
|
| + /// that have changed (including being removed), and so is a superset of
|
| + /// [files] and [dirs].
|
| + void _applyChanges(Set<String> files, Set<String> dirs, Set<String> changed) {
|
| + for (var path in changed) {
|
| + var stream = _subdirStreams.remove(path);
|
| + if (stream != null) _nativeEvents.add(stream);
|
| +
|
| + // Unless [path] was a file and still is, emit REMOVE events for it or its
|
| + // contents,
|
| + if (files.contains(path) && _files.contains(path)) continue;
|
| + for (var file in _files.remove(path)) {
|
| + _emit(ChangeType.REMOVE, file);
|
| }
|
| + }
|
|
|
| - if (oldState == _EntryState.DIRECTORY) {
|
| - var watcher = _subWatchers.remove(path);
|
| - if (watcher == null) continue;
|
| - for (var path in watcher._allFiles) {
|
| - _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
|
| - }
|
| - watcher.close();
|
| + for (var file in files) {
|
| + if (_files.contains(file)) {
|
| + _emit(ChangeType.MODIFY, file);
|
| + } else {
|
| + _emit(ChangeType.ADD, file);
|
| + _files.add(file);
|
| }
|
| -
|
| - if (newState == _EntryState.DIRECTORY) _watchSubdir(path);
|
| }
|
| - }
|
|
|
| - /// Changes the known state of the entry at [path] based on [change] and
|
| - /// [isDir].
|
| - void _changeEntryState(String path, ChangeType change, bool isDir) {
|
| - if (change == ChangeType.ADD || change == ChangeType.MODIFY) {
|
| - _entries[path] = new _EntryState(isDir);
|
| - } else {
|
| - assert(change == ChangeType.REMOVE);
|
| - _entries.remove(path);
|
| + for (var dir in dirs) {
|
| + _watchSubdir(dir);
|
| + _addSubdir(dir);
|
| }
|
| }
|
|
|
| - /// Determines the [ChangeType] associated with [event].
|
| - ChangeType _changeTypeFor(FileSystemEvent event) {
|
| - if (event is FileSystemDeleteEvent) return ChangeType.REMOVE;
|
| - if (event is FileSystemCreateEvent) return ChangeType.ADD;
|
| + /// Emits [ChangeType.ADD] events for the recursive contents of [path].
|
| + void _addSubdir(String path) {
|
| + _listen(new Directory(path).list(recursive: true), (entity) {
|
| + if (entity is Directory) {
|
| + _watchSubdir(entity.path);
|
| + } else {
|
| + _files.add(entity.path);
|
| + _emit(ChangeType.ADD, entity.path);
|
| + }
|
| + }, onError: (error, stackTrace) {
|
| + // Ignore an exception caused by the dir not existing. It's fine if it
|
| + // was added and then quickly removed.
|
| + if (error is FileSystemException) return;
|
|
|
| - assert(event is FileSystemModifyEvent);
|
| - return ChangeType.MODIFY;
|
| + _eventsController.addError(error, stackTrace);
|
| + close();
|
| + }, cancelOnError: true);
|
| }
|
|
|
| /// Handles the underlying event stream closing, indicating that the directory
|
| @@ -254,28 +229,23 @@ class _LinuxDirectoryWatcher
|
| void _onDone() {
|
| // Most of the time when a directory is removed, its contents will get
|
| // individual REMOVE events before the watch stream is closed -- in that
|
| - // case, [_entries] will be empty here. However, if the directory's removal
|
| - // is caused by a MOVE, we need to manually emit events.
|
| + // case, [_files] will be empty here. However, if the directory's removal is
|
| + // caused by a MOVE, we need to manually emit events.
|
| if (isReady) {
|
| - _entries.forEach((path, state) {
|
| - if (state == _EntryState.DIRECTORY) return;
|
| - _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
|
| - });
|
| + for (var file in _files.paths) {
|
| + _emit(ChangeType.REMOVE, file);
|
| + }
|
| }
|
|
|
| - // The parent directory often gets a close event before the subdirectories
|
| - // are done emitting events. We wait for them to finish before we close
|
| - // [events] so that we can be sure to emit a remove event for every file
|
| - // that used to exist.
|
| - Future.wait(_subWatchers.values.map((watcher) {
|
| - try {
|
| - return watcher.events.toList();
|
| - } on StateError catch (_) {
|
| - // It's possible that [watcher.events] is closed but the onDone event
|
| - // hasn't reached us yet. It's fine if so.
|
| - return new Future.value();
|
| - }
|
| - })).then((_) => close());
|
| + close();
|
| + }
|
| +
|
| + /// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state
|
| + /// to emit events.
|
| + void _emit(ChangeType type, String path) {
|
| + if (!isReady) return;
|
| + if (_eventsController.isClosed) return;
|
| + _eventsController.add(new WatchEvent(type, path));
|
| }
|
|
|
| /// Like [Stream.listen], but automatically adds the subscription to
|
| @@ -290,22 +260,3 @@ class _LinuxDirectoryWatcher
|
| _subscriptions.add(subscription);
|
| }
|
| }
|
| -
|
| -/// An enum for the possible states of entries in a watched directory.
|
| -class _EntryState {
|
| - final String _name;
|
| -
|
| - /// The entry is a file.
|
| - static const FILE = const _EntryState._("file");
|
| -
|
| - /// The entry is a directory.
|
| - static const DIRECTORY = const _EntryState._("directory");
|
| -
|
| - const _EntryState._(this._name);
|
| -
|
| - /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise.
|
| - factory _EntryState(bool isDir) =>
|
| - isDir ? _EntryState.DIRECTORY : _EntryState.FILE;
|
| -
|
| - String toString() => _name;
|
| -}
|
|
|