Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(480)

Side by Side Diff: packages/watcher/lib/src/directory_watcher/linux.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library watcher.directory_watcher.linux;
6
7 import 'dart:async'; 5 import 'dart:async';
8 import 'dart:io'; 6 import 'dart:io';
9 7
8 import 'package:async/async.dart';
9
10 import '../directory_watcher.dart'; 10 import '../directory_watcher.dart';
11 import '../path_set.dart';
11 import '../resubscribable.dart'; 12 import '../resubscribable.dart';
12 import '../utils.dart'; 13 import '../utils.dart';
13 import '../watch_event.dart'; 14 import '../watch_event.dart';
14 15
15 /// Uses the inotify subsystem to watch for filesystem events. 16 /// Uses the inotify subsystem to watch for filesystem events.
16 /// 17 ///
17 /// Inotify doesn't suport recursively watching subdirectories, nor does 18 /// Inotify doesn't suport recursively watching subdirectories, nor does
18 /// [Directory.watch] polyfill that functionality. This class polyfills it 19 /// [Directory.watch] polyfill that functionality. This class polyfills it
19 /// instead. 20 /// instead.
20 /// 21 ///
21 /// This class also compensates for the non-inotify-specific issues of 22 /// This class also compensates for the non-inotify-specific issues of
22 /// [Directory.watch] producing multiple events for a single logical action 23 /// [Directory.watch] producing multiple events for a single logical action
23 /// (issue 14372) and providing insufficient information about move events 24 /// (issue 14372) and providing insufficient information about move events
24 /// (issue 14424). 25 /// (issue 14424).
25 class LinuxDirectoryWatcher extends ResubscribableWatcher 26 class LinuxDirectoryWatcher extends ResubscribableWatcher
26 implements DirectoryWatcher { 27 implements DirectoryWatcher {
27 String get directory => path; 28 String get directory => path;
28 29
29 LinuxDirectoryWatcher(String directory) 30 LinuxDirectoryWatcher(String directory)
30 : super(directory, () => new _LinuxDirectoryWatcher(directory)); 31 : super(directory, () => new _LinuxDirectoryWatcher(directory));
31 } 32 }
32 33
33 class _LinuxDirectoryWatcher 34 class _LinuxDirectoryWatcher
34 implements DirectoryWatcher, ManuallyClosedWatcher { 35 implements DirectoryWatcher, ManuallyClosedWatcher {
35 String get directory => path; 36 String get directory => _files.root;
36 final String path; 37 String get path => _files.root;
37 38
38 Stream<WatchEvent> get events => _eventsController.stream; 39 Stream<WatchEvent> get events => _eventsController.stream;
39 final _eventsController = new StreamController<WatchEvent>.broadcast(); 40 final _eventsController = new StreamController<WatchEvent>.broadcast();
40 41
41 bool get isReady => _readyCompleter.isCompleted; 42 bool get isReady => _readyCompleter.isCompleted;
42 43
43 Future get ready => _readyCompleter.future; 44 Future get ready => _readyCompleter.future;
44 final _readyCompleter = new Completer(); 45 final _readyCompleter = new Completer();
45 46
46 /// The last known state for each entry in this directory. 47 /// A stream group for the [Directory.watch] events of [path] and all its
48 /// subdirectories.
49 var _nativeEvents = new StreamGroup<FileSystemEvent>();
50
51 /// All known files recursively within [path].
52 final PathSet _files;
53
54 /// [Directory.watch] streams for [path]'s subdirectories, indexed by name.
47 /// 55 ///
48 /// The keys in this map are the paths to the directory entries; the values 56 /// A stream is in this map if and only if it's also in [_nativeEvents].
49 /// are [_EntryState]s indicating whether the entries are files or 57 final _subdirStreams = <String, Stream<FileSystemEvent>>{};
50 /// directories.
51 final _entries = new Map<String, _EntryState>();
52
53 /// The watchers for subdirectories of [directory].
54 final _subWatchers = new Map<String, _LinuxDirectoryWatcher>();
55 58
56 /// A set of all subscriptions that this watcher subscribes to. 59 /// A set of all subscriptions that this watcher subscribes to.
57 /// 60 ///
58 /// These are gathered together so that they may all be canceled when the 61 /// These are gathered together so that they may all be canceled when the
59 /// watcher is closed. 62 /// watcher is closed.
60 final _subscriptions = new Set<StreamSubscription>(); 63 final _subscriptions = new Set<StreamSubscription>();
61 64
62 _LinuxDirectoryWatcher(this.path) { 65 _LinuxDirectoryWatcher(String path)
66 : _files = new PathSet(path) {
67 _nativeEvents.add(new Directory(path).watch().transform(
68 new StreamTransformer.fromHandlers(handleDone: (sink) {
69 // Handle the done event here rather than in the call to [_listen] because
70 // [innerStream] won't close until we close the [StreamGroup]. However, if
71 // we close the [StreamGroup] here, we run the risk of new-directory
72 // events being fired after the group is closed, since batching delays
73 // those events. See b/30768513.
74 _onDone();
75 })));
76
63 // Batch the inotify changes together so that we can dedup events. 77 // Batch the inotify changes together so that we can dedup events.
64 var innerStream = new Directory(path).watch() 78 var innerStream = _nativeEvents.stream
65 .transform(new BatchedStreamTransformer<FileSystemEvent>()); 79 .transform(new BatchedStreamTransformer<FileSystemEvent>());
66 _listen(innerStream, _onBatch, 80 _listen(innerStream, _onBatch, onError: _eventsController.addError);
67 onError: _eventsController.addError,
68 onDone: _onDone);
69 81
70 _listen(new Directory(path).list(), (entity) { 82 _listen(new Directory(path).list(recursive: true), (entity) {
71 _entries[entity.path] = new _EntryState(entity is Directory); 83 if (entity is Directory) {
72 if (entity is! Directory) return; 84 _watchSubdir(entity.path);
73 _watchSubdir(entity.path); 85 } else {
86 _files.add(entity.path);
87 }
74 }, onError: (error, stackTrace) { 88 }, onError: (error, stackTrace) {
75 _eventsController.addError(error, stackTrace); 89 _eventsController.addError(error, stackTrace);
76 close(); 90 close();
77 }, onDone: () { 91 }, onDone: () {
78 _waitUntilReady().then((_) => _readyCompleter.complete()); 92 _readyCompleter.complete();
79 }, cancelOnError: true); 93 }, cancelOnError: true);
80 } 94 }
81 95
82 /// Returns a [Future] that completes once all the subdirectory watchers are
83 /// fully initialized.
84 Future _waitUntilReady() {
85 return Future.wait(_subWatchers.values.map((watcher) => watcher.ready))
86 .then((_) {
87 if (_subWatchers.values.every((watcher) => watcher.isReady)) return null;
88 return _waitUntilReady();
89 });
90 }
91
92 void close() { 96 void close() {
93 for (var subscription in _subscriptions) { 97 for (var subscription in _subscriptions) {
94 subscription.cancel(); 98 subscription.cancel();
95 } 99 }
96 for (var watcher in _subWatchers.values) {
97 watcher.close();
98 }
99 100
100 _subWatchers.clear();
101 _subscriptions.clear(); 101 _subscriptions.clear();
102 _subdirStreams.clear();
103 _files.clear();
104 _nativeEvents.close();
102 _eventsController.close(); 105 _eventsController.close();
103 } 106 }
104 107
105 /// Returns all files (not directories) that this watcher knows of are
106 /// recursively in the watched directory.
107 Set<String> get _allFiles {
108 var files = new Set<String>();
109 _getAllFiles(files);
110 return files;
111 }
112
113 /// Helper function for [_allFiles].
114 ///
115 /// Adds all files that this watcher knows of to [files].
116 void _getAllFiles(Set<String> files) {
117 files.addAll(_entries.keys
118 .where((path) => _entries[path] == _EntryState.FILE).toSet());
119 for (var watcher in _subWatchers.values) {
120 watcher._getAllFiles(files);
121 }
122 }
123
124 /// Watch a subdirectory of [directory] for changes. 108 /// Watch a subdirectory of [directory] for changes.
125 ///
126 /// If the subdirectory was added after [this] began emitting events, its
127 /// contents will be emitted as ADD events.
128 void _watchSubdir(String path) { 109 void _watchSubdir(String path) {
129 if (_subWatchers.containsKey(path)) return;
130 var watcher = new _LinuxDirectoryWatcher(path);
131 _subWatchers[path] = watcher;
132
133 // TODO(nweiz): Catch any errors here that indicate that the directory in
134 // question doesn't exist and silently stop watching it instead of
135 // propagating the errors.
136 _listen(watcher.events, (event) {
137 if (isReady) _eventsController.add(event);
138 }, onError: (error, stackTrace) {
139 _eventsController.addError(error, stackTrace);
140 close();
141 }, onDone: () {
142 if (_subWatchers[path] == watcher) _subWatchers.remove(path);
143
144 // It's possible that a directory was removed and recreated very quickly.
145 // If so, make sure we're still watching it.
146 if (new Directory(path).existsSync()) _watchSubdir(path);
147 });
148
149 // TODO(nweiz): Right now it's possible for the watcher to emit an event for 110 // TODO(nweiz): Right now it's possible for the watcher to emit an event for
150 // a file before the directory list is complete. This could lead to the user 111 // a file before the directory list is complete. This could lead to the user
151 // seeing a MODIFY or REMOVE event for a file before they see an ADD event, 112 // seeing a MODIFY or REMOVE event for a file before they see an ADD event,
152 // which is bad. We should handle that. 113 // which is bad. We should handle that.
153 // 114 //
154 // One possibility is to provide a general means (e.g. 115 // One possibility is to provide a general means (e.g.
155 // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit 116 // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit
156 // events for all the files that already exist. This would be useful for 117 // events for all the files that already exist. This would be useful for
157 // top-level clients such as barback as well, and could be implemented with 118 // top-level clients such as barback as well, and could be implemented with
158 // a wrapper similar to how listening/canceling works now. 119 // a wrapper similar to how listening/canceling works now.
159 120
160 // If a directory is added after we're finished with the initial scan, emit 121 // TODO(nweiz): Catch any errors here that indicate that the directory in
161 // an event for each entry in it. This gives the user consistently gets an 122 // question doesn't exist and silently stop watching it instead of
162 // event for every new file. 123 // propagating the errors.
163 watcher.ready.then((_) { 124 var stream = new Directory(path).watch();
164 if (!isReady || _eventsController.isClosed) return; 125 _subdirStreams[path] = stream;
165 _listen(new Directory(path).list(recursive: true), (entry) { 126 _nativeEvents.add(stream);
166 if (entry is Directory) return;
167 _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path));
168 }, onError: (error, stackTrace) {
169 // Ignore an exception caused by the dir not existing. It's fine if it
170 // was added and then quickly removed.
171 if (error is FileSystemException) return;
172
173 _eventsController.addError(error, stackTrace);
174 close();
175 }, cancelOnError: true);
176 });
177 } 127 }
178 128
179 /// The callback that's run when a batch of changes comes in. 129 /// The callback that's run when a batch of changes comes in.
180 void _onBatch(List<FileSystemEvent> batch) { 130 void _onBatch(List<FileSystemEvent> batch) {
181 var changedEntries = new Set<String>(); 131 var files = new Set<String>();
182 var oldEntries = new Map.from(_entries); 132 var dirs = new Set<String>();
133 var changed = new Set<String>();
183 134
184 // inotify event batches are ordered by occurrence, so we treat them as a 135 // inotify event batches are ordered by occurrence, so we treat them as a
185 // log of what happened to a file. 136 // log of what happened to a file. We only emit events based on the
137 // difference between the state before the batch and the state after it, not
138 // the intermediate state.
186 for (var event in batch) { 139 for (var event in batch) {
187 // If the watched directory is deleted or moved, we'll get a deletion 140 // If the watched directory is deleted or moved, we'll get a deletion
188 // event for it. Ignore it; we handle closing [this] when the underlying 141 // event for it. Ignore it; we handle closing [this] when the underlying
189 // stream is closed. 142 // stream is closed.
190 if (event.path == path) continue; 143 if (event.path == path) continue;
191 144
192 changedEntries.add(event.path); 145 changed.add(event.path);
193 146
194 if (event is FileSystemMoveEvent) { 147 if (event is FileSystemMoveEvent) {
195 changedEntries.add(event.destination); 148 files.remove(event.path);
196 _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory); 149 dirs.remove(event.path);
197 _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory); 150
151 changed.add(event.destination);
152 if (event.isDirectory) {
153 files.remove(event.destination);
154 dirs.add(event.destination);
155 } else {
156 files.add(event.destination);
157 dirs.remove(event.destination);
158 }
159 } else if (event is FileSystemDeleteEvent) {
160 files.remove(event.path);
161 dirs.remove(event.path);
162 } else if (event.isDirectory) {
163 files.remove(event.path);
164 dirs.add(event.path);
198 } else { 165 } else {
199 _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory); 166 files.add(event.path);
167 dirs.remove(event.path);
200 } 168 }
201 } 169 }
202 170
203 for (var path in changedEntries) { 171 _applyChanges(files, dirs, changed);
204 emitEvent(ChangeType type) { 172 }
205 if (isReady) _eventsController.add(new WatchEvent(type, path)); 173
174 /// Applies the net changes computed for a batch.
175 ///
176 /// The [files] and [dirs] sets contain the files and directories that now
177 /// exist, respectively. The [changed] set contains all files and directories
178 /// that have changed (including being removed), and so is a superset of
179 /// [files] and [dirs].
180 void _applyChanges(Set<String> files, Set<String> dirs, Set<String> changed) {
181 for (var path in changed) {
182 var stream = _subdirStreams.remove(path);
183 if (stream != null) _nativeEvents.add(stream);
184
185 // Unless [path] was a file and still is, emit REMOVE events for it or its
186 // contents,
187 if (files.contains(path) && _files.contains(path)) continue;
188 for (var file in _files.remove(path)) {
189 _emit(ChangeType.REMOVE, file);
206 } 190 }
191 }
207 192
208 var oldState = oldEntries[path]; 193 for (var file in files) {
209 var newState = _entries[path]; 194 if (_files.contains(file)) {
195 _emit(ChangeType.MODIFY, file);
196 } else {
197 _emit(ChangeType.ADD, file);
198 _files.add(file);
199 }
200 }
210 201
211 if (oldState != _EntryState.FILE && newState == _EntryState.FILE) { 202 for (var dir in dirs) {
212 emitEvent(ChangeType.ADD); 203 _watchSubdir(dir);
213 } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) { 204 _addSubdir(dir);
214 emitEvent(ChangeType.MODIFY);
215 } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) {
216 emitEvent(ChangeType.REMOVE);
217 }
218
219 if (oldState == _EntryState.DIRECTORY) {
220 var watcher = _subWatchers.remove(path);
221 if (watcher == null) continue;
222 for (var path in watcher._allFiles) {
223 _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
224 }
225 watcher.close();
226 }
227
228 if (newState == _EntryState.DIRECTORY) _watchSubdir(path);
229 } 205 }
230 } 206 }
231 207
232 /// Changes the known state of the entry at [path] based on [change] and 208 /// Emits [ChangeType.ADD] events for the recursive contents of [path].
233 /// [isDir]. 209 void _addSubdir(String path) {
234 void _changeEntryState(String path, ChangeType change, bool isDir) { 210 _listen(new Directory(path).list(recursive: true), (entity) {
235 if (change == ChangeType.ADD || change == ChangeType.MODIFY) { 211 if (entity is Directory) {
236 _entries[path] = new _EntryState(isDir); 212 _watchSubdir(entity.path);
237 } else { 213 } else {
238 assert(change == ChangeType.REMOVE); 214 _files.add(entity.path);
239 _entries.remove(path); 215 _emit(ChangeType.ADD, entity.path);
240 } 216 }
241 } 217 }, onError: (error, stackTrace) {
218 // Ignore an exception caused by the dir not existing. It's fine if it
219 // was added and then quickly removed.
220 if (error is FileSystemException) return;
242 221
243 /// Determines the [ChangeType] associated with [event]. 222 _eventsController.addError(error, stackTrace);
244 ChangeType _changeTypeFor(FileSystemEvent event) { 223 close();
245 if (event is FileSystemDeleteEvent) return ChangeType.REMOVE; 224 }, cancelOnError: true);
246 if (event is FileSystemCreateEvent) return ChangeType.ADD;
247
248 assert(event is FileSystemModifyEvent);
249 return ChangeType.MODIFY;
250 } 225 }
251 226
252 /// Handles the underlying event stream closing, indicating that the directory 227 /// Handles the underlying event stream closing, indicating that the directory
253 /// being watched was removed. 228 /// being watched was removed.
254 void _onDone() { 229 void _onDone() {
255 // Most of the time when a directory is removed, its contents will get 230 // Most of the time when a directory is removed, its contents will get
256 // individual REMOVE events before the watch stream is closed -- in that 231 // individual REMOVE events before the watch stream is closed -- in that
257 // case, [_entries] will be empty here. However, if the directory's removal 232 // case, [_files] will be empty here. However, if the directory's removal is
258 // is caused by a MOVE, we need to manually emit events. 233 // caused by a MOVE, we need to manually emit events.
259 if (isReady) { 234 if (isReady) {
260 _entries.forEach((path, state) { 235 for (var file in _files.paths) {
261 if (state == _EntryState.DIRECTORY) return; 236 _emit(ChangeType.REMOVE, file);
262 _eventsController.add(new WatchEvent(ChangeType.REMOVE, path)); 237 }
263 });
264 } 238 }
265 239
266 // The parent directory often gets a close event before the subdirectories 240 close();
267 // are done emitting events. We wait for them to finish before we close 241 }
268 // [events] so that we can be sure to emit a remove event for every file 242
269 // that used to exist. 243 /// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state
270 Future.wait(_subWatchers.values.map((watcher) { 244 /// to emit events.
271 try { 245 void _emit(ChangeType type, String path) {
272 return watcher.events.toList(); 246 if (!isReady) return;
273 } on StateError catch (_) { 247 if (_eventsController.isClosed) return;
274 // It's possible that [watcher.events] is closed but the onDone event 248 _eventsController.add(new WatchEvent(type, path));
275 // hasn't reached us yet. It's fine if so.
276 return new Future.value();
277 }
278 })).then((_) => close());
279 } 249 }
280 250
281 /// Like [Stream.listen], but automatically adds the subscription to 251 /// Like [Stream.listen], but automatically adds the subscription to
282 /// [_subscriptions] so that it can be canceled when [close] is called. 252 /// [_subscriptions] so that it can be canceled when [close] is called.
283 void _listen(Stream stream, void onData(event), {Function onError, 253 void _listen(Stream stream, void onData(event), {Function onError,
284 void onDone(), bool cancelOnError}) { 254 void onDone(), bool cancelOnError}) {
285 var subscription; 255 var subscription;
286 subscription = stream.listen(onData, onError: onError, onDone: () { 256 subscription = stream.listen(onData, onError: onError, onDone: () {
287 _subscriptions.remove(subscription); 257 _subscriptions.remove(subscription);
288 if (onDone != null) onDone(); 258 if (onDone != null) onDone();
289 }, cancelOnError: cancelOnError); 259 }, cancelOnError: cancelOnError);
290 _subscriptions.add(subscription); 260 _subscriptions.add(subscription);
291 } 261 }
292 } 262 }
293
294 /// An enum for the possible states of entries in a watched directory.
295 class _EntryState {
296 final String _name;
297
298 /// The entry is a file.
299 static const FILE = const _EntryState._("file");
300
301 /// The entry is a directory.
302 static const DIRECTORY = const _EntryState._("directory");
303
304 const _EntryState._(this._name);
305
306 /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise.
307 factory _EntryState(bool isDir) =>
308 isDir ? _EntryState.DIRECTORY : _EntryState.FILE;
309
310 String toString() => _name;
311 }
OLDNEW
« no previous file with comments | « packages/watcher/lib/src/directory_watcher.dart ('k') | packages/watcher/lib/src/directory_watcher/mac_os.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698