| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library watcher.directory_watcher; | 5 library watcher.directory_watcher; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 8 import 'dart:io'; | 9 import 'dart:io'; |
| 9 | 10 |
| 10 import 'package:crypto/crypto.dart'; | 11 import 'package:crypto/crypto.dart'; |
| 11 | 12 |
| 13 import 'async_queue.dart'; |
| 12 import 'stat.dart'; | 14 import 'stat.dart'; |
| 13 import 'watch_event.dart'; | 15 import 'watch_event.dart'; |
| 14 | 16 |
| 15 /// Watches the contents of a directory and emits [WatchEvent]s when something | 17 /// Watches the contents of a directory and emits [WatchEvent]s when something |
| 16 /// in the directory has changed. | 18 /// in the directory has changed. |
| 17 class DirectoryWatcher { | 19 class DirectoryWatcher { |
| 18 /// The directory whose contents are being monitored. | 20 /// The directory whose contents are being monitored. |
| 19 final String directory; | 21 final String directory; |
| 20 | 22 |
| 21 /// The broadcast [Stream] of events that have occurred to files in | 23 /// The broadcast [Stream] of events that have occurred to files in |
| 22 /// [directory]. | 24 /// [directory]. |
| 23 /// | 25 /// |
| 24 /// Changes will only be monitored while this stream has subscribers. Any | 26 /// Changes will only be monitored while this stream has subscribers. Any |
| 25 /// file changes that occur during periods when there are no subscribers | 27 /// file changes that occur during periods when there are no subscribers |
| 26 /// will not be reported the next time a subscriber is added. | 28 /// will not be reported the next time a subscriber is added. |
| 27 Stream<WatchEvent> get events => _events.stream; | 29 Stream<WatchEvent> get events => _events.stream; |
| 28 StreamController<WatchEvent> _events; | 30 StreamController<WatchEvent> _events; |
| 29 | 31 |
| 30 _WatchState _state = _WatchState.notWatching; | 32 _WatchState _state = _WatchState.UNSUBSCRIBED; |
| 31 | 33 |
| 32 /// A [Future] that completes when the watcher is initialized and watching | 34 /// A [Future] that completes when the watcher is initialized and watching |
| 33 /// for file changes. | 35 /// for file changes. |
| 34 /// | 36 /// |
| 35 /// If the watcher is not currently monitoring the directory (because there | 37 /// If the watcher is not currently monitoring the directory (because there |
| 36 /// are no subscribers to [events]), this returns a future that isn't | 38 /// are no subscribers to [events]), this returns a future that isn't |
| 37 /// complete yet. It will complete when a subscriber starts listening and | 39 /// complete yet. It will complete when a subscriber starts listening and |
| 38 /// the watcher finishes any initialization work it needs to do. | 40 /// the watcher finishes any initialization work it needs to do. |
| 39 /// | 41 /// |
| 40 /// If the watcher is already monitoring, this returns an already complete | 42 /// If the watcher is already monitoring, this returns an already complete |
| 41 /// future. | 43 /// future. |
| 42 Future get ready => _ready.future; | 44 Future get ready => _ready.future; |
| 43 Completer _ready = new Completer(); | 45 Completer _ready = new Completer(); |
| 44 | 46 |
| 45 /// The amount of time the watcher pauses between successive polls of the | 47 /// The amount of time the watcher pauses between successive polls of the |
| 46 /// directory contents. | 48 /// directory contents. |
| 47 final Duration pollingDelay; | 49 final Duration pollingDelay; |
| 48 | 50 |
| 49 /// The previous status of the files in the directory. | 51 /// The previous status of the files in the directory. |
| 50 /// | 52 /// |
| 51 /// Used to tell which files have been modified. | 53 /// Used to tell which files have been modified. |
| 52 final _statuses = new Map<String, _FileStatus>(); | 54 final _statuses = new Map<String, _FileStatus>(); |
| 53 | 55 |
| 56 /// The subscription used while [directory] is being listed. |
| 57 /// |
| 58 /// Will be `null` if a list is not currently happening. |
| 59 StreamSubscription<FileSystemEntity> _listSubscription; |
| 60 |
| 61 /// The queue of files waiting to be processed to see if they have been |
| 62 /// modified. |
| 63 /// |
| 64 /// Processing a file is asynchronous, as is listing the directory, so the |
| 65 /// queue exists to let each of those proceed at their own rate. The lister |
| 66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued |
| 67 /// and processed sequentially. |
| 68 AsyncQueue<String> _filesToProcess; |
| 69 |
| 70 /// The set of files that have been seen in the current directory listing. |
| 71 /// |
| 72 /// Used to tell which files have been removed: files that are in [_statuses] |
| 73 /// but not in here when a poll completes have been removed. |
| 74 final _polledFiles = new Set<String>(); |
| 75 |
| 54 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 76 /// Creates a new [DirectoryWatcher] monitoring [directory]. |
| 55 /// | 77 /// |
| 56 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | 78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
| 57 /// will pause between successive polls of the directory contents. Making | 79 /// will pause between successive polls of the directory contents. Making |
| 58 /// this shorter will give more immediate feedback at the expense of doing | 80 /// this shorter will give more immediate feedback at the expense of doing |
| 59 /// more IO and higher CPU usage. Defaults to one second. | 81 /// more IO and higher CPU usage. Defaults to one second. |
| 60 DirectoryWatcher(this.directory, {Duration pollingDelay}) | 82 DirectoryWatcher(this.directory, {Duration pollingDelay}) |
| 61 : pollingDelay = pollingDelay != null ? pollingDelay : | 83 : pollingDelay = pollingDelay != null ? pollingDelay : |
| 62 new Duration(seconds: 1) { | 84 new Duration(seconds: 1) { |
| 63 _events = new StreamController<WatchEvent>.broadcast(onListen: () { | 85 _events = new StreamController<WatchEvent>.broadcast( |
| 64 _state = _state.listen(this); | 86 onListen: _watch, onCancel: _cancel); |
| 65 }, onCancel: () { | 87 |
| 66 _state = _state.cancel(this); | 88 _filesToProcess = new AsyncQueue<String>(_processFile, |
| 89 onError: _events.addError); |
| 90 } |
| 91 |
| 92 /// Scans to see which files were already present before the watcher was |
| 93 /// subscribed to, and then starts watching the directory for changes. |
| 94 void _watch() { |
| 95 assert(_state == _WatchState.UNSUBSCRIBED); |
| 96 _state = _WatchState.SCANNING; |
| 97 _poll(); |
| 98 } |
| 99 |
| 100 /// Stops watching the directory when there are no more subscribers. |
| 101 void _cancel() { |
| 102 assert(_state != _WatchState.UNSUBSCRIBED); |
| 103 _state = _WatchState.UNSUBSCRIBED; |
| 104 |
| 105 // If we're in the middle of listing the directory, stop. |
| 106 if (_listSubscription != null) _listSubscription.cancel(); |
| 107 |
| 108 // Don't process any remaining files. |
| 109 _filesToProcess.clear(); |
| 110 _polledFiles.clear(); |
| 111 _statuses.clear(); |
| 112 |
| 113 _ready = new Completer(); |
| 114 } |
| 115 |
| 116 /// Scans the contents of the directory once to see which files have been |
| 117 /// added, removed, and modified. |
| 118 void _poll() { |
| 119 _filesToProcess.clear(); |
| 120 _polledFiles.clear(); |
| 121 |
| 122 var stream = new Directory(directory).list(recursive: true); |
| 123 _listSubscription = stream.listen((entity) { |
| 124 assert(_state != _WatchState.UNSUBSCRIBED); |
| 125 |
| 126 if (entity is! File) return; |
| 127 _filesToProcess.add(entity.path); |
| 128 }, onDone: () { |
| 129 assert(_state != _WatchState.UNSUBSCRIBED); |
| 130 _listSubscription = null; |
| 131 |
| 132 // Null tells the queue consumer that we're done listing. |
| 133 _filesToProcess.add(null); |
| 67 }); | 134 }); |
| 68 } | 135 } |
| 69 | 136 |
| 70 /// Starts the asynchronous polling process. | 137 /// Processes [file] to determine if it has been modified since the last |
| 71 /// | 138 /// time it was scanned. |
| 72 /// Scans the contents of the directory and compares the results to the | 139 Future _processFile(String file) { |
| 73 /// previous scan. Loops to continue monitoring as long as there are | 140 assert(_state != _WatchState.UNSUBSCRIBED); |
| 74 /// subscribers to the [events] stream. | |
| 75 Future _watch() { | |
| 76 var files = new Set<String>(); | |
| 77 | 141 |
| 78 var stream = new Directory(directory).list(recursive: true); | 142 // `null` is the sentinel which means the directory listing is complete. |
| 143 if (file == null) return _completePoll(); |
| 79 | 144 |
| 80 return stream.map((entity) { | 145 return getModificationTime(file).then((modified) { |
| 81 if (entity is! File) return new Future.value(); | 146 if (_checkForCancel()) return; |
| 82 files.add(entity.path); | 147 |
| 83 // TODO(rnystrom): These all run as fast as possible and read the | 148 var lastStatus = _statuses[file]; |
| 84 // contents of the files. That means there's a pretty big IO hit all at | 149 |
| 85 // once. Maybe these should be queued up and rate limited? | 150 // If its modification time hasn't changed, assume the file is unchanged. |
| 86 return _refreshFile(entity.path); | 151 if (lastStatus != null && lastStatus.modified == modified) { |
| 87 }).toList().then((futures) { | 152 // The file is still here. |
| 88 // Once the listing is done, make sure to wait until each file is also | 153 _polledFiles.add(file); |
| 89 // done. | 154 return; |
| 90 return Future.wait(futures); | |
| 91 }).then((_) { | |
| 92 var removedFiles = _statuses.keys.toSet().difference(files); | |
| 93 for (var removed in removedFiles) { | |
| 94 if (_state.shouldNotify) { | |
| 95 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
| 96 } | |
| 97 _statuses.remove(removed); | |
| 98 } | 155 } |
| 99 | 156 |
| 100 var previousState = _state; | 157 return _hashFile(file).then((hash) { |
| 101 _state = _state.finish(this); | 158 if (_checkForCancel()) return; |
| 102 | 159 |
| 103 // If we were already sending notifications, add a bit of delay before | 160 var status = new _FileStatus(modified, hash); |
| 104 // restarting just so that we don't whale on the file system. | 161 _statuses[file] = status; |
| 105 // TODO(rnystrom): Tune this and/or make it tunable? | 162 _polledFiles.add(file); |
| 106 if (_state.shouldNotify) { | 163 |
| 107 return new Future.delayed(pollingDelay); | 164 // Only notify while in the watching state. |
| 108 } | 165 if (_state != _WatchState.WATCHING) return; |
| 109 }).then((_) { | 166 |
| 110 // Make sure we haven't transitioned to a non-watching state during the | 167 // And the file is different. |
| 111 // delay. | 168 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
| 112 if (_state.shouldWatch) _watch(); | 169 if (!changed) return; |
| 170 |
| 171 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
| 172 _events.add(new WatchEvent(type, file)); |
| 173 }); |
| 113 }); | 174 }); |
| 114 } | 175 } |
| 115 | 176 |
| 116 /// Compares the current state of the file at [path] to the state it was in | 177 /// After the directory listing is complete, this determines which files were |
| 117 /// the last time it was scanned. | 178 /// removed and then restarts the next poll. |
| 118 Future _refreshFile(String path) { | 179 Future _completePoll() { |
| 119 return getModificationTime(path).then((modified) { | 180 // Any files that were not seen in the last poll but that we have a |
| 120 var lastStatus = _statuses[path]; | 181 // status for must have been removed. |
| 182 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
| 183 for (var removed in removedFiles) { |
| 184 if (_state == _WatchState.WATCHING) { |
| 185 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
| 186 } |
| 187 _statuses.remove(removed); |
| 188 } |
| 121 | 189 |
| 122 // If it's modification time hasn't changed, assume the file is unchanged. | 190 if (_state == _WatchState.SCANNING) { |
| 123 if (lastStatus != null && lastStatus.modified == modified) return; | 191 _state = _WatchState.WATCHING; |
| 192 _ready.complete(); |
| 193 } |
| 124 | 194 |
| 125 return _hashFile(path).then((hash) { | 195 // Wait and then poll again. |
| 126 var status = new _FileStatus(modified, hash); | 196 return new Future.delayed(pollingDelay).then((_) { |
| 127 _statuses[path] = status; | 197 if (_checkForCancel()) return; |
| 198 _poll(); |
| 199 }); |
| 200 } |
| 128 | 201 |
| 129 // Only notify if the file contents changed. | 202 /// Returns `true` and clears the processing queue if the watcher has been |
| 130 if (_state.shouldNotify && | 203 /// unsubscribed. |
| 131 (lastStatus == null || !_sameHash(lastStatus.hash, hash))) { | 204 bool _checkForCancel() { |
| 132 var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | 205 if (_state != _WatchState.UNSUBSCRIBED) return false; |
| 133 _events.add(new WatchEvent(change, path)); | 206 |
| 134 } | 207 // Don't process any more files. |
| 135 }); | 208 _filesToProcess.clear(); |
| 136 }); | 209 return true; |
| 137 } | 210 } |
| 138 | 211 |
| 139 /// Calculates the SHA-1 hash of the file at [path]. | 212 /// Calculates the SHA-1 hash of the file at [path]. |
| 140 Future<List<int>> _hashFile(String path) { | 213 Future<List<int>> _hashFile(String path) { |
| 141 return new File(path).readAsBytes().then((bytes) { | 214 return new File(path).readAsBytes().then((bytes) { |
| 142 var sha1 = new SHA1(); | 215 var sha1 = new SHA1(); |
| 143 sha1.add(bytes); | 216 sha1.add(bytes); |
| 144 return sha1.close(); | 217 return sha1.close(); |
| 145 }); | 218 }); |
| 146 } | 219 } |
| 147 | 220 |
| 148 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | 221 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
| 149 /// series of byte values. | 222 /// series of byte values. |
| 150 bool _sameHash(List<int> a, List<int> b) { | 223 bool _sameHash(List<int> a, List<int> b) { |
| 151 // Hashes should always be the same size. | 224 // Hashes should always be the same size. |
| 152 assert(a.length == b.length); | 225 assert(a.length == b.length); |
| 153 | 226 |
| 154 for (var i = 0; i < a.length; i++) { | 227 for (var i = 0; i < a.length; i++) { |
| 155 if (a[i] != b[i]) return false; | 228 if (a[i] != b[i]) return false; |
| 156 } | 229 } |
| 157 | 230 |
| 158 return true; | 231 return true; |
| 159 } | 232 } |
| 160 } | 233 } |
| 161 | 234 |
| 162 /// An "event" that is sent to the [_WatchState] FSM to trigger state | 235 /// Enum class for the states that the [DirectoryWatcher] can be in. |
| 163 /// transitions. | 236 class _WatchState { |
| 164 typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher); | 237 /// There are no subscribers to the watcher's event stream and no watching |
| 238 /// is going on. |
| 239 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
| 165 | 240 |
| 166 /// The different states that the watcher can be in and the transitions between | 241 /// There are subscribers and the watcher is doing an initial scan of the |
| 167 /// them. | 242 /// directory to see which files were already present before watching started. |
| 168 /// | 243 /// |
| 169 /// This class defines a finite state machine for keeping track of what the | 244 /// The watcher does not send notifications for changes that occurred while |
| 170 /// asynchronous file polling is doing. Each instance of this is a state in the | 245 /// there were no subscribers, or for files already present before watching. |
| 171 /// machine and its [listen], [cancel], and [finish] fields define the state | 246 /// The initial scan is used to determine what "before watching" state of |
| 172 /// transitions when those events occur. | 247 /// the file system was. |
| 173 class _WatchState { | 248 static const SCANNING = const _WatchState("scanning"); |
| 174 /// The watcher has no subscribers. | |
| 175 static final notWatching = new _WatchState( | |
| 176 listen: (watcher) { | |
| 177 watcher._watch(); | |
| 178 return _WatchState.scanning; | |
| 179 }); | |
| 180 | 249 |
| 181 /// The watcher has subscribers and is scanning for pre-existing files. | 250 /// There are subscribers and the watcher is polling the directory to look |
| 182 static final scanning = new _WatchState( | 251 /// for changes. |
| 183 cancel: (watcher) { | 252 static const WATCHING = const _WatchState("watching"); |
| 184 // No longer watching, so create a new incomplete ready future. | |
| 185 watcher._ready = new Completer(); | |
| 186 return _WatchState.cancelling; | |
| 187 }, finish: (watcher) { | |
| 188 watcher._ready.complete(); | |
| 189 return _WatchState.watching; | |
| 190 }, shouldWatch: true); | |
| 191 | 253 |
| 192 /// The watcher was unsubscribed while polling and we're waiting for the poll | 254 /// The name of the state. |
| 193 /// to finish. | 255 final String name; |
| 194 static final cancelling = new _WatchState( | |
| 195 listen: (_) => _WatchState.scanning, | |
| 196 finish: (_) => _WatchState.notWatching); | |
| 197 | 256 |
| 198 /// The watcher has subscribers, we have scanned for pre-existing files and | 257 const _WatchState(this.name); |
| 199 /// now we're polling for changes. | |
| 200 static final watching = new _WatchState( | |
| 201 cancel: (watcher) { | |
| 202 // No longer watching, so create a new incomplete ready future. | |
| 203 watcher._ready = new Completer(); | |
| 204 return _WatchState.cancelling; | |
| 205 }, finish: (_) => _WatchState.watching, | |
| 206 shouldWatch: true, shouldNotify: true); | |
| 207 | |
| 208 /// Called when the first subscriber to the watcher has been added. | |
| 209 final _WatchStateEvent listen; | |
| 210 | |
| 211 /// Called when all subscriptions on the watcher have been cancelled. | |
| 212 final _WatchStateEvent cancel; | |
| 213 | |
| 214 /// Called when a poll loop has finished. | |
| 215 final _WatchStateEvent finish; | |
| 216 | |
| 217 /// If the directory watcher should be watching the file system while in | |
| 218 /// this state. | |
| 219 final bool shouldWatch; | |
| 220 | |
| 221 /// If a change event should be sent for a file modification while in this | |
| 222 /// state. | |
| 223 final bool shouldNotify; | |
| 224 | |
| 225 _WatchState({this.listen, this.cancel, this.finish, | |
| 226 this.shouldWatch: false, this.shouldNotify: false}); | |
| 227 } | 258 } |
| 228 | 259 |
| 229 class _FileStatus { | 260 class _FileStatus { |
| 230 /// The last time the file was modified. | 261 /// The last time the file was modified. |
| 231 DateTime modified; | 262 DateTime modified; |
| 232 | 263 |
| 233 /// The SHA-1 hash of the contents of the file. | 264 /// The SHA-1 hash of the contents of the file. |
| 234 List<int> hash; | 265 List<int> hash; |
| 235 | 266 |
| 236 _FileStatus(this.modified, this.hash); | 267 _FileStatus(this.modified, this.hash); |
| 237 } | 268 } |
| OLD | NEW |