Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library watcher.directory_watcher; | 5 library watcher.directory_watcher; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | |
| 8 import 'dart:io'; | 9 import 'dart:io'; |
| 9 | 10 |
| 10 import 'package:crypto/crypto.dart'; | 11 import 'package:crypto/crypto.dart'; |
| 11 | 12 |
| 12 import 'stat.dart'; | 13 import 'stat.dart'; |
| 13 import 'watch_event.dart'; | 14 import 'watch_event.dart'; |
| 14 | 15 |
| 15 /// Watches the contents of a directory and emits [WatchEvent]s when something | 16 /// Watches the contents of a directory and emits [WatchEvent]s when something |
| 16 /// in the directory has changed. | 17 /// in the directory has changed. |
| 17 class DirectoryWatcher { | 18 class DirectoryWatcher { |
| 18 /// The directory whose contents are being monitored. | 19 /// The directory whose contents are being monitored. |
| 19 final String directory; | 20 final String directory; |
| 20 | 21 |
| 21 /// The broadcast [Stream] of events that have occurred to files in | 22 /// The broadcast [Stream] of events that have occurred to files in |
| 22 /// [directory]. | 23 /// [directory]. |
| 23 /// | 24 /// |
| 24 /// Changes will only be monitored while this stream has subscribers. Any | 25 /// Changes will only be monitored while this stream has subscribers. Any |
| 25 /// file changes that occur during periods when there are no subscribers | 26 /// file changes that occur during periods when there are no subscribers |
| 26 /// will not be reported the next time a subscriber is added. | 27 /// will not be reported the next time a subscriber is added. |
| 27 Stream<WatchEvent> get events => _events.stream; | 28 Stream<WatchEvent> get events => _events.stream; |
| 28 StreamController<WatchEvent> _events; | 29 StreamController<WatchEvent> _events; |
| 29 | 30 |
| 30 _WatchState _state = _WatchState.notWatching; | 31 _WatchState _state = _WatchState.UNSUBSCRIBED; |
| 31 | 32 |
| 32 /// A [Future] that completes when the watcher is initialized and watching | 33 /// A [Future] that completes when the watcher is initialized and watching |
| 33 /// for file changes. | 34 /// for file changes. |
| 34 /// | 35 /// |
| 35 /// If the watcher is not currently monitoring the directory (because there | 36 /// If the watcher is not currently monitoring the directory (because there |
| 36 /// are no subscribers to [events]), this returns a future that isn't | 37 /// are no subscribers to [events]), this returns a future that isn't |
| 37 /// complete yet. It will complete when a subscriber starts listening and | 38 /// complete yet. It will complete when a subscriber starts listening and |
| 38 /// the watcher finishes any initialization work it needs to do. | 39 /// the watcher finishes any initialization work it needs to do. |
| 39 /// | 40 /// |
| 40 /// If the watcher is already monitoring, this returns an already complete | 41 /// If the watcher is already monitoring, this returns an already complete |
| 41 /// future. | 42 /// future. |
| 42 Future get ready => _ready.future; | 43 Future get ready => _ready.future; |
| 43 Completer _ready = new Completer(); | 44 Completer _ready = new Completer(); |
| 44 | 45 |
| 45 /// The amount of time the watcher pauses between successive polls of the | 46 /// The amount of time the watcher pauses between successive polls of the |
| 46 /// directory contents. | 47 /// directory contents. |
| 47 final Duration pollingDelay; | 48 final Duration pollingDelay; |
| 48 | 49 |
| 49 /// The previous status of the files in the directory. | 50 /// The previous status of the files in the directory. |
| 50 /// | 51 /// |
| 51 /// Used to tell which files have been modified. | 52 /// Used to tell which files have been modified. |
| 52 final _statuses = new Map<String, _FileStatus>(); | 53 final _statuses = new Map<String, _FileStatus>(); |
| 53 | 54 |
| 55 /// The subscription used while [directory] is being listed. | |
| 56 /// | |
| 57 /// Will be `null` if a list is not currently happening. | |
| 58 StreamSubscription<FileSystemEntity> _listSubscription; | |
| 59 | |
| 60 /// This will be `true` if we are currently asynchronously processing files | |
| 61 /// from [_filesToCheck]. | |
|
nweiz
2013/08/01 22:52:39
"_filesToCheck" -> "_filesToProcess"
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 62 bool _isProcessing = false; | |
| 63 | |
| 64 /// The queue of files waiting to be processed to see if they have been | |
| 65 /// modified. | |
| 66 /// | |
| 67 /// Processing a file is asynchronous, as is listing the directory, so the | |
| 68 /// queue exists to let each of those proceed at their own rate. The lister | |
| 69 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | |
| 70 /// and processed sequentially. | |
| 71 final _filesToProcess = new Queue<String>(); | |
|
nweiz
2013/08/01 22:52:39
See how this looks with a FutureGroup instead of a
Bob Nystrom
2013/08/02 17:26:29
I started going in that direction but it looks lik
nweiz
2013/08/02 20:07:52
SGTM. Can you abstract out the notion of a sequent
Bob Nystrom
2013/08/02 21:45:11
Done.
| |
| 72 | |
| 73 /// The set of files that have been seen in the current directory listing. | |
| 74 /// | |
| 75 /// Used to tell which files have been removed: files that are in [_statuses] | |
| 76 /// but not in here when a poll completes have been removed. | |
| 77 final _polledFiles = new Set<String>(); | |
| 78 | |
| 54 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 79 /// Creates a new [DirectoryWatcher] monitoring [directory]. |
| 55 /// | 80 /// |
| 56 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | 81 /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
| 57 /// will pause between successive polls of the directory contents. Making | 82 /// will pause between successive polls of the directory contents. Making |
| 58 /// this shorter will give more immediate feedback at the expense of doing | 83 /// this shorter will give more immediate feedback at the expense of doing |
| 59 /// more IO and higher CPU usage. Defaults to one second. | 84 /// more IO and higher CPU usage. Defaults to one second. |
| 60 DirectoryWatcher(this.directory, {Duration pollingDelay}) | 85 DirectoryWatcher(this.directory, {Duration pollingDelay}) |
| 61 : pollingDelay = pollingDelay != null ? pollingDelay : | 86 : pollingDelay = pollingDelay != null ? pollingDelay : |
| 62 new Duration(seconds: 1) { | 87 new Duration(seconds: 1) { |
| 63 _events = new StreamController<WatchEvent>.broadcast(onListen: () { | 88 _events = new StreamController<WatchEvent>.broadcast( |
| 64 _state = _state.listen(this); | 89 onListen: _watch, onCancel: _cancel); |
| 65 }, onCancel: () { | 90 } |
| 66 _state = _state.cancel(this); | 91 |
| 92 /// Scans to see which files were already present before the watcher was | |
| 93 /// subscribed to, and then starts watching the directory for changes. | |
| 94 void _watch() { | |
| 95 assert(_state == _WatchState.UNSUBSCRIBED); | |
| 96 _state = _WatchState.SCANNING; | |
| 97 _poll(); | |
| 98 } | |
| 99 | |
| 100 /// Stops watching the directory when there are no more subscribers. | |
| 101 void _cancel() { | |
| 102 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 103 _state = _WatchState.UNSUBSCRIBED; | |
| 104 | |
| 105 // If we're in the middle of listing the directory, stop. | |
| 106 if (_listSubscription != null) _listSubscription.cancel(); | |
| 107 | |
| 108 // Don't process any remaining files. | |
| 109 _filesToProcess.clear(); | |
| 110 _polledFiles.clear(); | |
|
nweiz
2013/08/01 22:52:39
Also clear _statuses.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 111 | |
| 112 _ready = new Completer(); | |
| 113 } | |
| 114 | |
| 115 /// Scans the contents of the directory once to see which files have been | |
| 116 /// added, removed, and modified. | |
| 117 void _poll() { | |
| 118 _filesToProcess.clear(); | |
| 119 _polledFiles.clear(); | |
| 120 | |
| 121 var stream = new Directory(directory).list(recursive: true); | |
| 122 _listSubscription = stream.listen((entity) { | |
| 123 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 124 | |
| 125 if (entity is! File) return; | |
| 126 _enqueueFile(entity.path); | |
| 127 }, onDone: () { | |
| 128 assert(_state != _WatchState.UNSUBSCRIBED); | |
| 129 _listSubscription = null; | |
| 130 | |
| 131 // Null tells the queue consumer that we're done listing. | |
| 132 _enqueueFile(null); | |
| 67 }); | 133 }); |
| 68 } | 134 } |
| 69 | 135 |
| 70 /// Starts the asynchronous polling process. | 136 /// Add [path] to the queue of files whose status needs to be processed. |
| 71 /// | 137 void _enqueueFile(String path) { |
| 72 /// Scans the contents of the directory and compares the results to the | 138 _filesToProcess.add(path); |
| 73 /// previous scan. Loops to continue monitoring as long as there are | |
| 74 /// subscribers to the [events] stream. | |
| 75 Future _watch() { | |
| 76 var files = new Set<String>(); | |
| 77 | 139 |
| 78 var stream = new Directory(directory).list(recursive: true); | 140 // Start up the asynchronous processing if not already running. |
| 141 if (_isProcessing) return; | |
| 142 _isProcessing = true; | |
| 79 | 143 |
| 80 return stream.map((entity) { | 144 refreshNextFile() { |
|
nweiz
2013/08/01 22:52:39
It feels like this warrants being its own method.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 81 if (entity is! File) return new Future.value(); | 145 var file = _filesToProcess.removeFirst(); |
| 82 files.add(entity.path); | 146 |
| 83 // TODO(rnystrom): These all run as fast as possible and read the | 147 // `null` is the sentinel which means the directory listing is complete. |
| 84 // contents of the files. That means there's a pretty big IO hit all at | 148 if (file == null) { |
| 85 // once. Maybe these should be queued up and rate limited? | 149 _isProcessing = false; |
| 86 return _refreshFile(entity.path); | 150 |
| 87 }).toList().then((futures) { | 151 // Any files that were not seen in the last poll but that we have a |
| 88 // Once the listing is done, make sure to wait until each file is also | 152 // status for must have been removed. |
| 89 // done. | 153 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
| 90 return Future.wait(futures); | 154 for (var removed in removedFiles) { |
| 91 }).then((_) { | 155 if (_state == _WatchState.WATCHING) { |
| 92 var removedFiles = _statuses.keys.toSet().difference(files); | 156 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
| 93 for (var removed in removedFiles) { | 157 } |
| 94 if (_state.shouldNotify) { | 158 _statuses.remove(removed); |
| 95 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
| 96 } | 159 } |
| 97 _statuses.remove(removed); | 160 |
| 161 assert(_state != _WatchState.UNSUBSCRIBED); | |
|
nweiz
2013/08/01 22:52:39
This should be at the top of [refreshNextFile].
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 162 if (_state == _WatchState.SCANNING) { | |
| 163 _state = _WatchState.WATCHING; | |
| 164 print("done scanning, watching"); | |
|
nweiz
2013/08/01 22:52:39
Left over from debugging?
Bob Nystrom
2013/08/02 17:26:29
Yup. Done.
| |
| 165 _ready.complete(); | |
| 166 } | |
| 167 | |
| 168 // Wait. | |
| 169 return new Future.delayed(pollingDelay).then((_) { | |
| 170 // Stop if we unsubscribed while waiting. | |
| 171 if (_state == _WatchState.UNSUBSCRIBED) return; | |
| 172 | |
| 173 // And then poll again. | |
| 174 _poll(); | |
| 175 }); | |
| 98 } | 176 } |
| 99 | 177 |
| 100 var previousState = _state; | 178 return _processFile(file).then((_) { |
| 101 _state = _state.finish(this); | 179 // Stop if we unsubscribed while waiting. |
| 180 if (_state == _WatchState.UNSUBSCRIBED) return; | |
| 102 | 181 |
| 103 // If we were already sending notifications, add a bit of delay before | 182 // If we have drained the queue (i.e. we are processing faster than |
| 104 // restarting just so that we don't whale on the file system. | 183 // we are listing files), then stop processing and wait until something |
| 105 // TODO(rnystrom): Tune this and/or make it tunable? | 184 // has been enqueued. |
| 106 if (_state.shouldNotify) { | 185 if (_filesToProcess.isEmpty) { |
| 107 return new Future.delayed(pollingDelay); | 186 _isProcessing = false; |
| 108 } | 187 return; |
| 109 }).then((_) { | 188 } |
| 110 // Make sure we haven't transitioned to a non-watching state during the | 189 |
| 111 // delay. | 190 return refreshNextFile(); |
| 112 if (_state.shouldWatch) _watch(); | 191 }); |
| 192 } | |
| 193 | |
| 194 // Pipe all errors to the notification stream. | |
| 195 refreshNextFile().catchError((error) { | |
| 196 _events.addError(error, getAttachedStackTrace(error)); | |
|
nweiz
2013/08/01 22:52:39
You don't need to manually forward the stack trace
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 113 }); | 197 }); |
| 114 } | 198 } |
| 115 | 199 |
| 116 /// Compares the current state of the file at [path] to the state it was in | 200 Future _processFile(String path) { |
| 117 /// the last time it was scanned. | |
| 118 Future _refreshFile(String path) { | |
| 119 return getModificationTime(path).then((modified) { | 201 return getModificationTime(path).then((modified) { |
| 202 // Stop if we unsubscribed while waiting. | |
| 203 if (_state == _WatchState.UNSUBSCRIBED) return; | |
| 204 | |
| 120 var lastStatus = _statuses[path]; | 205 var lastStatus = _statuses[path]; |
| 121 | 206 |
| 122 // If it's modification time hasn't changed, assume the file is unchanged. | 207 // If it's modification time hasn't changed, assume the file is unchanged. |
|
nweiz
2013/08/01 22:52:39
"it's" -> "its"
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 123 if (lastStatus != null && lastStatus.modified == modified) return; | 208 if (lastStatus != null && lastStatus.modified == modified) { |
| 209 // The file is still here. | |
| 210 _polledFiles.add(path); | |
| 211 return; | |
| 212 } | |
| 124 | 213 |
| 125 return _hashFile(path).then((hash) { | 214 return _hashFile(path).then((hash) { |
| 215 // Stop if we unsubscribed while waiting. | |
| 216 if (_state == _WatchState.UNSUBSCRIBED) return; | |
| 217 | |
| 126 var status = new _FileStatus(modified, hash); | 218 var status = new _FileStatus(modified, hash); |
| 127 _statuses[path] = status; | 219 _statuses[path] = status; |
| 220 _polledFiles.add(path); | |
| 128 | 221 |
| 129 // Only notify if the file contents changed. | 222 // Only notify while in the watching state. |
| 130 if (_state.shouldNotify && | 223 if (_state == _WatchState.WATCHING) { |
|
nweiz
2013/08/01 22:52:39
Short-circuit here and below.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 131 (lastStatus == null || !_sameHash(lastStatus.hash, hash))) { | 224 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
| 132 var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | 225 if (changed) { |
| 133 _events.add(new WatchEvent(change, path)); | 226 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
| 227 _events.add(new WatchEvent(type, path)); | |
| 228 } | |
| 134 } | 229 } |
| 135 }); | 230 }); |
| 136 }); | 231 }); |
| 137 } | 232 } |
| 138 | 233 |
| 139 /// Calculates the SHA-1 hash of the file at [path]. | 234 /// Calculates the SHA-1 hash of the file at [path]. |
| 140 Future<List<int>> _hashFile(String path) { | 235 Future<List<int>> _hashFile(String path) { |
| 141 return new File(path).readAsBytes().then((bytes) { | 236 return new File(path).readAsBytes().then((bytes) { |
| 142 var sha1 = new SHA1(); | 237 var sha1 = new SHA1(); |
| 143 sha1.add(bytes); | 238 sha1.add(bytes); |
| 144 return sha1.close(); | 239 return sha1.close(); |
| 145 }); | 240 }); |
| 146 } | 241 } |
| 147 | 242 |
| 148 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | 243 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
| 149 /// series of byte values. | 244 /// series of byte values. |
| 150 bool _sameHash(List<int> a, List<int> b) { | 245 bool _sameHash(List<int> a, List<int> b) { |
| 151 // Hashes should always be the same size. | 246 // Hashes should always be the same size. |
| 152 assert(a.length == b.length); | 247 assert(a.length == b.length); |
| 153 | 248 |
| 154 for (var i = 0; i < a.length; i++) { | 249 for (var i = 0; i < a.length; i++) { |
| 155 if (a[i] != b[i]) return false; | 250 if (a[i] != b[i]) return false; |
| 156 } | 251 } |
| 157 | 252 |
| 158 return true; | 253 return true; |
| 159 } | 254 } |
| 160 } | 255 } |
| 161 | 256 |
| 162 /// An "event" that is sent to the [_WatchState] FSM to trigger state | 257 class _WatchState { |
| 163 /// transitions. | 258 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
| 164 typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher); | 259 static const SCANNING = const _WatchState("scanning"); |
| 260 static const WATCHING = const _WatchState("watching"); | |
|
nweiz
2013/08/01 22:52:39
Add some docs about what each state means.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
| 165 | 261 |
| 166 /// The different states that the watcher can be in and the transitions between | 262 final String name; |
| 167 /// them. | |
| 168 /// | |
| 169 /// This class defines a finite state machine for keeping track of what the | |
| 170 /// asynchronous file polling is doing. Each instance of this is a state in the | |
| 171 /// machine and its [listen], [cancel], and [finish] fields define the state | |
| 172 /// transitions when those events occur. | |
| 173 class _WatchState { | |
| 174 /// The watcher has no subscribers. | |
| 175 static final notWatching = new _WatchState( | |
| 176 listen: (watcher) { | |
| 177 watcher._watch(); | |
| 178 return _WatchState.scanning; | |
| 179 }); | |
| 180 | 263 |
| 181 /// The watcher has subscribers and is scanning for pre-existing files. | 264 const _WatchState(this.name); |
| 182 static final scanning = new _WatchState( | |
| 183 cancel: (watcher) { | |
| 184 // No longer watching, so create a new incomplete ready future. | |
| 185 watcher._ready = new Completer(); | |
| 186 return _WatchState.cancelling; | |
| 187 }, finish: (watcher) { | |
| 188 watcher._ready.complete(); | |
| 189 return _WatchState.watching; | |
| 190 }, shouldWatch: true); | |
| 191 | |
| 192 /// The watcher was unsubscribed while polling and we're waiting for the poll | |
| 193 /// to finish. | |
| 194 static final cancelling = new _WatchState( | |
| 195 listen: (_) => _WatchState.scanning, | |
| 196 finish: (_) => _WatchState.notWatching); | |
| 197 | |
| 198 /// The watcher has subscribers, we have scanned for pre-existing files and | |
| 199 /// now we're polling for changes. | |
| 200 static final watching = new _WatchState( | |
| 201 cancel: (watcher) { | |
| 202 // No longer watching, so create a new incomplete ready future. | |
| 203 watcher._ready = new Completer(); | |
| 204 return _WatchState.cancelling; | |
| 205 }, finish: (_) => _WatchState.watching, | |
| 206 shouldWatch: true, shouldNotify: true); | |
| 207 | |
| 208 /// Called when the first subscriber to the watcher has been added. | |
| 209 final _WatchStateEvent listen; | |
| 210 | |
| 211 /// Called when all subscriptions on the watcher have been cancelled. | |
| 212 final _WatchStateEvent cancel; | |
| 213 | |
| 214 /// Called when a poll loop has finished. | |
| 215 final _WatchStateEvent finish; | |
| 216 | |
| 217 /// If the directory watcher should be watching the file system while in | |
| 218 /// this state. | |
| 219 final bool shouldWatch; | |
| 220 | |
| 221 /// If a change event should be sent for a file modification while in this | |
| 222 /// state. | |
| 223 final bool shouldNotify; | |
| 224 | |
| 225 _WatchState({this.listen, this.cancel, this.finish, | |
| 226 this.shouldWatch: false, this.shouldNotify: false}); | |
| 227 } | 265 } |
| 228 | 266 |
| 229 class _FileStatus { | 267 class _FileStatus { |
| 230 /// The last time the file was modified. | 268 /// The last time the file was modified. |
| 231 DateTime modified; | 269 DateTime modified; |
| 232 | 270 |
| 233 /// The SHA-1 hash of the contents of the file. | 271 /// The SHA-1 hash of the contents of the file. |
| 234 List<int> hash; | 272 List<int> hash; |
| 235 | 273 |
| 236 _FileStatus(this.modified, this.hash); | 274 _FileStatus(this.modified, this.hash); |
| 237 } | 275 } |
| OLD | NEW |