OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library watcher.directory_watcher; | 5 library watcher.directory_watcher.polling; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:io'; | 8 import 'dart:io'; |
9 | 9 |
10 import 'package:crypto/crypto.dart'; | 10 import 'package:crypto/crypto.dart'; |
11 | 11 |
12 import 'async_queue.dart'; | 12 import '../async_queue.dart'; |
13 import 'stat.dart'; | 13 import '../directory_watcher.dart'; |
14 import 'utils.dart'; | 14 import '../stat.dart'; |
15 import 'watch_event.dart'; | 15 import '../utils.dart'; |
| 16 import '../watch_event.dart'; |
| 17 import 'resubscribable.dart'; |
16 | 18 |
17 /// Watches the contents of a directory and emits [WatchEvent]s when something | 19 /// Periodically polls a directory for changes. |
18 /// in the directory has changed. | 20 class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher { |
19 class DirectoryWatcher { | 21 /// Creates a new polling watcher monitoring [directory]. |
20 /// The directory whose contents are being monitored. | 22 /// |
| 23 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
| 24 /// will pause between successive polls of the directory contents. Making this |
| 25 /// shorter will give more immediate feedback at the expense of doing more IO |
| 26 /// and higher CPU usage. Defaults to one second. |
| 27 PollingDirectoryWatcher(String directory, {Duration pollingDelay}) |
| 28 : super(directory, () { |
| 29 return new _PollingDirectoryWatcher(directory, |
| 30 pollingDelay != null ? pollingDelay : new Duration(seconds: 1)); |
| 31 }); |
| 32 } |
| 33 |
| 34 class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher { |
21 final String directory; | 35 final String directory; |
22 | 36 |
23 /// The broadcast [Stream] of events that have occurred to files in | |
24 /// [directory]. | |
25 /// | |
26 /// Changes will only be monitored while this stream has subscribers. Any | |
27 /// file changes that occur during periods when there are no subscribers | |
28 /// will not be reported the next time a subscriber is added. | |
29 Stream<WatchEvent> get events => _events.stream; | 37 Stream<WatchEvent> get events => _events.stream; |
30 StreamController<WatchEvent> _events; | 38 final _events = new StreamController<WatchEvent>.broadcast(); |
31 | 39 |
32 _WatchState _state = _WatchState.UNSUBSCRIBED; | 40 bool get isReady => _ready.isCompleted; |
33 | 41 |
34 /// A [Future] that completes when the watcher is initialized and watching | |
35 /// for file changes. | |
36 /// | |
37 /// If the watcher is not currently monitoring the directory (because there | |
38 /// are no subscribers to [events]), this returns a future that isn't | |
39 /// complete yet. It will complete when a subscriber starts listening and | |
40 /// the watcher finishes any initialization work it needs to do. | |
41 /// | |
42 /// If the watcher is already monitoring, this returns an already complete | |
43 /// future. | |
44 Future get ready => _ready.future; | 42 Future get ready => _ready.future; |
45 Completer _ready = new Completer(); | 43 final _ready = new Completer(); |
46 | 44 |
47 /// The amount of time the watcher pauses between successive polls of the | 45 /// The amount of time the watcher pauses between successive polls of the |
48 /// directory contents. | 46 /// directory contents. |
49 final Duration pollingDelay; | 47 final Duration _pollingDelay; |
50 | 48 |
51 /// The previous status of the files in the directory. | 49 /// The previous status of the files in the directory. |
52 /// | 50 /// |
53 /// Used to tell which files have been modified. | 51 /// Used to tell which files have been modified. |
54 final _statuses = new Map<String, _FileStatus>(); | 52 final _statuses = new Map<String, _FileStatus>(); |
55 | 53 |
56 /// The subscription used while [directory] is being listed. | 54 /// The subscription used while [directory] is being listed. |
57 /// | 55 /// |
58 /// Will be `null` if a list is not currently happening. | 56 /// Will be `null` if a list is not currently happening. |
59 StreamSubscription<FileSystemEntity> _listSubscription; | 57 StreamSubscription<FileSystemEntity> _listSubscription; |
60 | 58 |
61 /// The queue of files waiting to be processed to see if they have been | 59 /// The queue of files waiting to be processed to see if they have been |
62 /// modified. | 60 /// modified. |
63 /// | 61 /// |
64 /// Processing a file is asynchronous, as is listing the directory, so the | 62 /// Processing a file is asynchronous, as is listing the directory, so the |
65 /// queue exists to let each of those proceed at their own rate. The lister | 63 /// queue exists to let each of those proceed at their own rate. The lister |
66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | 64 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
67 /// and processed sequentially. | 65 /// and processed sequentially. |
68 AsyncQueue<String> _filesToProcess; | 66 AsyncQueue<String> _filesToProcess; |
69 | 67 |
70 /// The set of files that have been seen in the current directory listing. | 68 /// The set of files that have been seen in the current directory listing. |
71 /// | 69 /// |
72 /// Used to tell which files have been removed: files that are in [_statuses] | 70 /// Used to tell which files have been removed: files that are in [_statuses] |
73 /// but not in here when a poll completes have been removed. | 71 /// but not in here when a poll completes have been removed. |
74 final _polledFiles = new Set<String>(); | 72 final _polledFiles = new Set<String>(); |
75 | 73 |
76 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 74 _PollingDirectoryWatcher(this.directory, this._pollingDelay) { |
77 /// | |
78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | |
79 /// will pause between successive polls of the directory contents. Making | |
80 /// this shorter will give more immediate feedback at the expense of doing | |
81 /// more IO and higher CPU usage. Defaults to one second. | |
82 DirectoryWatcher(this.directory, {Duration pollingDelay}) | |
83 : pollingDelay = pollingDelay != null ? pollingDelay : | |
84 new Duration(seconds: 1) { | |
85 _events = new StreamController<WatchEvent>.broadcast( | |
86 onListen: _watch, onCancel: _cancel); | |
87 | |
88 _filesToProcess = new AsyncQueue<String>(_processFile, | 75 _filesToProcess = new AsyncQueue<String>(_processFile, |
89 onError: _events.addError); | 76 onError: _events.addError); |
90 } | |
91 | 77 |
92 /// Scans to see which files were already present before the watcher was | |
93 /// subscribed to, and then starts watching the directory for changes. | |
94 void _watch() { | |
95 assert(_state == _WatchState.UNSUBSCRIBED); | |
96 _state = _WatchState.SCANNING; | |
97 _poll(); | 78 _poll(); |
98 } | 79 } |
99 | 80 |
100 /// Stops watching the directory when there are no more subscribers. | 81 void close() { |
101 void _cancel() { | 82 _events.close(); |
102 assert(_state != _WatchState.UNSUBSCRIBED); | |
103 _state = _WatchState.UNSUBSCRIBED; | |
104 | 83 |
105 // If we're in the middle of listing the directory, stop. | 84 // If we're in the middle of listing the directory, stop. |
106 if (_listSubscription != null) _listSubscription.cancel(); | 85 if (_listSubscription != null) _listSubscription.cancel(); |
107 | 86 |
108 // Don't process any remaining files. | 87 // Don't process any remaining files. |
109 _filesToProcess.clear(); | 88 _filesToProcess.clear(); |
110 _polledFiles.clear(); | 89 _polledFiles.clear(); |
111 _statuses.clear(); | 90 _statuses.clear(); |
112 | |
113 _ready = new Completer(); | |
114 } | 91 } |
115 | 92 |
116 /// Scans the contents of the directory once to see which files have been | 93 /// Scans the contents of the directory once to see which files have been |
117 /// added, removed, and modified. | 94 /// added, removed, and modified. |
118 void _poll() { | 95 void _poll() { |
119 _filesToProcess.clear(); | 96 _filesToProcess.clear(); |
120 _polledFiles.clear(); | 97 _polledFiles.clear(); |
121 | 98 |
122 endListing() { | 99 endListing() { |
123 assert(_state != _WatchState.UNSUBSCRIBED); | 100 assert(!_events.isClosed); |
124 _listSubscription = null; | 101 _listSubscription = null; |
125 | 102 |
126 // Null tells the queue consumer that we're done listing. | 103 // Null tells the queue consumer that we're done listing. |
127 _filesToProcess.add(null); | 104 _filesToProcess.add(null); |
128 } | 105 } |
129 | 106 |
130 var stream = new Directory(directory).list(recursive: true); | 107 var stream = new Directory(directory).list(recursive: true); |
131 _listSubscription = stream.listen((entity) { | 108 _listSubscription = stream.listen((entity) { |
132 assert(_state != _WatchState.UNSUBSCRIBED); | 109 assert(!_events.isClosed); |
133 | 110 |
134 if (entity is! File) return; | 111 if (entity is! File) return; |
135 _filesToProcess.add(entity.path); | 112 _filesToProcess.add(entity.path); |
136 }, onError: (error, StackTrace stackTrace) { | 113 }, onError: (error, stackTrace) { |
137 if (!isDirectoryNotFoundException(error)) { | 114 if (!isDirectoryNotFoundException(error)) { |
138 // It's some unknown error. Pipe it over to the event stream so the | 115 // It's some unknown error. Pipe it over to the event stream so the |
139 // user can see it. | 116 // user can see it. |
140 _events.addError(error, stackTrace); | 117 _events.addError(error, stackTrace); |
141 } | 118 } |
142 | 119 |
143 // When an error occurs, we end the listing normally, which has the | 120 // When an error occurs, we end the listing normally, which has the |
144 // desired effect of marking all files that were in the directory as | 121 // desired effect of marking all files that were in the directory as |
145 // being removed. | 122 // being removed. |
146 endListing(); | 123 endListing(); |
147 }, onDone: endListing, cancelOnError: true); | 124 }, onDone: endListing, cancelOnError: true); |
148 } | 125 } |
149 | 126 |
150 /// Processes [file] to determine if it has been modified since the last | 127 /// Processes [file] to determine if it has been modified since the last |
151 /// time it was scanned. | 128 /// time it was scanned. |
152 Future _processFile(String file) { | 129 Future _processFile(String file) { |
153 assert(_state != _WatchState.UNSUBSCRIBED); | |
154 | |
155 // `null` is the sentinel which means the directory listing is complete. | 130 // `null` is the sentinel which means the directory listing is complete. |
156 if (file == null) return _completePoll(); | 131 if (file == null) return _completePoll(); |
157 | 132 |
158 return getModificationTime(file).then((modified) { | 133 return getModificationTime(file).then((modified) { |
159 if (_checkForCancel()) return null; | 134 if (_events.isClosed) return null; |
160 | 135 |
161 var lastStatus = _statuses[file]; | 136 var lastStatus = _statuses[file]; |
162 | 137 |
163 // If its modification time hasn't changed, assume the file is unchanged. | 138 // If its modification time hasn't changed, assume the file is unchanged. |
164 if (lastStatus != null && lastStatus.modified == modified) { | 139 if (lastStatus != null && lastStatus.modified == modified) { |
165 // The file is still here. | 140 // The file is still here. |
166 _polledFiles.add(file); | 141 _polledFiles.add(file); |
167 return null; | 142 return null; |
168 } | 143 } |
169 | 144 |
170 return _hashFile(file).then((hash) { | 145 return _hashFile(file).then((hash) { |
171 if (_checkForCancel()) return; | 146 if (_events.isClosed) return; |
172 | 147 |
173 var status = new _FileStatus(modified, hash); | 148 var status = new _FileStatus(modified, hash); |
174 _statuses[file] = status; | 149 _statuses[file] = status; |
175 _polledFiles.add(file); | 150 _polledFiles.add(file); |
176 | 151 |
177 // Only notify while in the watching state. | 152 // Only notify if we're ready to emit events. |
178 if (_state != _WatchState.WATCHING) return; | 153 if (!isReady) return; |
179 | 154 |
180 // And the file is different. | 155 // And the file is different. |
181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); | 156 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
182 if (!changed) return; | 157 if (!changed) return; |
183 | 158 |
184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | 159 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
185 _events.add(new WatchEvent(type, file)); | 160 _events.add(new WatchEvent(type, file)); |
186 }); | 161 }); |
187 }); | 162 }); |
188 } | 163 } |
189 | 164 |
190 /// After the directory listing is complete, this determines which files were | 165 /// After the directory listing is complete, this determines which files were |
191 /// removed and then restarts the next poll. | 166 /// removed and then restarts the next poll. |
192 Future _completePoll() { | 167 Future _completePoll() { |
193 // Any files that were not seen in the last poll but that we have a | 168 // Any files that were not seen in the last poll but that we have a |
194 // status for must have been removed. | 169 // status for must have been removed. |
195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); | 170 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
196 for (var removed in removedFiles) { | 171 for (var removed in removedFiles) { |
197 if (_state == _WatchState.WATCHING) { | 172 if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
198 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
199 } | |
200 _statuses.remove(removed); | 173 _statuses.remove(removed); |
201 } | 174 } |
202 | 175 |
203 if (_state == _WatchState.SCANNING) { | 176 if (!isReady) _ready.complete(); |
204 _state = _WatchState.WATCHING; | |
205 _ready.complete(); | |
206 } | |
207 | 177 |
208 // Wait and then poll again. | 178 // Wait and then poll again. |
209 return new Future.delayed(pollingDelay).then((_) { | 179 return new Future.delayed(_pollingDelay).then((_) { |
210 if (_checkForCancel()) return; | 180 if (_events.isClosed) return; |
211 _poll(); | 181 _poll(); |
212 }); | 182 }); |
213 } | 183 } |
214 | 184 |
215 /// Returns `true` and clears the processing queue if the watcher has been | |
216 /// unsubscribed. | |
217 bool _checkForCancel() { | |
218 if (_state != _WatchState.UNSUBSCRIBED) return false; | |
219 | |
220 // Don't process any more files. | |
221 _filesToProcess.clear(); | |
222 return true; | |
223 } | |
224 | |
225 /// Calculates the SHA-1 hash of the file at [path]. | 185 /// Calculates the SHA-1 hash of the file at [path]. |
226 Future<List<int>> _hashFile(String path) { | 186 Future<List<int>> _hashFile(String path) { |
227 return new File(path).readAsBytes().then((bytes) { | 187 return new File(path).readAsBytes().then((bytes) { |
228 var sha1 = new SHA1(); | 188 var sha1 = new SHA1(); |
229 sha1.add(bytes); | 189 sha1.add(bytes); |
230 return sha1.close(); | 190 return sha1.close(); |
231 }); | 191 }); |
232 } | 192 } |
233 | 193 |
234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | 194 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
235 /// series of byte values. | 195 /// series of byte values. |
236 bool _sameHash(List<int> a, List<int> b) { | 196 bool _sameHash(List<int> a, List<int> b) { |
237 // Hashes should always be the same size. | 197 // Hashes should always be the same size. |
238 assert(a.length == b.length); | 198 assert(a.length == b.length); |
239 | 199 |
240 for (var i = 0; i < a.length; i++) { | 200 for (var i = 0; i < a.length; i++) { |
241 if (a[i] != b[i]) return false; | 201 if (a[i] != b[i]) return false; |
242 } | 202 } |
243 | 203 |
244 return true; | 204 return true; |
245 } | 205 } |
246 } | 206 } |
247 | 207 |
248 /// Enum class for the states that the [DirectoryWatcher] can be in. | |
249 class _WatchState { | |
250 /// There are no subscribers to the watcher's event stream and no watching | |
251 /// is going on. | |
252 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); | |
253 | |
254 /// There are subscribers and the watcher is doing an initial scan of the | |
255 /// directory to see which files were already present before watching started. | |
256 /// | |
257 /// The watcher does not send notifications for changes that occurred while | |
258 /// there were no subscribers, or for files already present before watching. | |
259 /// The initial scan is used to determine what "before watching" state of | |
260 /// the file system was. | |
261 static const SCANNING = const _WatchState("scanning"); | |
262 | |
263 /// There are subscribers and the watcher is polling the directory to look | |
264 /// for changes. | |
265 static const WATCHING = const _WatchState("watching"); | |
266 | |
267 /// The name of the state. | |
268 final String name; | |
269 | |
270 const _WatchState(this.name); | |
271 } | |
272 | |
273 class _FileStatus { | 208 class _FileStatus { |
274 /// The last time the file was modified. | 209 /// The last time the file was modified. |
275 DateTime modified; | 210 DateTime modified; |
276 | 211 |
277 /// The SHA-1 hash of the contents of the file. | 212 /// The SHA-1 hash of the contents of the file. |
278 List<int> hash; | 213 List<int> hash; |
279 | 214 |
280 _FileStatus(this.modified, this.hash); | 215 _FileStatus(this.modified, this.hash); |
281 } | 216 } |
| 217 |
OLD | NEW |