Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Unified Diff: packages/watcher/lib/src/directory_watcher/linux.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: packages/watcher/lib/src/directory_watcher/linux.dart
diff --git a/packages/watcher/lib/src/directory_watcher/linux.dart b/packages/watcher/lib/src/directory_watcher/linux.dart
index a747839d2ba1b2afc960ff3c2a56ee0ab609bdf9..df1365c76bff19fa15f8a16b9d92eab430811b5f 100644
--- a/packages/watcher/lib/src/directory_watcher/linux.dart
+++ b/packages/watcher/lib/src/directory_watcher/linux.dart
@@ -2,12 +2,13 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library watcher.directory_watcher.linux;
-
import 'dart:async';
import 'dart:io';
+import 'package:async/async.dart';
+
import '../directory_watcher.dart';
+import '../path_set.dart';
import '../resubscribable.dart';
import '../utils.dart';
import '../watch_event.dart';
@@ -32,8 +33,8 @@ class LinuxDirectoryWatcher extends ResubscribableWatcher
class _LinuxDirectoryWatcher
implements DirectoryWatcher, ManuallyClosedWatcher {
- String get directory => path;
- final String path;
+ String get directory => _files.root;
+ String get path => _files.root;
Stream<WatchEvent> get events => _eventsController.stream;
final _eventsController = new StreamController<WatchEvent>.broadcast();
@@ -43,15 +44,17 @@ class _LinuxDirectoryWatcher
Future get ready => _readyCompleter.future;
final _readyCompleter = new Completer();
- /// The last known state for each entry in this directory.
- ///
- /// The keys in this map are the paths to the directory entries; the values
- /// are [_EntryState]s indicating whether the entries are files or
- /// directories.
- final _entries = new Map<String, _EntryState>();
+ /// A stream group for the [Directory.watch] events of [path] and all its
+ /// subdirectories.
+ var _nativeEvents = new StreamGroup<FileSystemEvent>();
- /// The watchers for subdirectories of [directory].
- final _subWatchers = new Map<String, _LinuxDirectoryWatcher>();
+ /// All known files recursively within [path].
+ final PathSet _files;
+
+ /// [Directory.watch] streams for [path]'s subdirectories, indexed by name.
+ ///
+ /// A stream is in this map if and only if it's also in [_nativeEvents].
+ final _subdirStreams = <String, Stream<FileSystemEvent>>{};
/// A set of all subscriptions that this watcher subscribes to.
///
@@ -59,93 +62,51 @@ class _LinuxDirectoryWatcher
/// watcher is closed.
final _subscriptions = new Set<StreamSubscription>();
- _LinuxDirectoryWatcher(this.path) {
+ _LinuxDirectoryWatcher(String path)
+ : _files = new PathSet(path) {
+ _nativeEvents.add(new Directory(path).watch().transform(
+ new StreamTransformer.fromHandlers(handleDone: (sink) {
+ // Handle the done event here rather than in the call to [_listen] because
+ // [innerStream] won't close until we close the [StreamGroup]. However, if
+ // we close the [StreamGroup] here, we run the risk of new-directory
+ // events being fired after the group is closed, since batching delays
+ // those events. See b/30768513.
+ _onDone();
+ })));
+
// Batch the inotify changes together so that we can dedup events.
- var innerStream = new Directory(path).watch()
+ var innerStream = _nativeEvents.stream
.transform(new BatchedStreamTransformer<FileSystemEvent>());
- _listen(innerStream, _onBatch,
- onError: _eventsController.addError,
- onDone: _onDone);
-
- _listen(new Directory(path).list(), (entity) {
- _entries[entity.path] = new _EntryState(entity is Directory);
- if (entity is! Directory) return;
- _watchSubdir(entity.path);
+ _listen(innerStream, _onBatch, onError: _eventsController.addError);
+
+ _listen(new Directory(path).list(recursive: true), (entity) {
+ if (entity is Directory) {
+ _watchSubdir(entity.path);
+ } else {
+ _files.add(entity.path);
+ }
}, onError: (error, stackTrace) {
_eventsController.addError(error, stackTrace);
close();
}, onDone: () {
- _waitUntilReady().then((_) => _readyCompleter.complete());
+ _readyCompleter.complete();
}, cancelOnError: true);
}
- /// Returns a [Future] that completes once all the subdirectory watchers are
- /// fully initialized.
- Future _waitUntilReady() {
- return Future.wait(_subWatchers.values.map((watcher) => watcher.ready))
- .then((_) {
- if (_subWatchers.values.every((watcher) => watcher.isReady)) return null;
- return _waitUntilReady();
- });
- }
-
void close() {
for (var subscription in _subscriptions) {
subscription.cancel();
}
- for (var watcher in _subWatchers.values) {
- watcher.close();
- }
- _subWatchers.clear();
_subscriptions.clear();
+ _subdirStreams.clear();
+ _files.clear();
+ _nativeEvents.close();
_eventsController.close();
}
- /// Returns all files (not directories) that this watcher knows of are
- /// recursively in the watched directory.
- Set<String> get _allFiles {
- var files = new Set<String>();
- _getAllFiles(files);
- return files;
- }
-
- /// Helper function for [_allFiles].
- ///
- /// Adds all files that this watcher knows of to [files].
- void _getAllFiles(Set<String> files) {
- files.addAll(_entries.keys
- .where((path) => _entries[path] == _EntryState.FILE).toSet());
- for (var watcher in _subWatchers.values) {
- watcher._getAllFiles(files);
- }
- }
-
/// Watch a subdirectory of [directory] for changes.
- ///
- /// If the subdirectory was added after [this] began emitting events, its
- /// contents will be emitted as ADD events.
void _watchSubdir(String path) {
- if (_subWatchers.containsKey(path)) return;
- var watcher = new _LinuxDirectoryWatcher(path);
- _subWatchers[path] = watcher;
-
- // TODO(nweiz): Catch any errors here that indicate that the directory in
- // question doesn't exist and silently stop watching it instead of
- // propagating the errors.
- _listen(watcher.events, (event) {
- if (isReady) _eventsController.add(event);
- }, onError: (error, stackTrace) {
- _eventsController.addError(error, stackTrace);
- close();
- }, onDone: () {
- if (_subWatchers[path] == watcher) _subWatchers.remove(path);
-
- // It's possible that a directory was removed and recreated very quickly.
- // If so, make sure we're still watching it.
- if (new Directory(path).existsSync()) _watchSubdir(path);
- });
-
// TODO(nweiz): Right now it's possible for the watcher to emit an event for
// a file before the directory list is complete. This could lead to the user
// seeing a MODIFY or REMOVE event for a file before they see an ADD event,
@@ -157,96 +118,110 @@ class _LinuxDirectoryWatcher
// top-level clients such as barback as well, and could be implemented with
// a wrapper similar to how listening/canceling works now.
- // If a directory is added after we're finished with the initial scan, emit
- // an event for each entry in it. This gives the user consistently gets an
- // event for every new file.
- watcher.ready.then((_) {
- if (!isReady || _eventsController.isClosed) return;
- _listen(new Directory(path).list(recursive: true), (entry) {
- if (entry is Directory) return;
- _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path));
- }, onError: (error, stackTrace) {
- // Ignore an exception caused by the dir not existing. It's fine if it
- // was added and then quickly removed.
- if (error is FileSystemException) return;
-
- _eventsController.addError(error, stackTrace);
- close();
- }, cancelOnError: true);
- });
+ // TODO(nweiz): Catch any errors here that indicate that the directory in
+ // question doesn't exist and silently stop watching it instead of
+ // propagating the errors.
+ var stream = new Directory(path).watch();
+ _subdirStreams[path] = stream;
+ _nativeEvents.add(stream);
}
/// The callback that's run when a batch of changes comes in.
void _onBatch(List<FileSystemEvent> batch) {
- var changedEntries = new Set<String>();
- var oldEntries = new Map.from(_entries);
+ var files = new Set<String>();
+ var dirs = new Set<String>();
+ var changed = new Set<String>();
// inotify event batches are ordered by occurrence, so we treat them as a
- // log of what happened to a file.
+ // log of what happened to a file. We only emit events based on the
+ // difference between the state before the batch and the state after it, not
+ // the intermediate state.
for (var event in batch) {
// If the watched directory is deleted or moved, we'll get a deletion
// event for it. Ignore it; we handle closing [this] when the underlying
// stream is closed.
if (event.path == path) continue;
- changedEntries.add(event.path);
+ changed.add(event.path);
if (event is FileSystemMoveEvent) {
- changedEntries.add(event.destination);
- _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory);
- _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory);
+ files.remove(event.path);
+ dirs.remove(event.path);
+
+ changed.add(event.destination);
+ if (event.isDirectory) {
+ files.remove(event.destination);
+ dirs.add(event.destination);
+ } else {
+ files.add(event.destination);
+ dirs.remove(event.destination);
+ }
+ } else if (event is FileSystemDeleteEvent) {
+ files.remove(event.path);
+ dirs.remove(event.path);
+ } else if (event.isDirectory) {
+ files.remove(event.path);
+ dirs.add(event.path);
} else {
- _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory);
+ files.add(event.path);
+ dirs.remove(event.path);
}
}
- for (var path in changedEntries) {
- emitEvent(ChangeType type) {
- if (isReady) _eventsController.add(new WatchEvent(type, path));
- }
-
- var oldState = oldEntries[path];
- var newState = _entries[path];
+ _applyChanges(files, dirs, changed);
+ }
- if (oldState != _EntryState.FILE && newState == _EntryState.FILE) {
- emitEvent(ChangeType.ADD);
- } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) {
- emitEvent(ChangeType.MODIFY);
- } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) {
- emitEvent(ChangeType.REMOVE);
+ /// Applies the net changes computed for a batch.
+ ///
+ /// The [files] and [dirs] sets contain the files and directories that now
+ /// exist, respectively. The [changed] set contains all files and directories
+ /// that have changed (including being removed), and so is a superset of
+ /// [files] and [dirs].
+ void _applyChanges(Set<String> files, Set<String> dirs, Set<String> changed) {
+ for (var path in changed) {
+ var stream = _subdirStreams.remove(path);
+ if (stream != null) _nativeEvents.add(stream);
+
+ // Unless [path] was a file and still is, emit REMOVE events for it or its
+ // contents,
+ if (files.contains(path) && _files.contains(path)) continue;
+ for (var file in _files.remove(path)) {
+ _emit(ChangeType.REMOVE, file);
}
+ }
- if (oldState == _EntryState.DIRECTORY) {
- var watcher = _subWatchers.remove(path);
- if (watcher == null) continue;
- for (var path in watcher._allFiles) {
- _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
- }
- watcher.close();
+ for (var file in files) {
+ if (_files.contains(file)) {
+ _emit(ChangeType.MODIFY, file);
+ } else {
+ _emit(ChangeType.ADD, file);
+ _files.add(file);
}
-
- if (newState == _EntryState.DIRECTORY) _watchSubdir(path);
}
- }
- /// Changes the known state of the entry at [path] based on [change] and
- /// [isDir].
- void _changeEntryState(String path, ChangeType change, bool isDir) {
- if (change == ChangeType.ADD || change == ChangeType.MODIFY) {
- _entries[path] = new _EntryState(isDir);
- } else {
- assert(change == ChangeType.REMOVE);
- _entries.remove(path);
+ for (var dir in dirs) {
+ _watchSubdir(dir);
+ _addSubdir(dir);
}
}
- /// Determines the [ChangeType] associated with [event].
- ChangeType _changeTypeFor(FileSystemEvent event) {
- if (event is FileSystemDeleteEvent) return ChangeType.REMOVE;
- if (event is FileSystemCreateEvent) return ChangeType.ADD;
+ /// Emits [ChangeType.ADD] events for the recursive contents of [path].
+ void _addSubdir(String path) {
+ _listen(new Directory(path).list(recursive: true), (entity) {
+ if (entity is Directory) {
+ _watchSubdir(entity.path);
+ } else {
+ _files.add(entity.path);
+ _emit(ChangeType.ADD, entity.path);
+ }
+ }, onError: (error, stackTrace) {
+ // Ignore an exception caused by the dir not existing. It's fine if it
+ // was added and then quickly removed.
+ if (error is FileSystemException) return;
- assert(event is FileSystemModifyEvent);
- return ChangeType.MODIFY;
+ _eventsController.addError(error, stackTrace);
+ close();
+ }, cancelOnError: true);
}
/// Handles the underlying event stream closing, indicating that the directory
@@ -254,28 +229,23 @@ class _LinuxDirectoryWatcher
void _onDone() {
// Most of the time when a directory is removed, its contents will get
// individual REMOVE events before the watch stream is closed -- in that
- // case, [_entries] will be empty here. However, if the directory's removal
- // is caused by a MOVE, we need to manually emit events.
+ // case, [_files] will be empty here. However, if the directory's removal is
+ // caused by a MOVE, we need to manually emit events.
if (isReady) {
- _entries.forEach((path, state) {
- if (state == _EntryState.DIRECTORY) return;
- _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
- });
+ for (var file in _files.paths) {
+ _emit(ChangeType.REMOVE, file);
+ }
}
- // The parent directory often gets a close event before the subdirectories
- // are done emitting events. We wait for them to finish before we close
- // [events] so that we can be sure to emit a remove event for every file
- // that used to exist.
- Future.wait(_subWatchers.values.map((watcher) {
- try {
- return watcher.events.toList();
- } on StateError catch (_) {
- // It's possible that [watcher.events] is closed but the onDone event
- // hasn't reached us yet. It's fine if so.
- return new Future.value();
- }
- })).then((_) => close());
+ close();
+ }
+
+ /// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state
+ /// to emit events.
+ void _emit(ChangeType type, String path) {
+ if (!isReady) return;
+ if (_eventsController.isClosed) return;
+ _eventsController.add(new WatchEvent(type, path));
}
/// Like [Stream.listen], but automatically adds the subscription to
@@ -290,22 +260,3 @@ class _LinuxDirectoryWatcher
_subscriptions.add(subscription);
}
}
-
-/// An enum for the possible states of entries in a watched directory.
-class _EntryState {
- final String _name;
-
- /// The entry is a file.
- static const FILE = const _EntryState._("file");
-
- /// The entry is a directory.
- static const DIRECTORY = const _EntryState._("directory");
-
- const _EntryState._(this._name);
-
- /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise.
- factory _EntryState(bool isDir) =>
- isDir ? _EntryState.DIRECTORY : _EntryState.FILE;
-
- String toString() => _name;
-}
« no previous file with comments | « packages/watcher/lib/src/directory_watcher.dart ('k') | packages/watcher/lib/src/directory_watcher/mac_os.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698