OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library watcher.directory_watcher.linux; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 import 'dart:io'; | 6 import 'dart:io'; |
9 | 7 |
| 8 import 'package:async/async.dart'; |
| 9 |
10 import '../directory_watcher.dart'; | 10 import '../directory_watcher.dart'; |
| 11 import '../path_set.dart'; |
11 import '../resubscribable.dart'; | 12 import '../resubscribable.dart'; |
12 import '../utils.dart'; | 13 import '../utils.dart'; |
13 import '../watch_event.dart'; | 14 import '../watch_event.dart'; |
14 | 15 |
15 /// Uses the inotify subsystem to watch for filesystem events. | 16 /// Uses the inotify subsystem to watch for filesystem events. |
16 /// | 17 /// |
17 /// Inotify doesn't suport recursively watching subdirectories, nor does | 18 /// Inotify doesn't suport recursively watching subdirectories, nor does |
18 /// [Directory.watch] polyfill that functionality. This class polyfills it | 19 /// [Directory.watch] polyfill that functionality. This class polyfills it |
19 /// instead. | 20 /// instead. |
20 /// | 21 /// |
21 /// This class also compensates for the non-inotify-specific issues of | 22 /// This class also compensates for the non-inotify-specific issues of |
22 /// [Directory.watch] producing multiple events for a single logical action | 23 /// [Directory.watch] producing multiple events for a single logical action |
23 /// (issue 14372) and providing insufficient information about move events | 24 /// (issue 14372) and providing insufficient information about move events |
24 /// (issue 14424). | 25 /// (issue 14424). |
25 class LinuxDirectoryWatcher extends ResubscribableWatcher | 26 class LinuxDirectoryWatcher extends ResubscribableWatcher |
26 implements DirectoryWatcher { | 27 implements DirectoryWatcher { |
27 String get directory => path; | 28 String get directory => path; |
28 | 29 |
29 LinuxDirectoryWatcher(String directory) | 30 LinuxDirectoryWatcher(String directory) |
30 : super(directory, () => new _LinuxDirectoryWatcher(directory)); | 31 : super(directory, () => new _LinuxDirectoryWatcher(directory)); |
31 } | 32 } |
32 | 33 |
33 class _LinuxDirectoryWatcher | 34 class _LinuxDirectoryWatcher |
34 implements DirectoryWatcher, ManuallyClosedWatcher { | 35 implements DirectoryWatcher, ManuallyClosedWatcher { |
35 String get directory => path; | 36 String get directory => _files.root; |
36 final String path; | 37 String get path => _files.root; |
37 | 38 |
38 Stream<WatchEvent> get events => _eventsController.stream; | 39 Stream<WatchEvent> get events => _eventsController.stream; |
39 final _eventsController = new StreamController<WatchEvent>.broadcast(); | 40 final _eventsController = new StreamController<WatchEvent>.broadcast(); |
40 | 41 |
41 bool get isReady => _readyCompleter.isCompleted; | 42 bool get isReady => _readyCompleter.isCompleted; |
42 | 43 |
43 Future get ready => _readyCompleter.future; | 44 Future get ready => _readyCompleter.future; |
44 final _readyCompleter = new Completer(); | 45 final _readyCompleter = new Completer(); |
45 | 46 |
46 /// The last known state for each entry in this directory. | 47 /// A stream group for the [Directory.watch] events of [path] and all its |
| 48 /// subdirectories. |
| 49 var _nativeEvents = new StreamGroup<FileSystemEvent>(); |
| 50 |
| 51 /// All known files recursively within [path]. |
| 52 final PathSet _files; |
| 53 |
| 54 /// [Directory.watch] streams for [path]'s subdirectories, indexed by name. |
47 /// | 55 /// |
48 /// The keys in this map are the paths to the directory entries; the values | 56 /// A stream is in this map if and only if it's also in [_nativeEvents]. |
49 /// are [_EntryState]s indicating whether the entries are files or | 57 final _subdirStreams = <String, Stream<FileSystemEvent>>{}; |
50 /// directories. | |
51 final _entries = new Map<String, _EntryState>(); | |
52 | |
53 /// The watchers for subdirectories of [directory]. | |
54 final _subWatchers = new Map<String, _LinuxDirectoryWatcher>(); | |
55 | 58 |
56 /// A set of all subscriptions that this watcher subscribes to. | 59 /// A set of all subscriptions that this watcher subscribes to. |
57 /// | 60 /// |
58 /// These are gathered together so that they may all be canceled when the | 61 /// These are gathered together so that they may all be canceled when the |
59 /// watcher is closed. | 62 /// watcher is closed. |
60 final _subscriptions = new Set<StreamSubscription>(); | 63 final _subscriptions = new Set<StreamSubscription>(); |
61 | 64 |
62 _LinuxDirectoryWatcher(this.path) { | 65 _LinuxDirectoryWatcher(String path) |
| 66 : _files = new PathSet(path) { |
| 67 _nativeEvents.add(new Directory(path).watch().transform( |
| 68 new StreamTransformer.fromHandlers(handleDone: (sink) { |
| 69 // Handle the done event here rather than in the call to [_listen] because |
| 70 // [innerStream] won't close until we close the [StreamGroup]. However, if |
| 71 // we close the [StreamGroup] here, we run the risk of new-directory |
| 72 // events being fired after the group is closed, since batching delays |
| 73 // those events. See b/30768513. |
| 74 _onDone(); |
| 75 }))); |
| 76 |
63 // Batch the inotify changes together so that we can dedup events. | 77 // Batch the inotify changes together so that we can dedup events. |
64 var innerStream = new Directory(path).watch() | 78 var innerStream = _nativeEvents.stream |
65 .transform(new BatchedStreamTransformer<FileSystemEvent>()); | 79 .transform(new BatchedStreamTransformer<FileSystemEvent>()); |
66 _listen(innerStream, _onBatch, | 80 _listen(innerStream, _onBatch, onError: _eventsController.addError); |
67 onError: _eventsController.addError, | |
68 onDone: _onDone); | |
69 | 81 |
70 _listen(new Directory(path).list(), (entity) { | 82 _listen(new Directory(path).list(recursive: true), (entity) { |
71 _entries[entity.path] = new _EntryState(entity is Directory); | 83 if (entity is Directory) { |
72 if (entity is! Directory) return; | 84 _watchSubdir(entity.path); |
73 _watchSubdir(entity.path); | 85 } else { |
| 86 _files.add(entity.path); |
| 87 } |
74 }, onError: (error, stackTrace) { | 88 }, onError: (error, stackTrace) { |
75 _eventsController.addError(error, stackTrace); | 89 _eventsController.addError(error, stackTrace); |
76 close(); | 90 close(); |
77 }, onDone: () { | 91 }, onDone: () { |
78 _waitUntilReady().then((_) => _readyCompleter.complete()); | 92 _readyCompleter.complete(); |
79 }, cancelOnError: true); | 93 }, cancelOnError: true); |
80 } | 94 } |
81 | 95 |
82 /// Returns a [Future] that completes once all the subdirectory watchers are | |
83 /// fully initialized. | |
84 Future _waitUntilReady() { | |
85 return Future.wait(_subWatchers.values.map((watcher) => watcher.ready)) | |
86 .then((_) { | |
87 if (_subWatchers.values.every((watcher) => watcher.isReady)) return null; | |
88 return _waitUntilReady(); | |
89 }); | |
90 } | |
91 | |
92 void close() { | 96 void close() { |
93 for (var subscription in _subscriptions) { | 97 for (var subscription in _subscriptions) { |
94 subscription.cancel(); | 98 subscription.cancel(); |
95 } | 99 } |
96 for (var watcher in _subWatchers.values) { | |
97 watcher.close(); | |
98 } | |
99 | 100 |
100 _subWatchers.clear(); | |
101 _subscriptions.clear(); | 101 _subscriptions.clear(); |
| 102 _subdirStreams.clear(); |
| 103 _files.clear(); |
| 104 _nativeEvents.close(); |
102 _eventsController.close(); | 105 _eventsController.close(); |
103 } | 106 } |
104 | 107 |
105 /// Returns all files (not directories) that this watcher knows of are | |
106 /// recursively in the watched directory. | |
107 Set<String> get _allFiles { | |
108 var files = new Set<String>(); | |
109 _getAllFiles(files); | |
110 return files; | |
111 } | |
112 | |
113 /// Helper function for [_allFiles]. | |
114 /// | |
115 /// Adds all files that this watcher knows of to [files]. | |
116 void _getAllFiles(Set<String> files) { | |
117 files.addAll(_entries.keys | |
118 .where((path) => _entries[path] == _EntryState.FILE).toSet()); | |
119 for (var watcher in _subWatchers.values) { | |
120 watcher._getAllFiles(files); | |
121 } | |
122 } | |
123 | |
124 /// Watch a subdirectory of [directory] for changes. | 108 /// Watch a subdirectory of [directory] for changes. |
125 /// | |
126 /// If the subdirectory was added after [this] began emitting events, its | |
127 /// contents will be emitted as ADD events. | |
128 void _watchSubdir(String path) { | 109 void _watchSubdir(String path) { |
129 if (_subWatchers.containsKey(path)) return; | |
130 var watcher = new _LinuxDirectoryWatcher(path); | |
131 _subWatchers[path] = watcher; | |
132 | |
133 // TODO(nweiz): Catch any errors here that indicate that the directory in | |
134 // question doesn't exist and silently stop watching it instead of | |
135 // propagating the errors. | |
136 _listen(watcher.events, (event) { | |
137 if (isReady) _eventsController.add(event); | |
138 }, onError: (error, stackTrace) { | |
139 _eventsController.addError(error, stackTrace); | |
140 close(); | |
141 }, onDone: () { | |
142 if (_subWatchers[path] == watcher) _subWatchers.remove(path); | |
143 | |
144 // It's possible that a directory was removed and recreated very quickly. | |
145 // If so, make sure we're still watching it. | |
146 if (new Directory(path).existsSync()) _watchSubdir(path); | |
147 }); | |
148 | |
149 // TODO(nweiz): Right now it's possible for the watcher to emit an event for | 110 // TODO(nweiz): Right now it's possible for the watcher to emit an event for |
150 // a file before the directory list is complete. This could lead to the user | 111 // a file before the directory list is complete. This could lead to the user |
151 // seeing a MODIFY or REMOVE event for a file before they see an ADD event, | 112 // seeing a MODIFY or REMOVE event for a file before they see an ADD event, |
152 // which is bad. We should handle that. | 113 // which is bad. We should handle that. |
153 // | 114 // |
154 // One possibility is to provide a general means (e.g. | 115 // One possibility is to provide a general means (e.g. |
155 // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit | 116 // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit |
156 // events for all the files that already exist. This would be useful for | 117 // events for all the files that already exist. This would be useful for |
157 // top-level clients such as barback as well, and could be implemented with | 118 // top-level clients such as barback as well, and could be implemented with |
158 // a wrapper similar to how listening/canceling works now. | 119 // a wrapper similar to how listening/canceling works now. |
159 | 120 |
160 // If a directory is added after we're finished with the initial scan, emit | 121 // TODO(nweiz): Catch any errors here that indicate that the directory in |
161 // an event for each entry in it. This gives the user consistently gets an | 122 // question doesn't exist and silently stop watching it instead of |
162 // event for every new file. | 123 // propagating the errors. |
163 watcher.ready.then((_) { | 124 var stream = new Directory(path).watch(); |
164 if (!isReady || _eventsController.isClosed) return; | 125 _subdirStreams[path] = stream; |
165 _listen(new Directory(path).list(recursive: true), (entry) { | 126 _nativeEvents.add(stream); |
166 if (entry is Directory) return; | |
167 _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path)); | |
168 }, onError: (error, stackTrace) { | |
169 // Ignore an exception caused by the dir not existing. It's fine if it | |
170 // was added and then quickly removed. | |
171 if (error is FileSystemException) return; | |
172 | |
173 _eventsController.addError(error, stackTrace); | |
174 close(); | |
175 }, cancelOnError: true); | |
176 }); | |
177 } | 127 } |
178 | 128 |
179 /// The callback that's run when a batch of changes comes in. | 129 /// The callback that's run when a batch of changes comes in. |
180 void _onBatch(List<FileSystemEvent> batch) { | 130 void _onBatch(List<FileSystemEvent> batch) { |
181 var changedEntries = new Set<String>(); | 131 var files = new Set<String>(); |
182 var oldEntries = new Map.from(_entries); | 132 var dirs = new Set<String>(); |
| 133 var changed = new Set<String>(); |
183 | 134 |
184 // inotify event batches are ordered by occurrence, so we treat them as a | 135 // inotify event batches are ordered by occurrence, so we treat them as a |
185 // log of what happened to a file. | 136 // log of what happened to a file. We only emit events based on the |
| 137 // difference between the state before the batch and the state after it, not |
| 138 // the intermediate state. |
186 for (var event in batch) { | 139 for (var event in batch) { |
187 // If the watched directory is deleted or moved, we'll get a deletion | 140 // If the watched directory is deleted or moved, we'll get a deletion |
188 // event for it. Ignore it; we handle closing [this] when the underlying | 141 // event for it. Ignore it; we handle closing [this] when the underlying |
189 // stream is closed. | 142 // stream is closed. |
190 if (event.path == path) continue; | 143 if (event.path == path) continue; |
191 | 144 |
192 changedEntries.add(event.path); | 145 changed.add(event.path); |
193 | 146 |
194 if (event is FileSystemMoveEvent) { | 147 if (event is FileSystemMoveEvent) { |
195 changedEntries.add(event.destination); | 148 files.remove(event.path); |
196 _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory); | 149 dirs.remove(event.path); |
197 _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory); | 150 |
| 151 changed.add(event.destination); |
| 152 if (event.isDirectory) { |
| 153 files.remove(event.destination); |
| 154 dirs.add(event.destination); |
| 155 } else { |
| 156 files.add(event.destination); |
| 157 dirs.remove(event.destination); |
| 158 } |
| 159 } else if (event is FileSystemDeleteEvent) { |
| 160 files.remove(event.path); |
| 161 dirs.remove(event.path); |
| 162 } else if (event.isDirectory) { |
| 163 files.remove(event.path); |
| 164 dirs.add(event.path); |
198 } else { | 165 } else { |
199 _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory); | 166 files.add(event.path); |
| 167 dirs.remove(event.path); |
200 } | 168 } |
201 } | 169 } |
202 | 170 |
203 for (var path in changedEntries) { | 171 _applyChanges(files, dirs, changed); |
204 emitEvent(ChangeType type) { | 172 } |
205 if (isReady) _eventsController.add(new WatchEvent(type, path)); | 173 |
| 174 /// Applies the net changes computed for a batch. |
| 175 /// |
| 176 /// The [files] and [dirs] sets contain the files and directories that now |
| 177 /// exist, respectively. The [changed] set contains all files and directories |
| 178 /// that have changed (including being removed), and so is a superset of |
| 179 /// [files] and [dirs]. |
| 180 void _applyChanges(Set<String> files, Set<String> dirs, Set<String> changed) { |
| 181 for (var path in changed) { |
| 182 var stream = _subdirStreams.remove(path); |
| 183 if (stream != null) _nativeEvents.add(stream); |
| 184 |
| 185 // Unless [path] was a file and still is, emit REMOVE events for it or its |
| 186 // contents, |
| 187 if (files.contains(path) && _files.contains(path)) continue; |
| 188 for (var file in _files.remove(path)) { |
| 189 _emit(ChangeType.REMOVE, file); |
206 } | 190 } |
| 191 } |
207 | 192 |
208 var oldState = oldEntries[path]; | 193 for (var file in files) { |
209 var newState = _entries[path]; | 194 if (_files.contains(file)) { |
| 195 _emit(ChangeType.MODIFY, file); |
| 196 } else { |
| 197 _emit(ChangeType.ADD, file); |
| 198 _files.add(file); |
| 199 } |
| 200 } |
210 | 201 |
211 if (oldState != _EntryState.FILE && newState == _EntryState.FILE) { | 202 for (var dir in dirs) { |
212 emitEvent(ChangeType.ADD); | 203 _watchSubdir(dir); |
213 } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) { | 204 _addSubdir(dir); |
214 emitEvent(ChangeType.MODIFY); | |
215 } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) { | |
216 emitEvent(ChangeType.REMOVE); | |
217 } | |
218 | |
219 if (oldState == _EntryState.DIRECTORY) { | |
220 var watcher = _subWatchers.remove(path); | |
221 if (watcher == null) continue; | |
222 for (var path in watcher._allFiles) { | |
223 _eventsController.add(new WatchEvent(ChangeType.REMOVE, path)); | |
224 } | |
225 watcher.close(); | |
226 } | |
227 | |
228 if (newState == _EntryState.DIRECTORY) _watchSubdir(path); | |
229 } | 205 } |
230 } | 206 } |
231 | 207 |
232 /// Changes the known state of the entry at [path] based on [change] and | 208 /// Emits [ChangeType.ADD] events for the recursive contents of [path]. |
233 /// [isDir]. | 209 void _addSubdir(String path) { |
234 void _changeEntryState(String path, ChangeType change, bool isDir) { | 210 _listen(new Directory(path).list(recursive: true), (entity) { |
235 if (change == ChangeType.ADD || change == ChangeType.MODIFY) { | 211 if (entity is Directory) { |
236 _entries[path] = new _EntryState(isDir); | 212 _watchSubdir(entity.path); |
237 } else { | 213 } else { |
238 assert(change == ChangeType.REMOVE); | 214 _files.add(entity.path); |
239 _entries.remove(path); | 215 _emit(ChangeType.ADD, entity.path); |
240 } | 216 } |
241 } | 217 }, onError: (error, stackTrace) { |
| 218 // Ignore an exception caused by the dir not existing. It's fine if it |
| 219 // was added and then quickly removed. |
| 220 if (error is FileSystemException) return; |
242 | 221 |
243 /// Determines the [ChangeType] associated with [event]. | 222 _eventsController.addError(error, stackTrace); |
244 ChangeType _changeTypeFor(FileSystemEvent event) { | 223 close(); |
245 if (event is FileSystemDeleteEvent) return ChangeType.REMOVE; | 224 }, cancelOnError: true); |
246 if (event is FileSystemCreateEvent) return ChangeType.ADD; | |
247 | |
248 assert(event is FileSystemModifyEvent); | |
249 return ChangeType.MODIFY; | |
250 } | 225 } |
251 | 226 |
252 /// Handles the underlying event stream closing, indicating that the directory | 227 /// Handles the underlying event stream closing, indicating that the directory |
253 /// being watched was removed. | 228 /// being watched was removed. |
254 void _onDone() { | 229 void _onDone() { |
255 // Most of the time when a directory is removed, its contents will get | 230 // Most of the time when a directory is removed, its contents will get |
256 // individual REMOVE events before the watch stream is closed -- in that | 231 // individual REMOVE events before the watch stream is closed -- in that |
257 // case, [_entries] will be empty here. However, if the directory's removal | 232 // case, [_files] will be empty here. However, if the directory's removal is |
258 // is caused by a MOVE, we need to manually emit events. | 233 // caused by a MOVE, we need to manually emit events. |
259 if (isReady) { | 234 if (isReady) { |
260 _entries.forEach((path, state) { | 235 for (var file in _files.paths) { |
261 if (state == _EntryState.DIRECTORY) return; | 236 _emit(ChangeType.REMOVE, file); |
262 _eventsController.add(new WatchEvent(ChangeType.REMOVE, path)); | 237 } |
263 }); | |
264 } | 238 } |
265 | 239 |
266 // The parent directory often gets a close event before the subdirectories | 240 close(); |
267 // are done emitting events. We wait for them to finish before we close | 241 } |
268 // [events] so that we can be sure to emit a remove event for every file | 242 |
269 // that used to exist. | 243 /// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state |
270 Future.wait(_subWatchers.values.map((watcher) { | 244 /// to emit events. |
271 try { | 245 void _emit(ChangeType type, String path) { |
272 return watcher.events.toList(); | 246 if (!isReady) return; |
273 } on StateError catch (_) { | 247 if (_eventsController.isClosed) return; |
274 // It's possible that [watcher.events] is closed but the onDone event | 248 _eventsController.add(new WatchEvent(type, path)); |
275 // hasn't reached us yet. It's fine if so. | |
276 return new Future.value(); | |
277 } | |
278 })).then((_) => close()); | |
279 } | 249 } |
280 | 250 |
281 /// Like [Stream.listen], but automatically adds the subscription to | 251 /// Like [Stream.listen], but automatically adds the subscription to |
282 /// [_subscriptions] so that it can be canceled when [close] is called. | 252 /// [_subscriptions] so that it can be canceled when [close] is called. |
283 void _listen(Stream stream, void onData(event), {Function onError, | 253 void _listen(Stream stream, void onData(event), {Function onError, |
284 void onDone(), bool cancelOnError}) { | 254 void onDone(), bool cancelOnError}) { |
285 var subscription; | 255 var subscription; |
286 subscription = stream.listen(onData, onError: onError, onDone: () { | 256 subscription = stream.listen(onData, onError: onError, onDone: () { |
287 _subscriptions.remove(subscription); | 257 _subscriptions.remove(subscription); |
288 if (onDone != null) onDone(); | 258 if (onDone != null) onDone(); |
289 }, cancelOnError: cancelOnError); | 259 }, cancelOnError: cancelOnError); |
290 _subscriptions.add(subscription); | 260 _subscriptions.add(subscription); |
291 } | 261 } |
292 } | 262 } |
293 | |
294 /// An enum for the possible states of entries in a watched directory. | |
295 class _EntryState { | |
296 final String _name; | |
297 | |
298 /// The entry is a file. | |
299 static const FILE = const _EntryState._("file"); | |
300 | |
301 /// The entry is a directory. | |
302 static const DIRECTORY = const _EntryState._("directory"); | |
303 | |
304 const _EntryState._(this._name); | |
305 | |
306 /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise. | |
307 factory _EntryState(bool isDir) => | |
308 isDir ? _EntryState.DIRECTORY : _EntryState.FILE; | |
309 | |
310 String toString() => _name; | |
311 } | |
OLD | NEW |