| Index: pkg/barback/lib/src/file_pool.dart
|
| diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart
|
| index 89e3e80ea24c172159cc1e3f92336acd516986c0..7e95c68e4325c299d36342e0a1c82eedbdd6c790 100644
|
| --- a/pkg/barback/lib/src/file_pool.dart
|
| +++ b/pkg/barback/lib/src/file_pool.dart
|
| @@ -9,8 +9,7 @@ import 'dart:collection';
|
| import 'dart:convert';
|
| import 'dart:io';
|
|
|
| -import 'package:stack_trace/stack_trace.dart';
|
| -
|
| +import 'pool.dart';
|
| import 'utils.dart';
|
|
|
| /// Manages a pool of files that are opened for reading to cope with maximum
|
| @@ -21,26 +20,12 @@ import 'utils.dart';
|
| /// again. If this doesn't succeed after a certain amount of time, the open
|
| /// will fail and the original "too many files" exception will be thrown.
|
| class FilePool {
|
| - /// [_FileReader]s whose last [listen] call failed and that are waiting for
|
| - /// another file to close so they can be retried.
|
| - final _pendingListens = new Queue<_FileReader>();
|
| -
|
| - /// The timeout timer.
|
| - ///
|
| - /// This timer is set as soon as the file limit is reached and is reset every
|
| - /// time a file finishes being read or a new file is opened. If it fires, that
|
| - /// indicates that the caller became deadlocked, likely due to files waiting
|
| - /// for additional files to be read before they could be closed.
|
| - Timer _timer;
|
| -
|
| - /// The number of files currently open in the pool.
|
| - int _openFiles = 0;
|
| -
|
| - /// The maximum number of file descriptors that the pool will allocate.
|
| + /// The underlying pool.
|
| ///
|
| - /// This is based on empirical tests that indicate that beyond 32, additional
|
| - /// file reads don't provide substantial additional throughput.
|
| - final int _maxOpenFiles = 32;
|
| + /// The maximum number of allocated descriptors is based on empirical tests
|
| + /// that indicate that beyond 32, additional file reads don't provide
|
| + /// substantial additional throughput.
|
| + final Pool _pool = new Pool(32, timeout: new Duration(seconds: 60));
|
|
|
| /// Opens [file] for reading.
|
| ///
|
| @@ -48,15 +33,13 @@ class FilePool {
|
| /// open, this will wait for a previously opened file to be closed and then
|
| /// try again.
|
| Stream<List<int>> openRead(File file) {
|
| - var reader = new _FileReader(this, file);
|
| - if (_openFiles < _maxOpenFiles) {
|
| - _openFiles++;
|
| - reader.start();
|
| - } else {
|
| - _pendingListens.add(reader);
|
| - _heartbeat();
|
| - }
|
| - return reader.stream;
|
| + return futureStream(_pool.checkOut().then((resource) {
|
| + return file.openRead().transform(new StreamTransformer.fromHandlers(
|
| + handleDone: (sink) {
|
| + sink.close();
|
| + resource.release();
|
| + }));
|
| + }));
|
| }
|
|
|
| /// Reads [file] as a string using [encoding].
|
| @@ -79,107 +62,4 @@ class FilePool {
|
|
|
| return completer.future;
|
| }
|
| -
|
| - /// If there are any file reads that are waiting for available descriptors,
|
| - /// this will allow the oldest one to start reading.
|
| - void _startPendingListen() {
|
| - if (_pendingListens.isEmpty) {
|
| - _openFiles--;
|
| - if (_timer != null) {
|
| - _timer.cancel();
|
| - _timer = null;
|
| - }
|
| - return;
|
| - }
|
| -
|
| - _heartbeat();
|
| - var pending = _pendingListens.removeFirst();
|
| - pending.start();
|
| - }
|
| -
|
| - /// Indicates that some external action has occurred and the timer should be
|
| - /// restarted.
|
| - void _heartbeat() {
|
| - if (_timer != null) _timer.cancel();
|
| - _timer = new Timer(new Duration(seconds: 60), _onTimeout);
|
| - }
|
| -
|
| - /// Handles [_timer] timing out by causing all pending file readers to emit
|
| - /// exceptions.
|
| - void _onTimeout() {
|
| - for (var reader in _pendingListens) {
|
| - reader.timeout();
|
| - }
|
| - _pendingListens.clear();
|
| - _timer = null;
|
| - }
|
| -}
|
| -
|
| -/// Wraps a raw file reading stream in a stream that handles "too many files"
|
| -/// errors.
|
| -///
|
| -/// This also notifies the pool when the underlying file stream is closed so
|
| -/// that it can try to open a waiting file.
|
| -class _FileReader {
|
| - final FilePool _pool;
|
| - final File _file;
|
| -
|
| - /// Whether the caller has paused this reader's stream.
|
| - bool _isPaused = false;
|
| -
|
| - /// The underyling file stream.
|
| - Stream<List<int>> _fileStream;
|
| -
|
| - /// The controller for the stream wrapper.
|
| - StreamController<List<int>> _controller;
|
| -
|
| - /// The current subscription to the underlying file stream.
|
| - ///
|
| - /// This will only be non-null while the wrapped stream is being listened to.
|
| - StreamSubscription _subscription;
|
| -
|
| - /// The wrapped stream that the file can be read from.
|
| - Stream<List<int>> get stream => _controller.stream;
|
| -
|
| - _FileReader(this._pool, this._file) {
|
| - _controller = new StreamController<List<int>>(onPause: () {
|
| - _isPaused = true;
|
| - if (_subscription != null) _subscription.pause();
|
| - }, onResume: () {
|
| - _isPaused = false;
|
| - if (_subscription != null) _subscription.resume();
|
| - }, onCancel: () {
|
| - if (_subscription != null) _subscription.cancel();
|
| - _subscription = null;
|
| - }, sync: true);
|
| - }
|
| -
|
| - /// Starts listening to the underlying file stream.
|
| - void start() {
|
| - _fileStream = _file.openRead();
|
| - _subscription = _fileStream.listen(_controller.add,
|
| - onError: _onError, onDone: _onDone, cancelOnError: true);
|
| - if (_isPaused) _subscription.pause();
|
| - }
|
| -
|
| - /// Emits a timeout exception.
|
| - void timeout() {
|
| - assert(_subscription == null);
|
| - _controller.addError("FilePool deadlock: all file descriptors have been in "
|
| - "use for too long.", new Trace.current().vmTrace);
|
| - _controller.close();
|
| - }
|
| -
|
| - /// Forwards an error from the underlying file stream.
|
| - void _onError(Object exception, StackTrace stackTrace) {
|
| - _controller.addError(exception, stackTrace);
|
| - _onDone();
|
| - }
|
| -
|
| - /// Handles the underlying file stream finishing.
|
| - void _onDone() {
|
| - _subscription = null;
|
| - _controller.close();
|
| - _pool._startPendingListen();
|
| - }
|
| }
|
|
|