OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library watcher.directory_watcher; | 5 library watcher.directory_watcher; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:io'; | 8 import 'dart:io'; |
9 | 9 |
10 import 'package:crypto/crypto.dart'; | |
11 | |
12 import 'async_queue.dart'; | |
13 import 'stat.dart'; | |
14 import 'utils.dart'; | |
15 import 'watch_event.dart'; | 10 import 'watch_event.dart'; |
| 11 import 'directory_watcher/linux.dart'; |
| 12 import 'directory_watcher/polling.dart'; |
16 | 13 |
17 /// Watches the contents of a directory and emits [WatchEvent]s when something | 14 /// Watches the contents of a directory and emits [WatchEvent]s when something |
18 /// in the directory has changed. | 15 /// in the directory has changed. |
19 class DirectoryWatcher { | 16 abstract class DirectoryWatcher { |
20 /// The directory whose contents are being monitored. | 17 /// The directory whose contents are being monitored. |
21 final String directory; | 18 String get directory; |
22 | 19 |
23 /// The broadcast [Stream] of events that have occurred to files in | 20 /// The broadcast [Stream] of events that have occurred to files in |
24 /// [directory]. | 21 /// [directory]. |
25 /// | 22 /// |
26 /// Changes will only be monitored while this stream has subscribers. Any | 23 /// Changes will only be monitored while this stream has subscribers. Any |
27 /// file changes that occur during periods when there are no subscribers | 24 /// file changes that occur during periods when there are no subscribers |
28 /// will not be reported the next time a subscriber is added. | 25 /// will not be reported the next time a subscriber is added. |
29 Stream<WatchEvent> get events => _events.stream; | 26 Stream<WatchEvent> get events; |
30 StreamController<WatchEvent> _events; | |
31 | 27 |
32 _WatchState _state = _WatchState.UNSUBSCRIBED; | 28 /// Whether the watcher is initialized and watching for file changes. |
| 29 /// |
| 30 /// This is true if and only if [ready] is complete. |
| 31 bool get isReady; |
33 | 32 |
34 /// A [Future] that completes when the watcher is initialized and watching | 33 /// A [Future] that completes when the watcher is initialized and watching |
35 /// for file changes. | 34 /// for file changes. |
36 /// | 35 /// |
37 /// If the watcher is not currently monitoring the directory (because there | 36 /// If the watcher is not currently monitoring the directory (because there |
38 /// are no subscribers to [events]), this returns a future that isn't | 37 /// are no subscribers to [events]), this returns a future that isn't |
39 /// complete yet. It will complete when a subscriber starts listening and | 38 /// complete yet. It will complete when a subscriber starts listening and |
40 /// the watcher finishes any initialization work it needs to do. | 39 /// the watcher finishes any initialization work it needs to do. |
41 /// | 40 /// |
42 /// If the watcher is already monitoring, this returns an already complete | 41 /// If the watcher is already monitoring, this returns an already complete |
43 /// future. | 42 /// future. |
44 Future get ready => _ready.future; | 43 Future get ready; |
45 Completer _ready = new Completer(); | |
46 | |
47 /// The amount of time the watcher pauses between successive polls of the | |
48 /// directory contents. | |
49 final Duration pollingDelay; | |
50 | |
51 /// The previous status of the files in the directory. | |
52 /// | |
53 /// Used to tell which files have been modified. | |
54 final _statuses = new Map<String, _FileStatus>(); | |
55 | |
56 /// The subscription used while [directory] is being listed. | |
57 /// | |
58 /// Will be `null` if a list is not currently happening. | |
59 StreamSubscription<FileSystemEntity> _listSubscription; | |
60 | |
61 /// The queue of files waiting to be processed to see if they have been | |
62 /// modified. | |
63 /// | |
64 /// Processing a file is asynchronous, as is listing the directory, so the | |
65 /// queue exists to let each of those proceed at their own rate. The lister | |
66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | |
67 /// and processed sequentially. | |
68 AsyncQueue<String> _filesToProcess; | |
69 | |
70 /// The set of files that have been seen in the current directory listing. | |
71 /// | |
72 /// Used to tell which files have been removed: files that are in [_statuses] | |
73 /// but not in here when a poll completes have been removed. | |
74 final _polledFiles = new Set<String>(); | |
75 | 44 |
76 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 45 /// Creates a new [DirectoryWatcher] monitoring [directory]. |
77 /// | 46 /// |
78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | 47 /// If a native directory watcher is available for this platform, this will |
79 /// will pause between successive polls of the directory contents. Making | 48 /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher]. |
80 /// this shorter will give more immediate feedback at the expense of doing | 49 /// |
81 /// more IO and higher CPU usage. Defaults to one second. | 50 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
82 DirectoryWatcher(this.directory, {Duration pollingDelay}) | 51 /// will pause between successive polls of the directory contents. Making this |
83 : pollingDelay = pollingDelay != null ? pollingDelay : | 52 /// shorter will give more immediate feedback at the expense of doing more IO |
84 new Duration(seconds: 1) { | 53 /// and higher CPU usage. Defaults to one second. Ignored for non-polling |
85 _events = new StreamController<WatchEvent>.broadcast( | 54 /// watchers. |
86 onListen: _watch, onCancel: _cancel); | 55 factory DirectoryWatcher(String directory, {Duration pollingDelay}) { |
87 | 56 if (Platform.isLinux) return new LinuxDirectoryWatcher(directory); |
88 _filesToProcess = new AsyncQueue<String>(_processFile, | 57 return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay); |
89 onError: _events.addError); | |
90 } | |
91 | |
92 /// Scans to see which files were already present before the watcher was | |
93 /// subscribed to, and then starts watching the directory for changes. | |
94 void _watch() { | |
95 assert(_state == _WatchState.UNSUBSCRIBED); | |
96 _state = _WatchState.SCANNING; | |
97 _poll(); | |
98 } | |
99 | |
100 /// Stops watching the directory when there are no more subscribers. | |
101 void _cancel() { | |
102 assert(_state != _WatchState.UNSUBSCRIBED); | |
103 _state = _WatchState.UNSUBSCRIBED; | |
104 | |
105 // If we're in the middle of listing the directory, stop. | |
106 if (_listSubscription != null) _listSubscription.cancel(); | |
107 | |
108 // Don't process any remaining files. | |
109 _filesToProcess.clear(); | |
110 _polledFiles.clear(); | |
111 _statuses.clear(); | |
112 | |
113 _ready = new Completer(); | |
114 } | |
115 | |
116 /// Scans the contents of the directory once to see which files have been | |
117 /// added, removed, and modified. | |
118 void _poll() { | |
119 _filesToProcess.clear(); | |
120 _polledFiles.clear(); | |
121 | |
122 endListing() { | |
123 assert(_state != _WatchState.UNSUBSCRIBED); | |
124 _listSubscription = null; | |
125 | |
126 // Null tells the queue consumer that we're done listing. | |
127 _filesToProcess.add(null); | |
128 } | |
129 | |
130 var stream = new Directory(directory).list(recursive: true); | |
131 _listSubscription = stream.listen((entity) { | |
132 assert(_state != _WatchState.UNSUBSCRIBED); | |
133 | |
134 if (entity is! File) return; | |
135 _filesToProcess.add(entity.path); | |
136 }, onError: (error, StackTrace stackTrace) { | |
137 if (!isDirectoryNotFoundException(error)) { | |
138 // It's some unknown error. Pipe it over to the event stream so the | |
139 // user can see it. | |
140 _events.addError(error, stackTrace); | |
141 } | |
142 | |
143 // When an error occurs, we end the listing normally, which has the | |
144 // desired effect of marking all files that were in the directory as | |
145 // being removed. | |
146 endListing(); | |
147 }, onDone: endListing, cancelOnError: true); | |
148 } | |
149 | |
150 /// Processes [file] to determine if it has been modified since the last | |
151 /// time it was scanned. | |
152 Future _processFile(String file) { | |
153 assert(_state != _WatchState.UNSUBSCRIBED); | |
154 | |
155 // `null` is the sentinel which means the directory listing is complete. | |
156 if (file == null) return _completePoll(); | |
157 | |
158 return getModificationTime(file).then((modified) { | |
159 if (_checkForCancel()) return null; | |
160 | |
161 var lastStatus = _statuses[file]; | |
162 | |
163 // If its modification time hasn't changed, assume the file is unchanged. | |
164 if (lastStatus != null && lastStatus.modified == modified) { | |
165 // The file is still here. | |
166 _polledFiles.add(file); | |
167 return null; | |
168 } | |
169 | |
170 return _hashFile(file).then((hash) { | |
171 if (_checkForCancel()) return; | |
172 | |
173 var status = new _FileStatus(modified, hash); | |
174 _statuses[file] = status; | |
175 _polledFiles.add(file); | |
176 | |
177 // Only notify while in the watching state. | |
178 if (_state != _WatchState.WATCHING) return; | |
179 | |
180 // And the file is different. | |
181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); | |
182 if (!changed) return; | |
183 | |
184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | |
185 _events.add(new WatchEvent(type, file)); | |
186 }); | |
187 }); | |
188 } | |
189 | |
190 /// After the directory listing is complete, this determines which files were | |
191 /// removed and then restarts the next poll. | |
192 Future _completePoll() { | |
193 // Any files that were not seen in the last poll but that we have a | |
194 // status for must have been removed. | |
195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); | |
196 for (var removed in removedFiles) { | |
197 if (_state == _WatchState.WATCHING) { | |
198 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
199 } | |
200 _statuses.remove(removed); | |
201 } | |
202 | |
203 if (_state == _WatchState.SCANNING) { | |
204 _state = _WatchState.WATCHING; | |
205 _ready.complete(); | |
206 } | |
207 | |
208 // Wait and then poll again. | |
209 return new Future.delayed(pollingDelay).then((_) { | |
210 if (_checkForCancel()) return; | |
211 _poll(); | |
212 }); | |
213 } | |
214 | |
215 /// Returns `true` and clears the processing queue if the watcher has been | |
216 /// unsubscribed. | |
217 bool _checkForCancel() { | |
218 if (_state != _WatchState.UNSUBSCRIBED) return false; | |
219 | |
220 // Don't process any more files. | |
221 _filesToProcess.clear(); | |
222 return true; | |
223 } | |
224 | |
225 /// Calculates the SHA-1 hash of the file at [path]. | |
226 Future<List<int>> _hashFile(String path) { | |
227 return new File(path).readAsBytes().then((bytes) { | |
228 var sha1 = new SHA1(); | |
229 sha1.add(bytes); | |
230 return sha1.close(); | |
231 }); | |
232 } | |
233 | |
234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | |
235 /// series of byte values. | |
236 bool _sameHash(List<int> a, List<int> b) { | |
237 // Hashes should always be the same size. | |
238 assert(a.length == b.length); | |
239 | |
240 for (var i = 0; i < a.length; i++) { | |
241 if (a[i] != b[i]) return false; | |
242 } | |
243 | |
244 return true; | |
245 } | 58 } |
246 } | 59 } |
247 | |
248 /// Enum class for the states that the [DirectoryWatcher] can be in. | |
249 class _WatchState { | |
250 /// There are no subscribers to the watcher's event stream and no watching | |
251 /// is going on. | |
252 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); | |
253 | |
254 /// There are subscribers and the watcher is doing an initial scan of the | |
255 /// directory to see which files were already present before watching started. | |
256 /// | |
257 /// The watcher does not send notifications for changes that occurred while | |
258 /// there were no subscribers, or for files already present before watching. | |
259 /// The initial scan is used to determine what "before watching" state of | |
260 /// the file system was. | |
261 static const SCANNING = const _WatchState("scanning"); | |
262 | |
263 /// There are subscribers and the watcher is polling the directory to look | |
264 /// for changes. | |
265 static const WATCHING = const _WatchState("watching"); | |
266 | |
267 /// The name of the state. | |
268 final String name; | |
269 | |
270 const _WatchState(this.name); | |
271 } | |
272 | |
273 class _FileStatus { | |
274 /// The last time the file was modified. | |
275 DateTime modified; | |
276 | |
277 /// The SHA-1 hash of the contents of the file. | |
278 List<int> hash; | |
279 | |
280 _FileStatus(this.modified, this.hash); | |
281 } | |
OLD | NEW |