Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Side by Side Diff: observatory_pub_packages/watcher/src/directory_watcher/polling.dart

Issue 816693004: Add observatory_pub_packages snapshot to third_party (Closed) Base URL: http://dart.googlecode.com/svn/third_party/
Patch Set: Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library watcher.directory_watcher.polling;
6
7 import 'dart:async';
8 import 'dart:io';
9
10 import 'package:stack_trace/stack_trace.dart';
11
12 import '../async_queue.dart';
13 import '../stat.dart';
14 import '../utils.dart';
15 import '../watch_event.dart';
16 import 'resubscribable.dart';
17
18 /// Periodically polls a directory for changes.
19 class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
20 /// Creates a new polling watcher monitoring [directory].
21 ///
22 /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
23 /// will pause between successive polls of the directory contents. Making this
24 /// shorter will give more immediate feedback at the expense of doing more IO
25 /// and higher CPU usage. Defaults to one second.
26 PollingDirectoryWatcher(String directory, {Duration pollingDelay})
27 : super(directory, () {
28 return new _PollingDirectoryWatcher(directory,
29 pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
30 });
31 }
32
33 class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
34 final String directory;
35
36 Stream<WatchEvent> get events => _events.stream;
37 final _events = new StreamController<WatchEvent>.broadcast();
38
39 bool get isReady => _ready.isCompleted;
40
41 Future get ready => _ready.future;
42 final _ready = new Completer();
43
44 /// The amount of time the watcher pauses between successive polls of the
45 /// directory contents.
46 final Duration _pollingDelay;
47
48 /// The previous modification times of the files in the directory.
49 ///
50 /// Used to tell which files have been modified.
51 final _lastModifieds = new Map<String, DateTime>();
52
53 /// The subscription used while [directory] is being listed.
54 ///
55 /// Will be `null` if a list is not currently happening.
56 StreamSubscription<FileSystemEntity> _listSubscription;
57
58 /// The queue of files waiting to be processed to see if they have been
59 /// modified.
60 ///
61 /// Processing a file is asynchronous, as is listing the directory, so the
62 /// queue exists to let each of those proceed at their own rate. The lister
63 /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
64 /// and processed sequentially.
65 AsyncQueue<String> _filesToProcess;
66
67 /// The set of files that have been seen in the current directory listing.
68 ///
69 /// Used to tell which files have been removed: files that are in [_statuses]
70 /// but not in here when a poll completes have been removed.
71 final _polledFiles = new Set<String>();
72
73 _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
74 _filesToProcess = new AsyncQueue<String>(_processFile,
75 onError: (e, stackTrace) {
76 if (!_events.isClosed) _events.addError(e, stackTrace);
77 });
78
79 _poll();
80 }
81
82 void close() {
83 _events.close();
84
85 // If we're in the middle of listing the directory, stop.
86 if (_listSubscription != null) _listSubscription.cancel();
87
88 // Don't process any remaining files.
89 _filesToProcess.clear();
90 _polledFiles.clear();
91 _lastModifieds.clear();
92 }
93
94 /// Scans the contents of the directory once to see which files have been
95 /// added, removed, and modified.
96 void _poll() {
97 _filesToProcess.clear();
98 _polledFiles.clear();
99
100 endListing() {
101 assert(!_events.isClosed);
102 _listSubscription = null;
103
104 // Null tells the queue consumer that we're done listing.
105 _filesToProcess.add(null);
106 }
107
108 var stream = Chain.track(new Directory(directory).list(recursive: true));
109 _listSubscription = stream.listen((entity) {
110 assert(!_events.isClosed);
111
112 if (entity is! File) return;
113 _filesToProcess.add(entity.path);
114 }, onError: (error, stackTrace) {
115 if (!isDirectoryNotFoundException(error)) {
116 // It's some unknown error. Pipe it over to the event stream so the
117 // user can see it.
118 _events.addError(error, stackTrace);
119 }
120
121 // When an error occurs, we end the listing normally, which has the
122 // desired effect of marking all files that were in the directory as
123 // being removed.
124 endListing();
125 }, onDone: endListing, cancelOnError: true);
126 }
127
128 /// Processes [file] to determine if it has been modified since the last
129 /// time it was scanned.
130 Future _processFile(String file) {
131 // `null` is the sentinel which means the directory listing is complete.
132 if (file == null) return _completePoll();
133
134 return getModificationTime(file).then((modified) {
135 if (_events.isClosed) return null;
136
137 var lastModified = _lastModifieds[file];
138
139 // If its modification time hasn't changed, assume the file is unchanged.
140 if (lastModified != null && lastModified == modified) {
141 // The file is still here.
142 _polledFiles.add(file);
143 return null;
144 }
145
146 if (_events.isClosed) return null;
147
148 _lastModifieds[file] = modified;
149 _polledFiles.add(file);
150
151 // Only notify if we're ready to emit events.
152 if (!isReady) return null;
153
154 var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
155 _events.add(new WatchEvent(type, file));
156 });
157 }
158
159 /// After the directory listing is complete, this determines which files were
160 /// removed and then restarts the next poll.
161 Future _completePoll() {
162 // Any files that were not seen in the last poll but that we have a
163 // status for must have been removed.
164 var removedFiles = _lastModifieds.keys.toSet().difference(_polledFiles);
165 for (var removed in removedFiles) {
166 if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
167 _lastModifieds.remove(removed);
168 }
169
170 if (!isReady) _ready.complete();
171
172 // Wait and then poll again.
173 return new Future.delayed(_pollingDelay).then((_) {
174 if (_events.isClosed) return;
175 _poll();
176 });
177 }
178 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698