Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(748)

Unified Diff: pkg/watcher/lib/src/directory_watcher/polling.dart

Issue 46843003: Wrap Directory.watch on linux for the watcher package. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: pkg/watcher/lib/src/directory_watcher/polling.dart
diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher/polling.dart
similarity index 55%
copy from pkg/watcher/lib/src/directory_watcher.dart
copy to pkg/watcher/lib/src/directory_watcher/polling.dart
index 679e227e8d181cefb7124a272e416824d5b5bc8b..91ca005f31fb2a9cf33c99f8bb4bc452c5399c58 100644
--- a/pkg/watcher/lib/src/directory_watcher.dart
+++ b/pkg/watcher/lib/src/directory_watcher/polling.dart
@@ -2,51 +2,49 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library watcher.directory_watcher;
+library watcher.directory_watcher.polling;
import 'dart:async';
import 'dart:io';
import 'package:crypto/crypto.dart';
-import 'async_queue.dart';
-import 'stat.dart';
-import 'utils.dart';
-import 'watch_event.dart';
+import '../async_queue.dart';
+import '../directory_watcher.dart';
+import '../stat.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+import 'resubscribable.dart';
-/// Watches the contents of a directory and emits [WatchEvent]s when something
-/// in the directory has changed.
-class DirectoryWatcher {
- /// The directory whose contents are being monitored.
+/// Periodically polls a directory for changes.
+class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
+ /// Creates a new polling watcher monitoring [directory].
+ ///
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
+ /// will pause between successive polls of the directory contents. Making this
+ /// shorter will give more immediate feedback at the expense of doing more IO
+ /// and higher CPU usage. Defaults to one second.
+ PollingDirectoryWatcher(String directory, {Duration pollingDelay})
+ : super(directory, () {
+ return new _PollingDirectoryWatcher(directory,
+ pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
+ });
+}
+
+class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
final String directory;
- /// The broadcast [Stream] of events that have occurred to files in
- /// [directory].
- ///
- /// Changes will only be monitored while this stream has subscribers. Any
- /// file changes that occur during periods when there are no subscribers
- /// will not be reported the next time a subscriber is added.
Stream<WatchEvent> get events => _events.stream;
- StreamController<WatchEvent> _events;
+ final _events = new StreamController<WatchEvent>.broadcast();
- _WatchState _state = _WatchState.UNSUBSCRIBED;
+ bool get isReady => _ready.isCompleted;
- /// A [Future] that completes when the watcher is initialized and watching
- /// for file changes.
- ///
- /// If the watcher is not currently monitoring the directory (because there
- /// are no subscribers to [events]), this returns a future that isn't
- /// complete yet. It will complete when a subscriber starts listening and
- /// the watcher finishes any initialization work it needs to do.
- ///
- /// If the watcher is already monitoring, this returns an already complete
- /// future.
Future get ready => _ready.future;
- Completer _ready = new Completer();
+ final _ready = new Completer();
/// The amount of time the watcher pauses between successive polls of the
/// directory contents.
- final Duration pollingDelay;
+ final Duration _pollingDelay;
/// The previous status of the files in the directory.
///
@@ -73,34 +71,15 @@ class DirectoryWatcher {
/// but not in here when a poll completes have been removed.
final _polledFiles = new Set<String>();
- /// Creates a new [DirectoryWatcher] monitoring [directory].
- ///
- /// If [pollingDelay] is passed, it specifies the amount of time the watcher
- /// will pause between successive polls of the directory contents. Making
- /// this shorter will give more immediate feedback at the expense of doing
- /// more IO and higher CPU usage. Defaults to one second.
- DirectoryWatcher(this.directory, {Duration pollingDelay})
- : pollingDelay = pollingDelay != null ? pollingDelay :
- new Duration(seconds: 1) {
- _events = new StreamController<WatchEvent>.broadcast(
- onListen: _watch, onCancel: _cancel);
-
+ _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
_filesToProcess = new AsyncQueue<String>(_processFile,
onError: _events.addError);
- }
- /// Scans to see which files were already present before the watcher was
- /// subscribed to, and then starts watching the directory for changes.
- void _watch() {
- assert(_state == _WatchState.UNSUBSCRIBED);
- _state = _WatchState.SCANNING;
_poll();
}
- /// Stops watching the directory when there are no more subscribers.
- void _cancel() {
- assert(_state != _WatchState.UNSUBSCRIBED);
- _state = _WatchState.UNSUBSCRIBED;
+ void close() {
+ _events.close();
// If we're in the middle of listing the directory, stop.
if (_listSubscription != null) _listSubscription.cancel();
@@ -109,8 +88,6 @@ class DirectoryWatcher {
_filesToProcess.clear();
_polledFiles.clear();
_statuses.clear();
-
- _ready = new Completer();
}
/// Scans the contents of the directory once to see which files have been
@@ -120,7 +97,7 @@ class DirectoryWatcher {
_polledFiles.clear();
endListing() {
- assert(_state != _WatchState.UNSUBSCRIBED);
+ assert(!_events.isClosed);
_listSubscription = null;
// Null tells the queue consumer that we're done listing.
@@ -129,11 +106,11 @@ class DirectoryWatcher {
var stream = new Directory(directory).list(recursive: true);
_listSubscription = stream.listen((entity) {
- assert(_state != _WatchState.UNSUBSCRIBED);
+ assert(!_events.isClosed);
if (entity is! File) return;
_filesToProcess.add(entity.path);
- }, onError: (error, StackTrace stackTrace) {
+ }, onError: (error, stackTrace) {
if (!isDirectoryNotFoundException(error)) {
// It's some unknown error. Pipe it over to the event stream so the
// user can see it.
@@ -150,13 +127,11 @@ class DirectoryWatcher {
/// Processes [file] to determine if it has been modified since the last
/// time it was scanned.
Future _processFile(String file) {
- assert(_state != _WatchState.UNSUBSCRIBED);
-
// `null` is the sentinel which means the directory listing is complete.
if (file == null) return _completePoll();
return getModificationTime(file).then((modified) {
- if (_checkForCancel()) return null;
+ if (_events.isClosed) return null;
var lastStatus = _statuses[file];
@@ -168,14 +143,14 @@ class DirectoryWatcher {
}
return _hashFile(file).then((hash) {
- if (_checkForCancel()) return;
+ if (_events.isClosed) return;
var status = new _FileStatus(modified, hash);
_statuses[file] = status;
_polledFiles.add(file);
- // Only notify while in the watching state.
- if (_state != _WatchState.WATCHING) return;
+ // Only notify if we're ready to emit events.
+ if (!isReady) return;
// And the file is different.
var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
@@ -194,34 +169,19 @@ class DirectoryWatcher {
// status for must have been removed.
var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
for (var removed in removedFiles) {
- if (_state == _WatchState.WATCHING) {
- _events.add(new WatchEvent(ChangeType.REMOVE, removed));
- }
+ if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
_statuses.remove(removed);
}
- if (_state == _WatchState.SCANNING) {
- _state = _WatchState.WATCHING;
- _ready.complete();
- }
+ if (!isReady) _ready.complete();
// Wait and then poll again.
- return new Future.delayed(pollingDelay).then((_) {
- if (_checkForCancel()) return;
+ return new Future.delayed(_pollingDelay).then((_) {
+ if (_events.isClosed) return;
_poll();
});
}
- /// Returns `true` and clears the processing queue if the watcher has been
- /// unsubscribed.
- bool _checkForCancel() {
- if (_state != _WatchState.UNSUBSCRIBED) return false;
-
- // Don't process any more files.
- _filesToProcess.clear();
- return true;
- }
-
/// Calculates the SHA-1 hash of the file at [path].
Future<List<int>> _hashFile(String path) {
return new File(path).readAsBytes().then((bytes) {
@@ -245,31 +205,6 @@ class DirectoryWatcher {
}
}
-/// Enum class for the states that the [DirectoryWatcher] can be in.
-class _WatchState {
- /// There are no subscribers to the watcher's event stream and no watching
- /// is going on.
- static const UNSUBSCRIBED = const _WatchState("unsubscribed");
-
- /// There are subscribers and the watcher is doing an initial scan of the
- /// directory to see which files were already present before watching started.
- ///
- /// The watcher does not send notifications for changes that occurred while
- /// there were no subscribers, or for files already present before watching.
- /// The initial scan is used to determine what "before watching" state of
- /// the file system was.
- static const SCANNING = const _WatchState("scanning");
-
- /// There are subscribers and the watcher is polling the directory to look
- /// for changes.
- static const WATCHING = const _WatchState("watching");
-
- /// The name of the state.
- final String name;
-
- const _WatchState(this.name);
-}
-
class _FileStatus {
/// The last time the file was modified.
DateTime modified;
@@ -278,4 +213,5 @@ class _FileStatus {
List<int> hash;
_FileStatus(this.modified, this.hash);
-}
+}
+
« no previous file with comments | « pkg/watcher/lib/src/directory_watcher/linux.dart ('k') | pkg/watcher/lib/src/directory_watcher/resubscribable.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698