Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(581)

Side by Side Diff: pkg/watcher/lib/src/directory_watcher/polling.dart

Issue 46843003: Wrap Directory.watch on linux for the watcher package. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library watcher.directory_watcher; 5 library watcher.directory_watcher.polling;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:io'; 8 import 'dart:io';
9 9
10 import 'package:crypto/crypto.dart'; 10 import 'package:crypto/crypto.dart';
11 11
12 import 'async_queue.dart'; 12 import '../async_queue.dart';
13 import 'stat.dart'; 13 import '../directory_watcher.dart';
14 import 'utils.dart'; 14 import '../stat.dart';
15 import 'watch_event.dart'; 15 import '../utils.dart';
16 import '../watch_event.dart';
17 import 'resubscribable.dart';
16 18
17 /// Watches the contents of a directory and emits [WatchEvent]s when something 19 /// Periodically polls a directory for changes.
18 /// in the directory has changed. 20 class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
19 class DirectoryWatcher { 21 /// Creates a new polling watcher monitoring [directory].
20 /// The directory whose contents are being monitored. 22 ///
23 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
24 /// will pause between successive polls of the directory contents. Making this
25 /// shorter will give more immediate feedback at the expense of doing more IO
26 /// and higher CPU usage. Defaults to one second.
27 PollingDirectoryWatcher(String directory, {Duration pollingDelay})
28 : super(directory, () {
29 return new _PollingDirectoryWatcher(directory,
30 pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
31 });
32 }
33
34 class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
21 final String directory; 35 final String directory;
22 36
23 /// The broadcast [Stream] of events that have occurred to files in
24 /// [directory].
25 ///
26 /// Changes will only be monitored while this stream has subscribers. Any
27 /// file changes that occur during periods when there are no subscribers
28 /// will not be reported the next time a subscriber is added.
29 Stream<WatchEvent> get events => _events.stream; 37 Stream<WatchEvent> get events => _events.stream;
30 StreamController<WatchEvent> _events; 38 final _events = new StreamController<WatchEvent>.broadcast();
31 39
32 _WatchState _state = _WatchState.UNSUBSCRIBED; 40 bool get isReady => _ready.isCompleted;
33 41
34 /// A [Future] that completes when the watcher is initialized and watching
35 /// for file changes.
36 ///
37 /// If the watcher is not currently monitoring the directory (because there
38 /// are no subscribers to [events]), this returns a future that isn't
39 /// complete yet. It will complete when a subscriber starts listening and
40 /// the watcher finishes any initialization work it needs to do.
41 ///
42 /// If the watcher is already monitoring, this returns an already complete
43 /// future.
44 Future get ready => _ready.future; 42 Future get ready => _ready.future;
45 Completer _ready = new Completer(); 43 final _ready = new Completer();
46 44
47 /// The amount of time the watcher pauses between successive polls of the 45 /// The amount of time the watcher pauses between successive polls of the
48 /// directory contents. 46 /// directory contents.
49 final Duration pollingDelay; 47 final Duration _pollingDelay;
50 48
51 /// The previous status of the files in the directory. 49 /// The previous status of the files in the directory.
52 /// 50 ///
53 /// Used to tell which files have been modified. 51 /// Used to tell which files have been modified.
54 final _statuses = new Map<String, _FileStatus>(); 52 final _statuses = new Map<String, _FileStatus>();
55 53
56 /// The subscription used while [directory] is being listed. 54 /// The subscription used while [directory] is being listed.
57 /// 55 ///
58 /// Will be `null` if a list is not currently happening. 56 /// Will be `null` if a list is not currently happening.
59 StreamSubscription<FileSystemEntity> _listSubscription; 57 StreamSubscription<FileSystemEntity> _listSubscription;
60 58
61 /// The queue of files waiting to be processed to see if they have been 59 /// The queue of files waiting to be processed to see if they have been
62 /// modified. 60 /// modified.
63 /// 61 ///
64 /// Processing a file is asynchronous, as is listing the directory, so the 62 /// Processing a file is asynchronous, as is listing the directory, so the
65 /// queue exists to let each of those proceed at their own rate. The lister 63 /// queue exists to let each of those proceed at their own rate. The lister
66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued 64 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
67 /// and processed sequentially. 65 /// and processed sequentially.
68 AsyncQueue<String> _filesToProcess; 66 AsyncQueue<String> _filesToProcess;
69 67
70 /// The set of files that have been seen in the current directory listing. 68 /// The set of files that have been seen in the current directory listing.
71 /// 69 ///
72 /// Used to tell which files have been removed: files that are in [_statuses] 70 /// Used to tell which files have been removed: files that are in [_statuses]
73 /// but not in here when a poll completes have been removed. 71 /// but not in here when a poll completes have been removed.
74 final _polledFiles = new Set<String>(); 72 final _polledFiles = new Set<String>();
75 73
76 /// Creates a new [DirectoryWatcher] monitoring [directory]. 74 _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
77 ///
78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher
79 /// will pause between successive polls of the directory contents. Making
80 /// this shorter will give more immediate feedback at the expense of doing
81 /// more IO and higher CPU usage. Defaults to one second.
82 DirectoryWatcher(this.directory, {Duration pollingDelay})
83 : pollingDelay = pollingDelay != null ? pollingDelay :
84 new Duration(seconds: 1) {
85 _events = new StreamController<WatchEvent>.broadcast(
86 onListen: _watch, onCancel: _cancel);
87
88 _filesToProcess = new AsyncQueue<String>(_processFile, 75 _filesToProcess = new AsyncQueue<String>(_processFile,
89 onError: _events.addError); 76 onError: _events.addError);
90 }
91 77
92 /// Scans to see which files were already present before the watcher was
93 /// subscribed to, and then starts watching the directory for changes.
94 void _watch() {
95 assert(_state == _WatchState.UNSUBSCRIBED);
96 _state = _WatchState.SCANNING;
97 _poll(); 78 _poll();
98 } 79 }
99 80
100 /// Stops watching the directory when there are no more subscribers. 81 void close() {
101 void _cancel() { 82 _events.close();
102 assert(_state != _WatchState.UNSUBSCRIBED);
103 _state = _WatchState.UNSUBSCRIBED;
104 83
105 // If we're in the middle of listing the directory, stop. 84 // If we're in the middle of listing the directory, stop.
106 if (_listSubscription != null) _listSubscription.cancel(); 85 if (_listSubscription != null) _listSubscription.cancel();
107 86
108 // Don't process any remaining files. 87 // Don't process any remaining files.
109 _filesToProcess.clear(); 88 _filesToProcess.clear();
110 _polledFiles.clear(); 89 _polledFiles.clear();
111 _statuses.clear(); 90 _statuses.clear();
112
113 _ready = new Completer();
114 } 91 }
115 92
116 /// Scans the contents of the directory once to see which files have been 93 /// Scans the contents of the directory once to see which files have been
117 /// added, removed, and modified. 94 /// added, removed, and modified.
118 void _poll() { 95 void _poll() {
119 _filesToProcess.clear(); 96 _filesToProcess.clear();
120 _polledFiles.clear(); 97 _polledFiles.clear();
121 98
122 endListing() { 99 endListing() {
123 assert(_state != _WatchState.UNSUBSCRIBED); 100 assert(!_events.isClosed);
124 _listSubscription = null; 101 _listSubscription = null;
125 102
126 // Null tells the queue consumer that we're done listing. 103 // Null tells the queue consumer that we're done listing.
127 _filesToProcess.add(null); 104 _filesToProcess.add(null);
128 } 105 }
129 106
130 var stream = new Directory(directory).list(recursive: true); 107 var stream = new Directory(directory).list(recursive: true);
131 _listSubscription = stream.listen((entity) { 108 _listSubscription = stream.listen((entity) {
132 assert(_state != _WatchState.UNSUBSCRIBED); 109 assert(!_events.isClosed);
133 110
134 if (entity is! File) return; 111 if (entity is! File) return;
135 _filesToProcess.add(entity.path); 112 _filesToProcess.add(entity.path);
136 }, onError: (error, StackTrace stackTrace) { 113 }, onError: (error, stackTrace) {
137 if (!isDirectoryNotFoundException(error)) { 114 if (!isDirectoryNotFoundException(error)) {
138 // It's some unknown error. Pipe it over to the event stream so the 115 // It's some unknown error. Pipe it over to the event stream so the
139 // user can see it. 116 // user can see it.
140 _events.addError(error, stackTrace); 117 _events.addError(error, stackTrace);
141 } 118 }
142 119
143 // When an error occurs, we end the listing normally, which has the 120 // When an error occurs, we end the listing normally, which has the
144 // desired effect of marking all files that were in the directory as 121 // desired effect of marking all files that were in the directory as
145 // being removed. 122 // being removed.
146 endListing(); 123 endListing();
147 }, onDone: endListing, cancelOnError: true); 124 }, onDone: endListing, cancelOnError: true);
148 } 125 }
149 126
150 /// Processes [file] to determine if it has been modified since the last 127 /// Processes [file] to determine if it has been modified since the last
151 /// time it was scanned. 128 /// time it was scanned.
152 Future _processFile(String file) { 129 Future _processFile(String file) {
153 assert(_state != _WatchState.UNSUBSCRIBED);
154
155 // `null` is the sentinel which means the directory listing is complete. 130 // `null` is the sentinel which means the directory listing is complete.
156 if (file == null) return _completePoll(); 131 if (file == null) return _completePoll();
157 132
158 return getModificationTime(file).then((modified) { 133 return getModificationTime(file).then((modified) {
159 if (_checkForCancel()) return null; 134 if (_events.isClosed) return null;
160 135
161 var lastStatus = _statuses[file]; 136 var lastStatus = _statuses[file];
162 137
163 // If its modification time hasn't changed, assume the file is unchanged. 138 // If its modification time hasn't changed, assume the file is unchanged.
164 if (lastStatus != null && lastStatus.modified == modified) { 139 if (lastStatus != null && lastStatus.modified == modified) {
165 // The file is still here. 140 // The file is still here.
166 _polledFiles.add(file); 141 _polledFiles.add(file);
167 return null; 142 return null;
168 } 143 }
169 144
170 return _hashFile(file).then((hash) { 145 return _hashFile(file).then((hash) {
171 if (_checkForCancel()) return; 146 if (_events.isClosed) return;
172 147
173 var status = new _FileStatus(modified, hash); 148 var status = new _FileStatus(modified, hash);
174 _statuses[file] = status; 149 _statuses[file] = status;
175 _polledFiles.add(file); 150 _polledFiles.add(file);
176 151
177 // Only notify while in the watching state. 152 // Only notify if we're ready to emit events.
178 if (_state != _WatchState.WATCHING) return; 153 if (!isReady) return;
179 154
180 // And the file is different. 155 // And the file is different.
181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash); 156 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
182 if (!changed) return; 157 if (!changed) return;
183 158
184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY; 159 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
185 _events.add(new WatchEvent(type, file)); 160 _events.add(new WatchEvent(type, file));
186 }); 161 });
187 }); 162 });
188 } 163 }
189 164
190 /// After the directory listing is complete, this determines which files were 165 /// After the directory listing is complete, this determines which files were
191 /// removed and then restarts the next poll. 166 /// removed and then restarts the next poll.
192 Future _completePoll() { 167 Future _completePoll() {
193 // Any files that were not seen in the last poll but that we have a 168 // Any files that were not seen in the last poll but that we have a
194 // status for must have been removed. 169 // status for must have been removed.
195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles); 170 var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
196 for (var removed in removedFiles) { 171 for (var removed in removedFiles) {
197 if (_state == _WatchState.WATCHING) { 172 if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
198 _events.add(new WatchEvent(ChangeType.REMOVE, removed));
199 }
200 _statuses.remove(removed); 173 _statuses.remove(removed);
201 } 174 }
202 175
203 if (_state == _WatchState.SCANNING) { 176 if (!isReady) _ready.complete();
204 _state = _WatchState.WATCHING;
205 _ready.complete();
206 }
207 177
208 // Wait and then poll again. 178 // Wait and then poll again.
209 return new Future.delayed(pollingDelay).then((_) { 179 return new Future.delayed(_pollingDelay).then((_) {
210 if (_checkForCancel()) return; 180 if (_events.isClosed) return;
211 _poll(); 181 _poll();
212 }); 182 });
213 } 183 }
214 184
215 /// Returns `true` and clears the processing queue if the watcher has been
216 /// unsubscribed.
217 bool _checkForCancel() {
218 if (_state != _WatchState.UNSUBSCRIBED) return false;
219
220 // Don't process any more files.
221 _filesToProcess.clear();
222 return true;
223 }
224
225 /// Calculates the SHA-1 hash of the file at [path]. 185 /// Calculates the SHA-1 hash of the file at [path].
226 Future<List<int>> _hashFile(String path) { 186 Future<List<int>> _hashFile(String path) {
227 return new File(path).readAsBytes().then((bytes) { 187 return new File(path).readAsBytes().then((bytes) {
228 var sha1 = new SHA1(); 188 var sha1 = new SHA1();
229 sha1.add(bytes); 189 sha1.add(bytes);
230 return sha1.close(); 190 return sha1.close();
231 }); 191 });
232 } 192 }
233 193
234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same 194 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
235 /// series of byte values. 195 /// series of byte values.
236 bool _sameHash(List<int> a, List<int> b) { 196 bool _sameHash(List<int> a, List<int> b) {
237 // Hashes should always be the same size. 197 // Hashes should always be the same size.
238 assert(a.length == b.length); 198 assert(a.length == b.length);
239 199
240 for (var i = 0; i < a.length; i++) { 200 for (var i = 0; i < a.length; i++) {
241 if (a[i] != b[i]) return false; 201 if (a[i] != b[i]) return false;
242 } 202 }
243 203
244 return true; 204 return true;
245 } 205 }
246 } 206 }
247 207
248 /// Enum class for the states that the [DirectoryWatcher] can be in.
249 class _WatchState {
250 /// There are no subscribers to the watcher's event stream and no watching
251 /// is going on.
252 static const UNSUBSCRIBED = const _WatchState("unsubscribed");
253
254 /// There are subscribers and the watcher is doing an initial scan of the
255 /// directory to see which files were already present before watching started.
256 ///
257 /// The watcher does not send notifications for changes that occurred while
258 /// there were no subscribers, or for files already present before watching.
259 /// The initial scan is used to determine what "before watching" state of
260 /// the file system was.
261 static const SCANNING = const _WatchState("scanning");
262
263 /// There are subscribers and the watcher is polling the directory to look
264 /// for changes.
265 static const WATCHING = const _WatchState("watching");
266
267 /// The name of the state.
268 final String name;
269
270 const _WatchState(this.name);
271 }
272
273 class _FileStatus { 208 class _FileStatus {
274 /// The last time the file was modified. 209 /// The last time the file was modified.
275 DateTime modified; 210 DateTime modified;
276 211
277 /// The SHA-1 hash of the contents of the file. 212 /// The SHA-1 hash of the contents of the file.
278 List<int> hash; 213 List<int> hash;
279 214
280 _FileStatus(this.modified, this.hash); 215 _FileStatus(this.modified, this.hash);
281 } 216 }
217
OLDNEW
« no previous file with comments | « pkg/watcher/lib/src/directory_watcher/linux.dart ('k') | pkg/watcher/lib/src/directory_watcher/resubscribable.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698