| Index: pkg/watcher/lib/src/directory_watcher/polling.dart
|
| diff --git a/pkg/watcher/lib/src/directory_watcher.dart b/pkg/watcher/lib/src/directory_watcher/polling.dart
|
| similarity index 55%
|
| copy from pkg/watcher/lib/src/directory_watcher.dart
|
| copy to pkg/watcher/lib/src/directory_watcher/polling.dart
|
| index 679e227e8d181cefb7124a272e416824d5b5bc8b..91ca005f31fb2a9cf33c99f8bb4bc452c5399c58 100644
|
| --- a/pkg/watcher/lib/src/directory_watcher.dart
|
| +++ b/pkg/watcher/lib/src/directory_watcher/polling.dart
|
| @@ -2,51 +2,49 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -library watcher.directory_watcher;
|
| +library watcher.directory_watcher.polling;
|
|
|
| import 'dart:async';
|
| import 'dart:io';
|
|
|
| import 'package:crypto/crypto.dart';
|
|
|
| -import 'async_queue.dart';
|
| -import 'stat.dart';
|
| -import 'utils.dart';
|
| -import 'watch_event.dart';
|
| +import '../async_queue.dart';
|
| +import '../directory_watcher.dart';
|
| +import '../stat.dart';
|
| +import '../utils.dart';
|
| +import '../watch_event.dart';
|
| +import 'resubscribable.dart';
|
|
|
| -/// Watches the contents of a directory and emits [WatchEvent]s when something
|
| -/// in the directory has changed.
|
| -class DirectoryWatcher {
|
| - /// The directory whose contents are being monitored.
|
| +/// Periodically polls a directory for changes.
|
| +class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
|
| + /// Creates a new polling watcher monitoring [directory].
|
| + ///
|
| + /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
|
| + /// will pause between successive polls of the directory contents. Making this
|
| + /// shorter will give more immediate feedback at the expense of doing more IO
|
| + /// and higher CPU usage. Defaults to one second.
|
| + PollingDirectoryWatcher(String directory, {Duration pollingDelay})
|
| + : super(directory, () {
|
| + return new _PollingDirectoryWatcher(directory,
|
| + pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
|
| + });
|
| +}
|
| +
|
| +class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
|
| final String directory;
|
|
|
| - /// The broadcast [Stream] of events that have occurred to files in
|
| - /// [directory].
|
| - ///
|
| - /// Changes will only be monitored while this stream has subscribers. Any
|
| - /// file changes that occur during periods when there are no subscribers
|
| - /// will not be reported the next time a subscriber is added.
|
| Stream<WatchEvent> get events => _events.stream;
|
| - StreamController<WatchEvent> _events;
|
| + final _events = new StreamController<WatchEvent>.broadcast();
|
|
|
| - _WatchState _state = _WatchState.UNSUBSCRIBED;
|
| + bool get isReady => _ready.isCompleted;
|
|
|
| - /// A [Future] that completes when the watcher is initialized and watching
|
| - /// for file changes.
|
| - ///
|
| - /// If the watcher is not currently monitoring the directory (because there
|
| - /// are no subscribers to [events]), this returns a future that isn't
|
| - /// complete yet. It will complete when a subscriber starts listening and
|
| - /// the watcher finishes any initialization work it needs to do.
|
| - ///
|
| - /// If the watcher is already monitoring, this returns an already complete
|
| - /// future.
|
| Future get ready => _ready.future;
|
| - Completer _ready = new Completer();
|
| + final _ready = new Completer();
|
|
|
| /// The amount of time the watcher pauses between successive polls of the
|
| /// directory contents.
|
| - final Duration pollingDelay;
|
| + final Duration _pollingDelay;
|
|
|
| /// The previous status of the files in the directory.
|
| ///
|
| @@ -73,34 +71,15 @@ class DirectoryWatcher {
|
| /// but not in here when a poll completes have been removed.
|
| final _polledFiles = new Set<String>();
|
|
|
| - /// Creates a new [DirectoryWatcher] monitoring [directory].
|
| - ///
|
| - /// If [pollingDelay] is passed, it specifies the amount of time the watcher
|
| - /// will pause between successive polls of the directory contents. Making
|
| - /// this shorter will give more immediate feedback at the expense of doing
|
| - /// more IO and higher CPU usage. Defaults to one second.
|
| - DirectoryWatcher(this.directory, {Duration pollingDelay})
|
| - : pollingDelay = pollingDelay != null ? pollingDelay :
|
| - new Duration(seconds: 1) {
|
| - _events = new StreamController<WatchEvent>.broadcast(
|
| - onListen: _watch, onCancel: _cancel);
|
| -
|
| + _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
|
| _filesToProcess = new AsyncQueue<String>(_processFile,
|
| onError: _events.addError);
|
| - }
|
|
|
| - /// Scans to see which files were already present before the watcher was
|
| - /// subscribed to, and then starts watching the directory for changes.
|
| - void _watch() {
|
| - assert(_state == _WatchState.UNSUBSCRIBED);
|
| - _state = _WatchState.SCANNING;
|
| _poll();
|
| }
|
|
|
| - /// Stops watching the directory when there are no more subscribers.
|
| - void _cancel() {
|
| - assert(_state != _WatchState.UNSUBSCRIBED);
|
| - _state = _WatchState.UNSUBSCRIBED;
|
| + void close() {
|
| + _events.close();
|
|
|
| // If we're in the middle of listing the directory, stop.
|
| if (_listSubscription != null) _listSubscription.cancel();
|
| @@ -109,8 +88,6 @@ class DirectoryWatcher {
|
| _filesToProcess.clear();
|
| _polledFiles.clear();
|
| _statuses.clear();
|
| -
|
| - _ready = new Completer();
|
| }
|
|
|
| /// Scans the contents of the directory once to see which files have been
|
| @@ -120,7 +97,7 @@ class DirectoryWatcher {
|
| _polledFiles.clear();
|
|
|
| endListing() {
|
| - assert(_state != _WatchState.UNSUBSCRIBED);
|
| + assert(!_events.isClosed);
|
| _listSubscription = null;
|
|
|
| // Null tells the queue consumer that we're done listing.
|
| @@ -129,11 +106,11 @@ class DirectoryWatcher {
|
|
|
| var stream = new Directory(directory).list(recursive: true);
|
| _listSubscription = stream.listen((entity) {
|
| - assert(_state != _WatchState.UNSUBSCRIBED);
|
| + assert(!_events.isClosed);
|
|
|
| if (entity is! File) return;
|
| _filesToProcess.add(entity.path);
|
| - }, onError: (error, StackTrace stackTrace) {
|
| + }, onError: (error, stackTrace) {
|
| if (!isDirectoryNotFoundException(error)) {
|
| // It's some unknown error. Pipe it over to the event stream so the
|
| // user can see it.
|
| @@ -150,13 +127,11 @@ class DirectoryWatcher {
|
| /// Processes [file] to determine if it has been modified since the last
|
| /// time it was scanned.
|
| Future _processFile(String file) {
|
| - assert(_state != _WatchState.UNSUBSCRIBED);
|
| -
|
| // `null` is the sentinel which means the directory listing is complete.
|
| if (file == null) return _completePoll();
|
|
|
| return getModificationTime(file).then((modified) {
|
| - if (_checkForCancel()) return null;
|
| + if (_events.isClosed) return null;
|
|
|
| var lastStatus = _statuses[file];
|
|
|
| @@ -168,14 +143,14 @@ class DirectoryWatcher {
|
| }
|
|
|
| return _hashFile(file).then((hash) {
|
| - if (_checkForCancel()) return;
|
| + if (_events.isClosed) return;
|
|
|
| var status = new _FileStatus(modified, hash);
|
| _statuses[file] = status;
|
| _polledFiles.add(file);
|
|
|
| - // Only notify while in the watching state.
|
| - if (_state != _WatchState.WATCHING) return;
|
| + // Only notify if we're ready to emit events.
|
| + if (!isReady) return;
|
|
|
| // And the file is different.
|
| var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
|
| @@ -194,34 +169,19 @@ class DirectoryWatcher {
|
| // status for must have been removed.
|
| var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
|
| for (var removed in removedFiles) {
|
| - if (_state == _WatchState.WATCHING) {
|
| - _events.add(new WatchEvent(ChangeType.REMOVE, removed));
|
| - }
|
| + if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
|
| _statuses.remove(removed);
|
| }
|
|
|
| - if (_state == _WatchState.SCANNING) {
|
| - _state = _WatchState.WATCHING;
|
| - _ready.complete();
|
| - }
|
| + if (!isReady) _ready.complete();
|
|
|
| // Wait and then poll again.
|
| - return new Future.delayed(pollingDelay).then((_) {
|
| - if (_checkForCancel()) return;
|
| + return new Future.delayed(_pollingDelay).then((_) {
|
| + if (_events.isClosed) return;
|
| _poll();
|
| });
|
| }
|
|
|
| - /// Returns `true` and clears the processing queue if the watcher has been
|
| - /// unsubscribed.
|
| - bool _checkForCancel() {
|
| - if (_state != _WatchState.UNSUBSCRIBED) return false;
|
| -
|
| - // Don't process any more files.
|
| - _filesToProcess.clear();
|
| - return true;
|
| - }
|
| -
|
| /// Calculates the SHA-1 hash of the file at [path].
|
| Future<List<int>> _hashFile(String path) {
|
| return new File(path).readAsBytes().then((bytes) {
|
| @@ -245,31 +205,6 @@ class DirectoryWatcher {
|
| }
|
| }
|
|
|
| -/// Enum class for the states that the [DirectoryWatcher] can be in.
|
| -class _WatchState {
|
| - /// There are no subscribers to the watcher's event stream and no watching
|
| - /// is going on.
|
| - static const UNSUBSCRIBED = const _WatchState("unsubscribed");
|
| -
|
| - /// There are subscribers and the watcher is doing an initial scan of the
|
| - /// directory to see which files were already present before watching started.
|
| - ///
|
| - /// The watcher does not send notifications for changes that occurred while
|
| - /// there were no subscribers, or for files already present before watching.
|
| - /// The initial scan is used to determine what "before watching" state of
|
| - /// the file system was.
|
| - static const SCANNING = const _WatchState("scanning");
|
| -
|
| - /// There are subscribers and the watcher is polling the directory to look
|
| - /// for changes.
|
| - static const WATCHING = const _WatchState("watching");
|
| -
|
| - /// The name of the state.
|
| - final String name;
|
| -
|
| - const _WatchState(this.name);
|
| -}
|
| -
|
| class _FileStatus {
|
| /// The last time the file was modified.
|
| DateTime modified;
|
| @@ -278,4 +213,5 @@ class _FileStatus {
|
| List<int> hash;
|
|
|
| _FileStatus(this.modified, this.hash);
|
| -}
|
| +}
|
| +
|
|
|