| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library watcher.directory_watcher; | 5 library watcher.directory_watcher.polling; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:io'; | 8 import 'dart:io'; |
| 9 | 9 |
| 10 import 'package:crypto/crypto.dart'; | 10 import 'package:crypto/crypto.dart'; |
| 11 | 11 |
| 12 import 'async_queue.dart'; | 12 import '../async_queue.dart'; |
| 13 import 'stat.dart'; | 13 import '../directory_watcher.dart'; |
| 14 import 'utils.dart'; | 14 import '../stat.dart'; |
| 15 import 'watch_event.dart'; | 15 import '../utils.dart'; |
| 16 import '../watch_event.dart'; |
| 17 import 'resubscribable.dart'; |
| 16 | 18 |
| 17 /// Watches the contents of a directory and emits [WatchEvent]s when something | 19 /// Periodically polls a directory for changes. |
| 18 /// in the directory has changed. | 20 class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher { |
| 19 class DirectoryWatcher { | 21 /// Creates a new polling watcher monitoring [directory]. |
| 20 /// The directory whose contents are being monitored. | 22 /// |
| 23 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
| 24 /// will pause between successive polls of the directory contents. Making this |
| 25 /// shorter will give more immediate feedback at the expense of doing more IO |
| 26 /// and higher CPU usage. Defaults to one second. |
| 27 PollingDirectoryWatcher(String directory, {Duration pollingDelay}) |
| 28 : super(directory, () { |
| 29 return new _PollingDirectoryWatcher(directory, |
| 30 pollingDelay != null ? pollingDelay : new Duration(seconds: 1)); |
| 31 }); |
| 32 } |
| 33 |
| 34 class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher { |
| 21 final String directory; | 35 final String directory; |
| 22 | 36 |
| 23 /// The broadcast [Stream] of events that have occurred to files in | |
| 24 /// [directory]. | |
| 25 /// | |
| 26 /// Changes will only be monitored while this stream has subscribers. Any | |
| 27 /// file changes that occur during periods when there are no subscribers | |
| 28 /// will not be reported the next time a subscriber is added. | |
| 29 Stream<WatchEvent> get events => _events.stream; | 37 Stream<WatchEvent> get events => _events.stream; |
| 30 StreamController<WatchEvent> _events; | 38 final _events = new StreamController<WatchEvent>.broadcast(); |
| 31 | 39 |
| 32 _WatchState _state = _WatchState.UNSUBSCRIBED; | 40 bool get isReady => _ready.isCompleted; |
| 33 | 41 |
| 34 /// A [Future] that completes when the watcher is initialized and watching | |
| 35 /// for file changes. | |
| 36 /// | |
| 37 /// If the watcher is not currently monitoring the directory (because there | |
| 38 /// are no subscribers to [events]), this returns a future that isn't | |
| 39 /// complete yet. It will complete when a subscriber starts listening and | |
| 40 /// the watcher finishes any initialization work it needs to do. | |
| 41 /// | |
| 42 /// If the watcher is already monitoring, this returns an already complete | |
| 43 /// future. | |
| 44 Future get ready => _ready.future; | 42 Future get ready => _ready.future; |
| 45 Completer _ready = new Completer(); | 43 final _ready = new Completer(); |
| 46 | 44 |
| 47 /// The amount of time the watcher pauses between successive polls of the | 45 /// The amount of time the watcher pauses between successive polls of the |
| 48 /// directory contents. | 46 /// directory contents. |
| 49 final Duration pollingDelay; | 47 final Duration _pollingDelay; |
| 50 | 48 |
| 51 /// The previous status of the files in the directory. | 49 /// The previous status of the files in the directory. |
| 52 /// | 50 /// |
| 53 /// Used to tell which files have been modified. | 51 /// Used to tell which files have been modified. |
| 54 final _statuses = new Map<String, _FileStatus>(); | 52 final _statuses = new Map<String, _FileStatus>(); |
| 55 | 53 |
| 56 /// The subscription used while [directory] is being listed. | 54 /// The subscription used while [directory] is being listed. |
| 57 /// | 55 /// |
| 58 /// Will be `null` if a list is not currently happening. | 56 /// Will be `null` if a list is not currently happening. |
| 59 StreamSubscription<FileSystemEntity> _listSubscription; | 57 StreamSubscription<FileSystemEntity> _listSubscription; |
| 60 | 58 |
| 61 /// The queue of files waiting to be processed to see if they have been | 59 /// The queue of files waiting to be processed to see if they have been |
| 62 /// modified. | 60 /// modified. |
| 63 /// | 61 /// |
| 64 /// Processing a file is asynchronous, as is listing the directory, so the | 62 /// Processing a file is asynchronous, as is listing the directory, so the |
| 65 /// queue exists to let each of those proceed at their own rate. The lister | 63 /// queue exists to let each of those proceed at their own rate. The lister |
| 66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | 64 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
| 67 /// and processed sequentially. | 65 /// and processed sequentially. |
| 68 AsyncQueue<String> _filesToProcess; | 66 AsyncQueue<String> _filesToProcess; |
| 69 | 67 |
| 70 /// The set of files that have been seen in the current directory listing. | 68 /// The set of files that have been seen in the current directory listing. |
| 71 /// | 69 /// |
| 72 /// Used to tell which files have been removed: files that are in [_statuses] | 70 /// Used to tell which files have been removed: files that are in [_statuses] |
| 73 /// but not in here when a poll completes have been removed. | 71 /// but not in here when a poll completes have been removed. |
| 74 final _polledFiles = new Set<String>(); | 72 final _polledFiles = new Set<String>(); |
| 75 | 73 |
| 76 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 74 _PollingDirectoryWatcher(this.directory, this._pollingDelay) { |
| 77 /// | |
| 78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | |
| 79 /// will pause between successive polls of the directory contents. Making | |
| 80 /// this shorter will give more immediate feedback at the expense of doing | |
| 81 /// more IO and higher CPU usage. Defaults to one second. | |
| 82 DirectoryWatcher(this.directory, {Duration pollingDelay}) | |
| 83 : pollingDelay = pollingDelay != null ? pollingDelay : | |
| 84 new Duration(seconds: 1) { | |
| 85 _events = new StreamController<WatchEvent>.broadcast( | |
| 86 onListen: _watch, onCancel: _cancel); | |
| 87 | |
| 88 _filesToProcess = new AsyncQueue<String>(_processFile, | 75 _filesToProcess = new AsyncQueue<String>(_processFile, |
| 89 onError: _events.addError); | 76 onError: _events.addError); |
| 90 } | |
| 91 | 77 |
| 92 /// Scans to see which files were already present before the watcher was | |
| 93 /// subscribed to, and then starts watching the directory for changes. | |
| 94 void _watch() { | |
| 95 assert(_state == _WatchState.UNSUBSCRIBED); | |
| 96 _state = _WatchState.SCANNING; | |
| 97 _poll(); | 78 _poll(); |
| 98 } | 79 } |
| 99 | 80 |
| 100 /// Stops watching the directory when there are no more subscribers. | 81 void close() { |
| 101 void _cancel() { | 82 _events.close(); |
| 102 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 103 _state = _WatchState.UNSUBSCRIBED; | |
| 104 | 83 |
| 105 // If we're in the middle of listing the directory, stop. | 84 // If we're in the middle of listing the directory, stop. |
| 106 if (_listSubscription != null) _listSubscription.cancel(); | 85 if (_listSubscription != null) _listSubscription.cancel(); |
| 107 | 86 |
| 108 // Don't process any remaining files. | 87 // Don't process any remaining files. |
| 109 _filesToProcess.clear(); | 88 _filesToProcess.clear(); |
| 110 _polledFiles.clear(); | 89 _polledFiles.clear(); |
| 111 _statuses.clear(); | 90 _statuses.clear(); |
| 112 | |
| 113 _ready = new Completer(); | |
| 114 } | 91 } |
| 115 | 92 |
| 116 /// Scans the contents of the directory once to see which files have been | 93 /// Scans the contents of the directory once to see which files have been |
| 117 /// added, removed, and modified. | 94 /// added, removed, and modified. |
| 118 void _poll() { | 95 void _poll() { |
| 119 _filesToProcess.clear(); | 96 _filesToProcess.clear(); |
| 120 _polledFiles.clear(); | 97 _polledFiles.clear(); |
| 121 | 98 |
| 122 endListing() { | 99 endListing() { |
| 123 assert(_state != _WatchState.UNSUBSCRIBED); | 100 assert(!_events.isClosed); |
| 124 _listSubscription = null; | 101 _listSubscription = null; |
| 125 | 102 |
| 126 // Null tells the queue consumer that we're done listing. | 103 // Null tells the queue consumer that we're done listing. |
| 127 _filesToProcess.add(null); | 104 _filesToProcess.add(null); |
| 128 } | 105 } |
| 129 | 106 |
| 130 var stream = new Directory(directory).list(recursive: true); | 107 var stream = new Directory(directory).list(recursive: true); |
| 131 _listSubscription = stream.listen((entity) { | 108 _listSubscription = stream.listen((entity) { |
| 132 assert(_state != _WatchState.UNSUBSCRIBED); | 109 assert(!_events.isClosed); |
| 133 | 110 |
| 134 if (entity is! File) return; | 111 if (entity is! File) return; |
| 135 _filesToProcess.add(entity.path); | 112 _filesToProcess.add(entity.path); |
| 136 }, onError: (error, StackTrace stackTrace) { | 113 }, onError: (error, stackTrace) { |
| 137 if (!isDirectoryNotFoundException(error)) { | 114 if (!isDirectoryNotFoundException(error)) { |
| 138 // It's some unknown error. Pipe it over to the event stream so the | 115 // It's some unknown error. Pipe it over to the event stream so the |
| 139 // user can see it. | 116 // user can see it. |
| 140 _events.addError(error, stackTrace); | 117 _events.addError(error, stackTrace); |
| 141 } | 118 } |
| 142 | 119 |
| 143 // When an error occurs, we end the listing normally, which has the | 120 // When an error occurs, we end the listing normally, which has the |
| 144 // desired effect of marking all files that were in the directory as | 121 // desired effect of marking all files that were in the directory as |
| 145 // being removed. | 122 // being removed. |
| 146 endListing(); | 123 endListing(); |
| 147 }, onDone: endListing, cancelOnError: true); | 124 }, onDone: endListing, cancelOnError: true); |
| 148 } | 125 } |
| 149 | 126 |
| 150 /// Processes [file] to determine if it has been modified since the last | 127 /// Processes [file] to determine if it has been modified since the last |
| 151 /// time it was scanned. | 128 /// time it was scanned. |
| 152 Future _processFile(String file) { | 129 Future _processFile(String file) { |
| 153 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 154 | |
| 155 // `null` is the sentinel which means the directory listing is complete. | 130 // `null` is the sentinel which means the directory listing is complete. |
| 156 if (file == null) return _completePoll(); | 131 if (file == null) return _completePoll(); |
| 157 | 132 |
| 158 return getModificationTime(file).then((modified) { | 133 return getModificationTime(file).then((modified) { |
| 159 if (_checkForCancel()) return; | 134 if (_events.isClosed) return; |
| 160 | 135 |
| 161 var lastStatus = _statuses[file]; | 136 var lastStatus = _statuses[file]; |
| 162 | 137 |
| 163 // If its modification time hasn't changed, assume the file is unchanged. | 138 // If its modification time hasn't changed, assume the file is unchanged. |
| 164 if (lastStatus != null && lastStatus.modified == modified) { | 139 if (lastStatus != null && lastStatus.modified == modified) { |
| 165 // The file is still here. | 140 // The file is still here. |
| 166 _polledFiles.add(file); | 141 _polledFiles.add(file); |
| 167 return; | 142 return; |
| 168 } | 143 } |
| 169 | 144 |
| 170 return _hashFile(file).then((hash) { | 145 return _hashFile(file).then((hash) { |
| 171 if (_checkForCancel()) return; | 146 if (_events.isClosed) return; |
| 172 | 147 |
| 173 var status = new _FileStatus(modified, hash); | 148 var status = new _FileStatus(modified, hash); |
| 174 _statuses[file] = status; | 149 _statuses[file] = status; |
| 175 _polledFiles.add(file); | 150 _polledFiles.add(file); |
| 176 | 151 |
| 177 // Only notify while in the watching state. | 152 // Only notify if we're ready to emit events. |
| 178 if (_state != _WatchState.WATCHING) return; | 153 if (!isReady) return; |
| 179 | 154 |
| 180 // And the file is different. | 155 // And the file is different. |
| 181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); | 156 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
| 182 if (!changed) return; | 157 if (!changed) return; |
| 183 | 158 |
| 184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | 159 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
| 185 _events.add(new WatchEvent(type, file)); | 160 _events.add(new WatchEvent(type, file)); |
| 186 }); | 161 }); |
| 187 }); | 162 }); |
| 188 } | 163 } |
| 189 | 164 |
| 190 /// After the directory listing is complete, this determines which files were | 165 /// After the directory listing is complete, this determines which files were |
| 191 /// removed and then restarts the next poll. | 166 /// removed and then restarts the next poll. |
| 192 Future _completePoll() { | 167 Future _completePoll() { |
| 193 // Any files that were not seen in the last poll but that we have a | 168 // Any files that were not seen in the last poll but that we have a |
| 194 // status for must have been removed. | 169 // status for must have been removed. |
| 195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); | 170 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
| 196 for (var removed in removedFiles) { | 171 for (var removed in removedFiles) { |
| 197 if (_state == _WatchState.WATCHING) { | 172 if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
| 198 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
| 199 } | |
| 200 _statuses.remove(removed); | 173 _statuses.remove(removed); |
| 201 } | 174 } |
| 202 | 175 |
| 203 if (_state == _WatchState.SCANNING) { | 176 if (!isReady) _ready.complete(); |
| 204 _state = _WatchState.WATCHING; | |
| 205 _ready.complete(); | |
| 206 } | |
| 207 | 177 |
| 208 // Wait and then poll again. | 178 // Wait and then poll again. |
| 209 return new Future.delayed(pollingDelay).then((_) { | 179 return new Future.delayed(_pollingDelay).then((_) { |
| 210 if (_checkForCancel()) return; | 180 if (_events.isClosed) return; |
| 211 _poll(); | 181 _poll(); |
| 212 }); | 182 }); |
| 213 } | 183 } |
| 214 | 184 |
| 215 /// Returns `true` and clears the processing queue if the watcher has been | |
| 216 /// unsubscribed. | |
| 217 bool _checkForCancel() { | |
| 218 if (_state != _WatchState.UNSUBSCRIBED) return false; | |
| 219 | |
| 220 // Don't process any more files. | |
| 221 _filesToProcess.clear(); | |
| 222 return true; | |
| 223 } | |
| 224 | |
| 225 /// Calculates the SHA-1 hash of the file at [path]. | 185 /// Calculates the SHA-1 hash of the file at [path]. |
| 226 Future<List<int>> _hashFile(String path) { | 186 Future<List<int>> _hashFile(String path) { |
| 227 return new File(path).readAsBytes().then((bytes) { | 187 return new File(path).readAsBytes().then((bytes) { |
| 228 var sha1 = new SHA1(); | 188 var sha1 = new SHA1(); |
| 229 sha1.add(bytes); | 189 sha1.add(bytes); |
| 230 return sha1.close(); | 190 return sha1.close(); |
| 231 }); | 191 }); |
| 232 } | 192 } |
| 233 | 193 |
| 234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | 194 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
| 235 /// series of byte values. | 195 /// series of byte values. |
| 236 bool _sameHash(List<int> a, List<int> b) { | 196 bool _sameHash(List<int> a, List<int> b) { |
| 237 // Hashes should always be the same size. | 197 // Hashes should always be the same size. |
| 238 assert(a.length == b.length); | 198 assert(a.length == b.length); |
| 239 | 199 |
| 240 for (var i = 0; i < a.length; i++) { | 200 for (var i = 0; i < a.length; i++) { |
| 241 if (a[i] != b[i]) return false; | 201 if (a[i] != b[i]) return false; |
| 242 } | 202 } |
| 243 | 203 |
| 244 return true; | 204 return true; |
| 245 } | 205 } |
| 246 } | 206 } |
| 247 | 207 |
| 248 /// Enum class for the states that the [DirectoryWatcher] can be in. | |
| 249 class _WatchState { | |
| 250 /// There are no subscribers to the watcher's event stream and no watching | |
| 251 /// is going on. | |
| 252 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); | |
| 253 | |
| 254 /// There are subscribers and the watcher is doing an initial scan of the | |
| 255 /// directory to see which files were already present before watching started. | |
| 256 /// | |
| 257 /// The watcher does not send notifications for changes that occurred while | |
| 258 /// there were no subscribers, or for files already present before watching. | |
| 259 /// The initial scan is used to determine what "before watching" state of | |
| 260 /// the file system was. | |
| 261 static const SCANNING = const _WatchState("scanning"); | |
| 262 | |
| 263 /// There are subscribers and the watcher is polling the directory to look | |
| 264 /// for changes. | |
| 265 static const WATCHING = const _WatchState("watching"); | |
| 266 | |
| 267 /// The name of the state. | |
| 268 final String name; | |
| 269 | |
| 270 const _WatchState(this.name); | |
| 271 } | |
| 272 | |
| 273 class _FileStatus { | 208 class _FileStatus { |
| 274 /// The last time the file was modified. | 209 /// The last time the file was modified. |
| 275 DateTime modified; | 210 DateTime modified; |
| 276 | 211 |
| 277 /// The SHA-1 hash of the contents of the file. | 212 /// The SHA-1 hash of the contents of the file. |
| 278 List<int> hash; | 213 List<int> hash; |
| 279 | 214 |
| 280 _FileStatus(this.modified, this.hash); | 215 _FileStatus(this.modified, this.hash); |
| 281 } | 216 } |
| 217 |
| OLD | NEW |