Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(295)

Side by Side Diff: pkg/watcher/lib/src/directory_watcher.dart

Issue 46843003: Wrap Directory.watch on linux for the watcher package. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher/linux.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library watcher.directory_watcher; 5 library watcher.directory_watcher;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:io'; 8 import 'dart:io';
9 9
10 import 'package:crypto/crypto.dart';
11
12 import 'async_queue.dart';
13 import 'stat.dart';
14 import 'utils.dart';
15 import 'watch_event.dart'; 10 import 'watch_event.dart';
11 import 'directory_watcher/linux.dart';
12 import 'directory_watcher/polling.dart';
16 13
17 /// Watches the contents of a directory and emits [WatchEvent]s when something 14 /// Watches the contents of a directory and emits [WatchEvent]s when something
18 /// in the directory has changed. 15 /// in the directory has changed.
19 class DirectoryWatcher { 16 abstract class DirectoryWatcher {
20 /// The directory whose contents are being monitored. 17 /// The directory whose contents are being monitored.
21 final String directory; 18 String get directory;
22 19
23 /// The broadcast [Stream] of events that have occurred to files in 20 /// The broadcast [Stream] of events that have occurred to files in
24 /// [directory]. 21 /// [directory].
25 /// 22 ///
26 /// Changes will only be monitored while this stream has subscribers. Any 23 /// Changes will only be monitored while this stream has subscribers. Any
27 /// file changes that occur during periods when there are no subscribers 24 /// file changes that occur during periods when there are no subscribers
28 /// will not be reported the next time a subscriber is added. 25 /// will not be reported the next time a subscriber is added.
29 Stream<WatchEvent> get events => _events.stream; 26 Stream<WatchEvent> get events;
30 StreamController<WatchEvent> _events;
31 27
32 _WatchState _state = _WatchState.UNSUBSCRIBED; 28 /// Whether the watcher is initialized and watching for file changes.
29 ///
30 /// This is true if and only if [ready] is complete.
31 bool get isReady;
33 32
34 /// A [Future] that completes when the watcher is initialized and watching 33 /// A [Future] that completes when the watcher is initialized and watching
35 /// for file changes. 34 /// for file changes.
36 /// 35 ///
37 /// If the watcher is not currently monitoring the directory (because there 36 /// If the watcher is not currently monitoring the directory (because there
38 /// are no subscribers to [events]), this returns a future that isn't 37 /// are no subscribers to [events]), this returns a future that isn't
39 /// complete yet. It will complete when a subscriber starts listening and 38 /// complete yet. It will complete when a subscriber starts listening and
40 /// the watcher finishes any initialization work it needs to do. 39 /// the watcher finishes any initialization work it needs to do.
41 /// 40 ///
42 /// If the watcher is already monitoring, this returns an already complete 41 /// If the watcher is already monitoring, this returns an already complete
43 /// future. 42 /// future.
44 Future get ready => _ready.future; 43 Future get ready;
45 Completer _ready = new Completer();
46
47 /// The amount of time the watcher pauses between successive polls of the
48 /// directory contents.
49 final Duration pollingDelay;
50
51 /// The previous status of the files in the directory.
52 ///
53 /// Used to tell which files have been modified.
54 final _statuses = new Map<String, _FileStatus>();
55
56 /// The subscription used while [directory] is being listed.
57 ///
58 /// Will be `null` if a list is not currently happening.
59 StreamSubscription<FileSystemEntity> _listSubscription;
60
61 /// The queue of files waiting to be processed to see if they have been
62 /// modified.
63 ///
64 /// Processing a file is asynchronous, as is listing the directory, so the
65 /// queue exists to let each of those proceed at their own rate. The lister
66 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
67 /// and processed sequentially.
68 AsyncQueue<String> _filesToProcess;
69
70 /// The set of files that have been seen in the current directory listing.
71 ///
72 /// Used to tell which files have been removed: files that are in [_statuses]
73 /// but not in here when a poll completes have been removed.
74 final _polledFiles = new Set<String>();
75 44
76 /// Creates a new [DirectoryWatcher] monitoring [directory]. 45 /// Creates a new [DirectoryWatcher] monitoring [directory].
77 /// 46 ///
78 /// If [pollingDelay] is passed, it specifies the amount of time the watcher 47 /// If a native directory watcher is available for this platform, this will
79 /// will pause between successive polls of the directory contents. Making 48 /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher].
80 /// this shorter will give more immediate feedback at the expense of doing 49 ///
81 /// more IO and higher CPU usage. Defaults to one second. 50 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
82 DirectoryWatcher(this.directory, {Duration pollingDelay}) 51 /// will pause between successive polls of the directory contents. Making this
83 : pollingDelay = pollingDelay != null ? pollingDelay : 52 /// shorter will give more immediate feedback at the expense of doing more IO
84 new Duration(seconds: 1) { 53 /// and higher CPU usage. Defaults to one second. Ignored for non-polling
85 _events = new StreamController<WatchEvent>.broadcast( 54 /// watchers.
86 onListen: _watch, onCancel: _cancel); 55 factory DirectoryWatcher(String directory, {Duration pollingDelay}) {
87 56 if (Platform.isLinux) return new LinuxDirectoryWatcher(directory);
88 _filesToProcess = new AsyncQueue<String>(_processFile, 57 return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay);
89 onError: _events.addError);
90 }
91
92 /// Scans to see which files were already present before the watcher was
93 /// subscribed to, and then starts watching the directory for changes.
94 void _watch() {
95 assert(_state == _WatchState.UNSUBSCRIBED);
96 _state = _WatchState.SCANNING;
97 _poll();
98 }
99
100 /// Stops watching the directory when there are no more subscribers.
101 void _cancel() {
102 assert(_state != _WatchState.UNSUBSCRIBED);
103 _state = _WatchState.UNSUBSCRIBED;
104
105 // If we're in the middle of listing the directory, stop.
106 if (_listSubscription != null) _listSubscription.cancel();
107
108 // Don't process any remaining files.
109 _filesToProcess.clear();
110 _polledFiles.clear();
111 _statuses.clear();
112
113 _ready = new Completer();
114 }
115
116 /// Scans the contents of the directory once to see which files have been
117 /// added, removed, and modified.
118 void _poll() {
119 _filesToProcess.clear();
120 _polledFiles.clear();
121
122 endListing() {
123 assert(_state != _WatchState.UNSUBSCRIBED);
124 _listSubscription = null;
125
126 // Null tells the queue consumer that we're done listing.
127 _filesToProcess.add(null);
128 }
129
130 var stream = new Directory(directory).list(recursive: true);
131 _listSubscription = stream.listen((entity) {
132 assert(_state != _WatchState.UNSUBSCRIBED);
133
134 if (entity is! File) return;
135 _filesToProcess.add(entity.path);
136 }, onError: (error, StackTrace stackTrace) {
137 if (!isDirectoryNotFoundException(error)) {
138 // It's some unknown error. Pipe it over to the event stream so the
139 // user can see it.
140 _events.addError(error, stackTrace);
141 }
142
143 // When an error occurs, we end the listing normally, which has the
144 // desired effect of marking all files that were in the directory as
145 // being removed.
146 endListing();
147 }, onDone: endListing, cancelOnError: true);
148 }
149
150 /// Processes [file] to determine if it has been modified since the last
151 /// time it was scanned.
152 Future _processFile(String file) {
153 assert(_state != _WatchState.UNSUBSCRIBED);
154
155 // `null` is the sentinel which means the directory listing is complete.
156 if (file == null) return _completePoll();
157
158 return getModificationTime(file).then((modified) {
159 if (_checkForCancel()) return null;
160
161 var lastStatus = _statuses[file];
162
163 // If its modification time hasn't changed, assume the file is unchanged.
164 if (lastStatus != null && lastStatus.modified == modified) {
165 // The file is still here.
166 _polledFiles.add(file);
167 return null;
168 }
169
170 return _hashFile(file).then((hash) {
171 if (_checkForCancel()) return;
172
173 var status = new _FileStatus(modified, hash);
174 _statuses[file] = status;
175 _polledFiles.add(file);
176
177 // Only notify while in the watching state.
178 if (_state != _WatchState.WATCHING) return;
179
180 // And the file is different.
181 var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
182 if (!changed) return;
183
184 var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
185 _events.add(new WatchEvent(type, file));
186 });
187 });
188 }
189
190 /// After the directory listing is complete, this determines which files were
191 /// removed and then restarts the next poll.
192 Future _completePoll() {
193 // Any files that were not seen in the last poll but that we have a
194 // status for must have been removed.
195 var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
196 for (var removed in removedFiles) {
197 if (_state == _WatchState.WATCHING) {
198 _events.add(new WatchEvent(ChangeType.REMOVE, removed));
199 }
200 _statuses.remove(removed);
201 }
202
203 if (_state == _WatchState.SCANNING) {
204 _state = _WatchState.WATCHING;
205 _ready.complete();
206 }
207
208 // Wait and then poll again.
209 return new Future.delayed(pollingDelay).then((_) {
210 if (_checkForCancel()) return;
211 _poll();
212 });
213 }
214
215 /// Returns `true` and clears the processing queue if the watcher has been
216 /// unsubscribed.
217 bool _checkForCancel() {
218 if (_state != _WatchState.UNSUBSCRIBED) return false;
219
220 // Don't process any more files.
221 _filesToProcess.clear();
222 return true;
223 }
224
225 /// Calculates the SHA-1 hash of the file at [path].
226 Future<List<int>> _hashFile(String path) {
227 return new File(path).readAsBytes().then((bytes) {
228 var sha1 = new SHA1();
229 sha1.add(bytes);
230 return sha1.close();
231 });
232 }
233
234 /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
235 /// series of byte values.
236 bool _sameHash(List<int> a, List<int> b) {
237 // Hashes should always be the same size.
238 assert(a.length == b.length);
239
240 for (var i = 0; i < a.length; i++) {
241 if (a[i] != b[i]) return false;
242 }
243
244 return true;
245 } 58 }
246 } 59 }
247
248 /// Enum class for the states that the [DirectoryWatcher] can be in.
249 class _WatchState {
250 /// There are no subscribers to the watcher's event stream and no watching
251 /// is going on.
252 static const UNSUBSCRIBED = const _WatchState("unsubscribed");
253
254 /// There are subscribers and the watcher is doing an initial scan of the
255 /// directory to see which files were already present before watching started.
256 ///
257 /// The watcher does not send notifications for changes that occurred while
258 /// there were no subscribers, or for files already present before watching.
259 /// The initial scan is used to determine what "before watching" state of
260 /// the file system was.
261 static const SCANNING = const _WatchState("scanning");
262
263 /// There are subscribers and the watcher is polling the directory to look
264 /// for changes.
265 static const WATCHING = const _WatchState("watching");
266
267 /// The name of the state.
268 final String name;
269
270 const _WatchState(this.name);
271 }
272
273 class _FileStatus {
274 /// The last time the file was modified.
275 DateTime modified;
276
277 /// The SHA-1 hash of the contents of the file.
278 List<int> hash;
279
280 _FileStatus(this.modified, this.hash);
281 }
OLDNEW
« no previous file with comments | « pkg/pkg.status ('k') | pkg/watcher/lib/src/directory_watcher/linux.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698