| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library watcher.directory_watcher; | 5 library watcher.directory_watcher; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:io'; | 8 import 'dart:io'; |
| 9 | 9 |
| 10 import 'package:crypto/crypto.dart'; | |
| 11 | |
| 12 import 'async_queue.dart'; | |
| 13 import 'stat.dart'; | |
| 14 import 'utils.dart'; | |
| 15 import 'watch_event.dart'; | 10 import 'watch_event.dart'; |
| 11 import 'directory_watcher/linux.dart'; |
| 12 import 'directory_watcher/polling.dart'; |
| 16 | 13 |
| 17 /// Watches the contents of a directory and emits [WatchEvent]s when something | 14 /// Watches the contents of a directory and emits [WatchEvent]s when something |
| 18 /// in the directory has changed. | 15 /// in the directory has changed. |
| 19 class DirectoryWatcher { | 16 abstract class DirectoryWatcher { |
| 20 /// The directory whose contents are being monitored. | 17 /// The directory whose contents are being monitored. |
| 21 final String directory; | 18 String get directory; |
| 22 | 19 |
| 23 /// The broadcast [Stream] of events that have occurred to files in | 20 /// The broadcast [Stream] of events that have occurred to files in |
| 24 /// [directory]. | 21 /// [directory]. |
| 25 /// | 22 /// |
| 26 /// Changes will only be monitored while this stream has subscribers. Any | 23 /// Changes will only be monitored while this stream has subscribers. Any |
| 27 /// file changes that occur during periods when there are no subscribers | 24 /// file changes that occur during periods when there are no subscribers |
| 28 /// will not be reported the next time a subscriber is added. | 25 /// will not be reported the next time a subscriber is added. |
| 29 Stream<WatchEvent> get events => _events.stream; | 26 Stream<WatchEvent> get events; |
| 30 StreamController<WatchEvent> _events; | |
| 31 | 27 |
| 32 _WatchState _state = _WatchState.UNSUBSCRIBED; | 28 /// Whether the watcher is initialized and watching for file changes. |
| 29 /// |
| 30 /// This is true if and only if [ready] is complete. |
| 31 bool get isReady; |
| 33 | 32 |
| 34 /// A [Future] that completes when the watcher is initialized and watching | 33 /// A [Future] that completes when the watcher is initialized and watching |
| 35 /// for file changes. | 34 /// for file changes. |
| 36 /// | 35 /// |
| 37 /// If the watcher is not currently monitoring the directory (because there | 36 /// If the watcher is not currently monitoring the directory (because there |
| 38 /// are no subscribers to [events]), this returns a future that isn't | 37 /// are no subscribers to [events]), this returns a future that isn't |
| 39 /// complete yet. It will complete when a subscriber starts listening and | 38 /// complete yet. It will complete when a subscriber starts listening and |
| 40 /// the watcher finishes any initialization work it needs to do. | 39 /// the watcher finishes any initialization work it needs to do. |
| 41 /// | 40 /// |
| 42 /// If the watcher is already monitoring, this returns an already complete | 41 /// If the watcher is already monitoring, this returns an already complete |
| 43 /// future. | 42 /// future. |
| 44 Future get ready => _ready.future; | 43 Future get ready; |
| 45 Completer _ready = new Completer(); | |
| 46 | |
| 47 /// The amount of time the watcher pauses between successive polls of the | |
| 48 /// directory contents. | |
| 49 final Duration pollingDelay; | |
| 50 | |
| 51 /// The previous status of the files in the directory. | |
| 52 /// | |
| 53 /// Used to tell which files have been modified. | |
| 54 final _statuses = new Map<String, _FileStatus>(); | |
| 55 | |
| 56 /// The subscription used while [directory] is being listed. | |
| 57 /// | |
| 58 /// Will be `null` if a list is not currently happening. | |
| 59 StreamSubscription<FileSystemEntity> _listSubscription; | |
| 60 | |
| 61 /// The queue of files waiting to be processed to see if they have been | |
| 62 /// modified. | |
| 63 /// | |
| 64 /// Processing a file is asynchronous, as is listing the directory, so the | |
| 65 /// queue exists to let each of those proceed at their own rate. The lister | |
| 66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | |
| 67 /// and processed sequentially. | |
| 68 AsyncQueue<String> _filesToProcess; | |
| 69 | |
| 70 /// The set of files that have been seen in the current directory listing. | |
| 71 /// | |
| 72 /// Used to tell which files have been removed: files that are in [_statuses] | |
| 73 /// but not in here when a poll completes have been removed. | |
| 74 final _polledFiles = new Set<String>(); | |
| 75 | 44 |
| 76 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 45 /// Creates a new [DirectoryWatcher] monitoring [directory]. |
| 77 /// | 46 /// |
| 78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | 47 /// If a native directory watcher is available for this platform, this will |
| 79 /// will pause between successive polls of the directory contents. Making | 48 /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher]. |
| 80 /// this shorter will give more immediate feedback at the expense of doing | 49 /// |
| 81 /// more IO and higher CPU usage. Defaults to one second. | 50 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher |
| 82 DirectoryWatcher(this.directory, {Duration pollingDelay}) | 51 /// will pause between successive polls of the directory contents. Making this |
| 83 : pollingDelay = pollingDelay != null ? pollingDelay : | 52 /// shorter will give more immediate feedback at the expense of doing more IO |
| 84 new Duration(seconds: 1) { | 53 /// and higher CPU usage. Defaults to one second. Ignored for non-polling |
| 85 _events = new StreamController<WatchEvent>.broadcast( | 54 /// watchers. |
| 86 onListen: _watch, onCancel: _cancel); | 55 factory DirectoryWatcher(String directory, {Duration pollingDelay}) { |
| 87 | 56 if (Platform.isLinux) return new LinuxDirectoryWatcher(directory); |
| 88 _filesToProcess = new AsyncQueue<String>(_processFile, | 57 return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay); |
| 89 onError: _events.addError); | |
| 90 } | |
| 91 | |
| 92 /// Scans to see which files were already present before the watcher was | |
| 93 /// subscribed to, and then starts watching the directory for changes. | |
| 94 void _watch() { | |
| 95 assert(_state == _WatchState.UNSUBSCRIBED); | |
| 96 _state = _WatchState.SCANNING; | |
| 97 _poll(); | |
| 98 } | |
| 99 | |
| 100 /// Stops watching the directory when there are no more subscribers. | |
| 101 void _cancel() { | |
| 102 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 103 _state = _WatchState.UNSUBSCRIBED; | |
| 104 | |
| 105 // If we're in the middle of listing the directory, stop. | |
| 106 if (_listSubscription != null) _listSubscription.cancel(); | |
| 107 | |
| 108 // Don't process any remaining files. | |
| 109 _filesToProcess.clear(); | |
| 110 _polledFiles.clear(); | |
| 111 _statuses.clear(); | |
| 112 | |
| 113 _ready = new Completer(); | |
| 114 } | |
| 115 | |
| 116 /// Scans the contents of the directory once to see which files have been | |
| 117 /// added, removed, and modified. | |
| 118 void _poll() { | |
| 119 _filesToProcess.clear(); | |
| 120 _polledFiles.clear(); | |
| 121 | |
| 122 endListing() { | |
| 123 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 124 _listSubscription = null; | |
| 125 | |
| 126 // Null tells the queue consumer that we're done listing. | |
| 127 _filesToProcess.add(null); | |
| 128 } | |
| 129 | |
| 130 var stream = new Directory(directory).list(recursive: true); | |
| 131 _listSubscription = stream.listen((entity) { | |
| 132 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 133 | |
| 134 if (entity is! File) return; | |
| 135 _filesToProcess.add(entity.path); | |
| 136 }, onError: (error, StackTrace stackTrace) { | |
| 137 if (!isDirectoryNotFoundException(error)) { | |
| 138 // It's some unknown error. Pipe it over to the event stream so the | |
| 139 // user can see it. | |
| 140 _events.addError(error, stackTrace); | |
| 141 } | |
| 142 | |
| 143 // When an error occurs, we end the listing normally, which has the | |
| 144 // desired effect of marking all files that were in the directory as | |
| 145 // being removed. | |
| 146 endListing(); | |
| 147 }, onDone: endListing, cancelOnError: true); | |
| 148 } | |
| 149 | |
| 150 /// Processes [file] to determine if it has been modified since the last | |
| 151 /// time it was scanned. | |
| 152 Future _processFile(String file) { | |
| 153 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 154 | |
| 155 // `null` is the sentinel which means the directory listing is complete. | |
| 156 if (file == null) return _completePoll(); | |
| 157 | |
| 158 return getModificationTime(file).then((modified) { | |
| 159 if (_checkForCancel()) return null; | |
| 160 | |
| 161 var lastStatus = _statuses[file]; | |
| 162 | |
| 163 // If its modification time hasn't changed, assume the file is unchanged. | |
| 164 if (lastStatus != null && lastStatus.modified == modified) { | |
| 165 // The file is still here. | |
| 166 _polledFiles.add(file); | |
| 167 return null; | |
| 168 } | |
| 169 | |
| 170 return _hashFile(file).then((hash) { | |
| 171 if (_checkForCancel()) return; | |
| 172 | |
| 173 var status = new _FileStatus(modified, hash); | |
| 174 _statuses[file] = status; | |
| 175 _polledFiles.add(file); | |
| 176 | |
| 177 // Only notify while in the watching state. | |
| 178 if (_state != _WatchState.WATCHING) return; | |
| 179 | |
| 180 // And the file is different. | |
| 181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); | |
| 182 if (!changed) return; | |
| 183 | |
| 184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | |
| 185 _events.add(new WatchEvent(type, file)); | |
| 186 }); | |
| 187 }); | |
| 188 } | |
| 189 | |
| 190 /// After the directory listing is complete, this determines which files were | |
| 191 /// removed and then restarts the next poll. | |
| 192 Future _completePoll() { | |
| 193 // Any files that were not seen in the last poll but that we have a | |
| 194 // status for must have been removed. | |
| 195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); | |
| 196 for (var removed in removedFiles) { | |
| 197 if (_state == _WatchState.WATCHING) { | |
| 198 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
| 199 } | |
| 200 _statuses.remove(removed); | |
| 201 } | |
| 202 | |
| 203 if (_state == _WatchState.SCANNING) { | |
| 204 _state = _WatchState.WATCHING; | |
| 205 _ready.complete(); | |
| 206 } | |
| 207 | |
| 208 // Wait and then poll again. | |
| 209 return new Future.delayed(pollingDelay).then((_) { | |
| 210 if (_checkForCancel()) return; | |
| 211 _poll(); | |
| 212 }); | |
| 213 } | |
| 214 | |
| 215 /// Returns `true` and clears the processing queue if the watcher has been | |
| 216 /// unsubscribed. | |
| 217 bool _checkForCancel() { | |
| 218 if (_state != _WatchState.UNSUBSCRIBED) return false; | |
| 219 | |
| 220 // Don't process any more files. | |
| 221 _filesToProcess.clear(); | |
| 222 return true; | |
| 223 } | |
| 224 | |
| 225 /// Calculates the SHA-1 hash of the file at [path]. | |
| 226 Future<List<int>> _hashFile(String path) { | |
| 227 return new File(path).readAsBytes().then((bytes) { | |
| 228 var sha1 = new SHA1(); | |
| 229 sha1.add(bytes); | |
| 230 return sha1.close(); | |
| 231 }); | |
| 232 } | |
| 233 | |
| 234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | |
| 235 /// series of byte values. | |
| 236 bool _sameHash(List<int> a, List<int> b) { | |
| 237 // Hashes should always be the same size. | |
| 238 assert(a.length == b.length); | |
| 239 | |
| 240 for (var i = 0; i < a.length; i++) { | |
| 241 if (a[i] != b[i]) return false; | |
| 242 } | |
| 243 | |
| 244 return true; | |
| 245 } | 58 } |
| 246 } | 59 } |
| 247 | |
| 248 /// Enum class for the states that the [DirectoryWatcher] can be in. | |
| 249 class _WatchState { | |
| 250 /// There are no subscribers to the watcher's event stream and no watching | |
| 251 /// is going on. | |
| 252 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); | |
| 253 | |
| 254 /// There are subscribers and the watcher is doing an initial scan of the | |
| 255 /// directory to see which files were already present before watching started. | |
| 256 /// | |
| 257 /// The watcher does not send notifications for changes that occurred while | |
| 258 /// there were no subscribers, or for files already present before watching. | |
| 259 /// The initial scan is used to determine what "before watching" state of | |
| 260 /// the file system was. | |
| 261 static const SCANNING = const _WatchState("scanning"); | |
| 262 | |
| 263 /// There are subscribers and the watcher is polling the directory to look | |
| 264 /// for changes. | |
| 265 static const WATCHING = const _WatchState("watching"); | |
| 266 | |
| 267 /// The name of the state. | |
| 268 final String name; | |
| 269 | |
| 270 const _WatchState(this.name); | |
| 271 } | |
| 272 | |
| 273 class _FileStatus { | |
| 274 /// The last time the file was modified. | |
| 275 DateTime modified; | |
| 276 | |
| 277 /// The SHA-1 hash of the contents of the file. | |
| 278 List<int> hash; | |
| 279 | |
| 280 _FileStatus(this.modified, this.hash); | |
| 281 } | |
| OLD | NEW |