OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library watcher.directory_watcher; | 5 library watcher.directory_watcher; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | |
8 import 'dart:io'; | 9 import 'dart:io'; |
9 | 10 |
10 import 'package:crypto/crypto.dart'; | 11 import 'package:crypto/crypto.dart'; |
11 | 12 |
12 import 'stat.dart'; | 13 import 'stat.dart'; |
13 import 'watch_event.dart'; | 14 import 'watch_event.dart'; |
14 | 15 |
15 /// Watches the contents of a directory and emits [WatchEvent]s when something | 16 /// Watches the contents of a directory and emits [WatchEvent]s when something |
16 /// in the directory has changed. | 17 /// in the directory has changed. |
17 class DirectoryWatcher { | 18 class DirectoryWatcher { |
18 /// The directory whose contents are being monitored. | 19 /// The directory whose contents are being monitored. |
19 final String directory; | 20 final String directory; |
20 | 21 |
21 /// The broadcast [Stream] of events that have occurred to files in | 22 /// The broadcast [Stream] of events that have occurred to files in |
22 /// [directory]. | 23 /// [directory]. |
23 /// | 24 /// |
24 /// Changes will only be monitored while this stream has subscribers. Any | 25 /// Changes will only be monitored while this stream has subscribers. Any |
25 /// file changes that occur during periods when there are no subscribers | 26 /// file changes that occur during periods when there are no subscribers |
26 /// will not be reported the next time a subscriber is added. | 27 /// will not be reported the next time a subscriber is added. |
27 Stream<WatchEvent> get events => _events.stream; | 28 Stream<WatchEvent> get events => _events.stream; |
28 StreamController<WatchEvent> _events; | 29 StreamController<WatchEvent> _events; |
29 | 30 |
30 _WatchState _state = _WatchState.notWatching; | 31 _WatchState _state = _WatchState.UNSUBSCRIBED; |
31 | 32 |
32 /// A [Future] that completes when the watcher is initialized and watching | 33 /// A [Future] that completes when the watcher is initialized and watching |
33 /// for file changes. | 34 /// for file changes. |
34 /// | 35 /// |
35 /// If the watcher is not currently monitoring the directory (because there | 36 /// If the watcher is not currently monitoring the directory (because there |
36 /// are no subscribers to [events]), this returns a future that isn't | 37 /// are no subscribers to [events]), this returns a future that isn't |
37 /// complete yet. It will complete when a subscriber starts listening and | 38 /// complete yet. It will complete when a subscriber starts listening and |
38 /// the watcher finishes any initialization work it needs to do. | 39 /// the watcher finishes any initialization work it needs to do. |
39 /// | 40 /// |
40 /// If the watcher is already monitoring, this returns an already complete | 41 /// If the watcher is already monitoring, this returns an already complete |
41 /// future. | 42 /// future. |
42 Future get ready => _ready.future; | 43 Future get ready => _ready.future; |
43 Completer _ready = new Completer(); | 44 Completer _ready = new Completer(); |
44 | 45 |
45 /// The amount of time the watcher pauses between successive polls of the | 46 /// The amount of time the watcher pauses between successive polls of the |
46 /// directory contents. | 47 /// directory contents. |
47 final Duration pollingDelay; | 48 final Duration pollingDelay; |
48 | 49 |
49 /// The previous status of the files in the directory. | 50 /// The previous status of the files in the directory. |
50 /// | 51 /// |
51 /// Used to tell which files have been modified. | 52 /// Used to tell which files have been modified. |
52 final _statuses = new Map<String, _FileStatus>(); | 53 final _statuses = new Map<String, _FileStatus>(); |
53 | 54 |
55 /// The subscription used while [directory] is being listed. | |
56 /// | |
57 /// Will be `null` if a list is not currently happening. | |
58 StreamSubscription<FileSystemEntity> _listSubscription; | |
59 | |
60 /// This will be `true` if we are currently asynchronously processing files | |
61 /// from [_filesToCheck]. | |
nweiz
2013/08/01 22:52:39
"_filesToCheck" -> "_filesToProcess"
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
62 bool _isProcessing = false; | |
63 | |
64 /// The queue of files waiting to be processed to see if they have been | |
65 /// modified. | |
66 /// | |
67 /// Processing a file is asynchronous, as is listing the directory, so the | |
68 /// queue exists to let each of those proceed at their own rate. The lister | |
69 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued | |
70 /// and processed sequentially. | |
71 final _filesToProcess = new Queue<String>(); | |
nweiz
2013/08/01 22:52:39
See how this looks with a FutureGroup instead of a
Bob Nystrom
2013/08/02 17:26:29
I started going in that direction but it looks lik
nweiz
2013/08/02 20:07:52
SGTM. Can you abstract out the notion of a sequent
Bob Nystrom
2013/08/02 21:45:11
Done.
| |
72 | |
73 /// The set of files that have been seen in the current directory listing. | |
74 /// | |
75 /// Used to tell which files have been removed: files that are in [_statuses] | |
76 /// but not in here when a poll completes have been removed. | |
77 final _polledFiles = new Set<String>(); | |
78 | |
54 /// Creates a new [DirectoryWatcher] monitoring [directory]. | 79 /// Creates a new [DirectoryWatcher] monitoring [directory]. |
55 /// | 80 /// |
56 /// If [pollingDelay] is passed, it specifies the amount of time the watcher | 81 /// If [pollingDelay] is passed, it specifies the amount of time the watcher |
57 /// will pause between successive polls of the directory contents. Making | 82 /// will pause between successive polls of the directory contents. Making |
58 /// this shorter will give more immediate feedback at the expense of doing | 83 /// this shorter will give more immediate feedback at the expense of doing |
59 /// more IO and higher CPU usage. Defaults to one second. | 84 /// more IO and higher CPU usage. Defaults to one second. |
60 DirectoryWatcher(this.directory, {Duration pollingDelay}) | 85 DirectoryWatcher(this.directory, {Duration pollingDelay}) |
61 : pollingDelay = pollingDelay != null ? pollingDelay : | 86 : pollingDelay = pollingDelay != null ? pollingDelay : |
62 new Duration(seconds: 1) { | 87 new Duration(seconds: 1) { |
63 _events = new StreamController<WatchEvent>.broadcast(onListen: () { | 88 _events = new StreamController<WatchEvent>.broadcast( |
64 _state = _state.listen(this); | 89 onListen: _watch, onCancel: _cancel); |
65 }, onCancel: () { | 90 } |
66 _state = _state.cancel(this); | 91 |
92 /// Scans to see which files were already present before the watcher was | |
93 /// subscribed to, and then starts watching the directory for changes. | |
94 void _watch() { | |
95 assert(_state == _WatchState.UNSUBSCRIBED); | |
96 _state = _WatchState.SCANNING; | |
97 _poll(); | |
98 } | |
99 | |
100 /// Stops watching the directory when there are no more subscribers. | |
101 void _cancel() { | |
102 assert(_state != _WatchState.UNSUBSCRIBED); | |
103 _state = _WatchState.UNSUBSCRIBED; | |
104 | |
105 // If we're in the middle of listing the directory, stop. | |
106 if (_listSubscription != null) _listSubscription.cancel(); | |
107 | |
108 // Don't process any remaining files. | |
109 _filesToProcess.clear(); | |
110 _polledFiles.clear(); | |
nweiz
2013/08/01 22:52:39
Also clear _statuses.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
111 | |
112 _ready = new Completer(); | |
113 } | |
114 | |
115 /// Scans the contents of the directory once to see which files have been | |
116 /// added, removed, and modified. | |
117 void _poll() { | |
118 _filesToProcess.clear(); | |
119 _polledFiles.clear(); | |
120 | |
121 var stream = new Directory(directory).list(recursive: true); | |
122 _listSubscription = stream.listen((entity) { | |
123 assert(_state != _WatchState.UNSUBSCRIBED); | |
124 | |
125 if (entity is! File) return; | |
126 _enqueueFile(entity.path); | |
127 }, onDone: () { | |
128 assert(_state != _WatchState.UNSUBSCRIBED); | |
129 _listSubscription = null; | |
130 | |
131 // Null tells the queue consumer that we're done listing. | |
132 _enqueueFile(null); | |
67 }); | 133 }); |
68 } | 134 } |
69 | 135 |
70 /// Starts the asynchronous polling process. | 136 /// Add [path] to the queue of files whose status needs to be processed. |
71 /// | 137 void _enqueueFile(String path) { |
72 /// Scans the contents of the directory and compares the results to the | 138 _filesToProcess.add(path); |
73 /// previous scan. Loops to continue monitoring as long as there are | |
74 /// subscribers to the [events] stream. | |
75 Future _watch() { | |
76 var files = new Set<String>(); | |
77 | 139 |
78 var stream = new Directory(directory).list(recursive: true); | 140 // Start up the asynchronous processing if not already running. |
141 if (_isProcessing) return; | |
142 _isProcessing = true; | |
79 | 143 |
80 return stream.map((entity) { | 144 refreshNextFile() { |
nweiz
2013/08/01 22:52:39
It feels like this warrants being its own method.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
81 if (entity is! File) return new Future.value(); | 145 var file = _filesToProcess.removeFirst(); |
82 files.add(entity.path); | 146 |
83 // TODO(rnystrom): These all run as fast as possible and read the | 147 // `null` is the sentinel which means the directory listing is complete. |
84 // contents of the files. That means there's a pretty big IO hit all at | 148 if (file == null) { |
85 // once. Maybe these should be queued up and rate limited? | 149 _isProcessing = false; |
86 return _refreshFile(entity.path); | 150 |
87 }).toList().then((futures) { | 151 // Any files that were not seen in the last poll but that we have a |
88 // Once the listing is done, make sure to wait until each file is also | 152 // status for must have been removed. |
89 // done. | 153 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); |
90 return Future.wait(futures); | 154 for (var removed in removedFiles) { |
91 }).then((_) { | 155 if (_state == _WatchState.WATCHING) { |
92 var removedFiles = _statuses.keys.toSet().difference(files); | 156 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); |
93 for (var removed in removedFiles) { | 157 } |
94 if (_state.shouldNotify) { | 158 _statuses.remove(removed); |
95 _events.add(new WatchEvent(ChangeType.REMOVE, removed)); | |
96 } | 159 } |
97 _statuses.remove(removed); | 160 |
161 assert(_state != _WatchState.UNSUBSCRIBED); | |
nweiz
2013/08/01 22:52:39
This should be at the top of [refreshNextFile].
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
162 if (_state == _WatchState.SCANNING) { | |
163 _state = _WatchState.WATCHING; | |
164 print("done scanning, watching"); | |
nweiz
2013/08/01 22:52:39
Left over from debugging?
Bob Nystrom
2013/08/02 17:26:29
Yup. Done.
| |
165 _ready.complete(); | |
166 } | |
167 | |
168 // Wait. | |
169 return new Future.delayed(pollingDelay).then((_) { | |
170 // Stop if we unsubscribed while waiting. | |
171 if (_state == _WatchState.UNSUBSCRIBED) return; | |
172 | |
173 // And then poll again. | |
174 _poll(); | |
175 }); | |
98 } | 176 } |
99 | 177 |
100 var previousState = _state; | 178 return _processFile(file).then((_) { |
101 _state = _state.finish(this); | 179 // Stop if we unsubscribed while waiting. |
180 if (_state == _WatchState.UNSUBSCRIBED) return; | |
102 | 181 |
103 // If we were already sending notifications, add a bit of delay before | 182 // If we have drained the queue (i.e. we are processing faster than |
104 // restarting just so that we don't whale on the file system. | 183 // we are listing files), then stop processing and wait until something |
105 // TODO(rnystrom): Tune this and/or make it tunable? | 184 // has been enqueued. |
106 if (_state.shouldNotify) { | 185 if (_filesToProcess.isEmpty) { |
107 return new Future.delayed(pollingDelay); | 186 _isProcessing = false; |
108 } | 187 return; |
109 }).then((_) { | 188 } |
110 // Make sure we haven't transitioned to a non-watching state during the | 189 |
111 // delay. | 190 return refreshNextFile(); |
112 if (_state.shouldWatch) _watch(); | 191 }); |
192 } | |
193 | |
194 // Pipe all errors to the notification stream. | |
195 refreshNextFile().catchError((error) { | |
196 _events.addError(error, getAttachedStackTrace(error)); | |
nweiz
2013/08/01 22:52:39
You don't need to manually forward the stack trace
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
113 }); | 197 }); |
114 } | 198 } |
115 | 199 |
116 /// Compares the current state of the file at [path] to the state it was in | 200 Future _processFile(String path) { |
117 /// the last time it was scanned. | |
118 Future _refreshFile(String path) { | |
119 return getModificationTime(path).then((modified) { | 201 return getModificationTime(path).then((modified) { |
202 // Stop if we unsubscribed while waiting. | |
203 if (_state == _WatchState.UNSUBSCRIBED) return; | |
204 | |
120 var lastStatus = _statuses[path]; | 205 var lastStatus = _statuses[path]; |
121 | 206 |
122 // If it's modification time hasn't changed, assume the file is unchanged. | 207 // If it's modification time hasn't changed, assume the file is unchanged. |
nweiz
2013/08/01 22:52:39
"it's" -> "its"
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
123 if (lastStatus != null && lastStatus.modified == modified) return; | 208 if (lastStatus != null && lastStatus.modified == modified) { |
209 // The file is still here. | |
210 _polledFiles.add(path); | |
211 return; | |
212 } | |
124 | 213 |
125 return _hashFile(path).then((hash) { | 214 return _hashFile(path).then((hash) { |
215 // Stop if we unsubscribed while waiting. | |
216 if (_state == _WatchState.UNSUBSCRIBED) return; | |
217 | |
126 var status = new _FileStatus(modified, hash); | 218 var status = new _FileStatus(modified, hash); |
127 _statuses[path] = status; | 219 _statuses[path] = status; |
220 _polledFiles.add(path); | |
128 | 221 |
129 // Only notify if the file contents changed. | 222 // Only notify while in the watching state. |
130 if (_state.shouldNotify && | 223 if (_state == _WatchState.WATCHING) { |
nweiz
2013/08/01 22:52:39
Short-circuit here and below.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
131 (lastStatus == null || !_sameHash(lastStatus.hash, hash))) { | 224 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); |
132 var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; | 225 if (changed) { |
133 _events.add(new WatchEvent(change, path)); | 226 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; |
227 _events.add(new WatchEvent(type, path)); | |
228 } | |
134 } | 229 } |
135 }); | 230 }); |
136 }); | 231 }); |
137 } | 232 } |
138 | 233 |
139 /// Calculates the SHA-1 hash of the file at [path]. | 234 /// Calculates the SHA-1 hash of the file at [path]. |
140 Future<List<int>> _hashFile(String path) { | 235 Future<List<int>> _hashFile(String path) { |
141 return new File(path).readAsBytes().then((bytes) { | 236 return new File(path).readAsBytes().then((bytes) { |
142 var sha1 = new SHA1(); | 237 var sha1 = new SHA1(); |
143 sha1.add(bytes); | 238 sha1.add(bytes); |
144 return sha1.close(); | 239 return sha1.close(); |
145 }); | 240 }); |
146 } | 241 } |
147 | 242 |
148 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same | 243 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same |
149 /// series of byte values. | 244 /// series of byte values. |
150 bool _sameHash(List<int> a, List<int> b) { | 245 bool _sameHash(List<int> a, List<int> b) { |
151 // Hashes should always be the same size. | 246 // Hashes should always be the same size. |
152 assert(a.length == b.length); | 247 assert(a.length == b.length); |
153 | 248 |
154 for (var i = 0; i < a.length; i++) { | 249 for (var i = 0; i < a.length; i++) { |
155 if (a[i] != b[i]) return false; | 250 if (a[i] != b[i]) return false; |
156 } | 251 } |
157 | 252 |
158 return true; | 253 return true; |
159 } | 254 } |
160 } | 255 } |
161 | 256 |
162 /// An "event" that is sent to the [_WatchState] FSM to trigger state | 257 class _WatchState { |
163 /// transitions. | 258 static const UNSUBSCRIBED = const _WatchState("unsubscribed"); |
164 typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher); | 259 static const SCANNING = const _WatchState("scanning"); |
260 static const WATCHING = const _WatchState("watching"); | |
nweiz
2013/08/01 22:52:39
Add some docs about what each state means.
Bob Nystrom
2013/08/02 17:26:29
Done.
| |
165 | 261 |
166 /// The different states that the watcher can be in and the transitions between | 262 final String name; |
167 /// them. | |
168 /// | |
169 /// This class defines a finite state machine for keeping track of what the | |
170 /// asynchronous file polling is doing. Each instance of this is a state in the | |
171 /// machine and its [listen], [cancel], and [finish] fields define the state | |
172 /// transitions when those events occur. | |
173 class _WatchState { | |
174 /// The watcher has no subscribers. | |
175 static final notWatching = new _WatchState( | |
176 listen: (watcher) { | |
177 watcher._watch(); | |
178 return _WatchState.scanning; | |
179 }); | |
180 | 263 |
181 /// The watcher has subscribers and is scanning for pre-existing files. | 264 const _WatchState(this.name); |
182 static final scanning = new _WatchState( | |
183 cancel: (watcher) { | |
184 // No longer watching, so create a new incomplete ready future. | |
185 watcher._ready = new Completer(); | |
186 return _WatchState.cancelling; | |
187 }, finish: (watcher) { | |
188 watcher._ready.complete(); | |
189 return _WatchState.watching; | |
190 }, shouldWatch: true); | |
191 | |
192 /// The watcher was unsubscribed while polling and we're waiting for the poll | |
193 /// to finish. | |
194 static final cancelling = new _WatchState( | |
195 listen: (_) => _WatchState.scanning, | |
196 finish: (_) => _WatchState.notWatching); | |
197 | |
198 /// The watcher has subscribers, we have scanned for pre-existing files and | |
199 /// now we're polling for changes. | |
200 static final watching = new _WatchState( | |
201 cancel: (watcher) { | |
202 // No longer watching, so create a new incomplete ready future. | |
203 watcher._ready = new Completer(); | |
204 return _WatchState.cancelling; | |
205 }, finish: (_) => _WatchState.watching, | |
206 shouldWatch: true, shouldNotify: true); | |
207 | |
208 /// Called when the first subscriber to the watcher has been added. | |
209 final _WatchStateEvent listen; | |
210 | |
211 /// Called when all subscriptions on the watcher have been cancelled. | |
212 final _WatchStateEvent cancel; | |
213 | |
214 /// Called when a poll loop has finished. | |
215 final _WatchStateEvent finish; | |
216 | |
217 /// If the directory watcher should be watching the file system while in | |
218 /// this state. | |
219 final bool shouldWatch; | |
220 | |
221 /// If a change event should be sent for a file modification while in this | |
222 /// state. | |
223 final bool shouldNotify; | |
224 | |
225 _WatchState({this.listen, this.cancel, this.finish, | |
226 this.shouldWatch: false, this.shouldNotify: false}); | |
227 } | 265 } |
228 | 266 |
229 class _FileStatus { | 267 class _FileStatus { |
230 /// The last time the file was modified. | 268 /// The last time the file was modified. |
231 DateTime modified; | 269 DateTime modified; |
232 | 270 |
233 /// The SHA-1 hash of the contents of the file. | 271 /// The SHA-1 hash of the contents of the file. |
234 List<int> hash; | 272 List<int> hash; |
235 | 273 |
236 _FileStatus(this.modified, this.hash); | 274 _FileStatus(this.modified, this.hash); |
237 } | 275 } |
OLD | NEW |