Index: pkg/barback/lib/src/file_pool.dart |
diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart |
index 06192e766ea4fc122dc6318637fbe0bd091448dd..64aa404e10b3fc8ec04f1147c129a955b7b615a7 100644 |
--- a/pkg/barback/lib/src/file_pool.dart |
+++ b/pkg/barback/lib/src/file_pool.dart |
@@ -9,6 +9,10 @@ import 'dart:collection'; |
import 'dart:convert'; |
import 'dart:io'; |
+import 'package:stack_trace/stack_trace.dart'; |
+ |
+import 'utils.dart'; |
+ |
/// Manages a pool of files that are opened for reading to cope with maximum |
/// file descriptor limits. |
/// |
@@ -21,12 +25,38 @@ class FilePool { |
/// another file to close so they can be retried. |
final _pendingListens = new Queue<_FileReader>(); |
+ /// The timeout timer. |
+ /// |
+ /// This timer is set as soon as the file limit is reached and is reset every |
+ /// time a file finishes being read or a new file is opened. If it fires, that |
+ /// indicates that the caller became deadlocked, likely due to files waiting |
+ /// for additional files to be read before they could be closed. |
+ Timer _timer; |
+ |
+ /// The number of files currently open in the pool. |
+ int _openFiles = 0; |
+ |
+ /// The maximum number of file descriptors that the pool will allocate. |
+ /// |
+ /// Barback may only use half the available file descriptors. |
+ int get _maxOpenFiles => (maxFileDescriptors / 2).floor(); |
+ |
/// Opens [file] for reading. |
/// |
/// When the returned stream is listened to, if there are too many files |
/// open, this will wait for a previously opened file to be closed and then |
/// try again. |
- Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream; |
+ Stream<List<int>> openRead(File file) { |
+ var reader = new _FileReader(this, file); |
+ if (_openFiles < _maxOpenFiles) { |
+ _openFiles++; |
+ reader.start(); |
+ } else { |
+ _pendingListens.add(reader); |
+ _heartbeat(); |
+ } |
+ return reader.stream; |
+ } |
/// Reads [file] as a string using [encoding]. |
/// |
@@ -49,12 +79,38 @@ class FilePool { |
return completer.future; |
} |
- /// Tries to re-listen to the next pending file reader if there are any. |
- void _retryPendingListen() { |
- if (_pendingListens.isEmpty) return; |
+ /// If there are any file reads that are waiting for available descriptors, |
+ /// this will allow the oldest one to start reading. |
+ void _startPendingListen() { |
+ if (_pendingListens.isEmpty) { |
+ _openFiles--; |
+ if (_timer != null) { |
Alan Knight
2013/10/21 18:41:33
It seems a bit awkward to have the explicit cancel
nweiz
2013/10/22 18:37:28
We want to avoid restarting the timer here, since
|
+ _timer.cancel(); |
+ _timer = null; |
+ } |
+ return; |
+ } |
+ _heartbeat(); |
var pending = _pendingListens.removeFirst(); |
- pending._listen(); |
+ pending.start(); |
+ } |
+ |
+ /// Indicates that some external action has occurred and the timer should be |
+ /// restarted. |
+ void _heartbeat() { |
Bob Nystrom
2013/10/26 00:13:34
How about "_restartTimer"? It wasn't obvious to me
nweiz
2013/10/28 23:56:43
Done.
|
+ if (_timer != null) _timer.cancel(); |
+ _timer = new Timer(new Duration(seconds: 60), _onTimeout); |
+ } |
+ |
+ /// Handles [_timer] timing out by causing all pending file readers to emit |
+ /// exceptions. |
+ void _onTimeout() { |
+ for (var reader in _pendingListens) { |
+ reader.timeout(); |
+ } |
+ _pendingListens.clear(); |
+ _timer = null; |
} |
} |
@@ -67,6 +123,9 @@ class _FileReader { |
final FilePool _pool; |
final File _file; |
+ /// Whether the caller has paused this reader's stream. |
+ bool _isPaused = false; |
+ |
/// The underyling file stream. |
Stream<List<int>> _fileStream; |
@@ -78,29 +137,16 @@ class _FileReader { |
/// This will only be non-null while the wrapped stream is being listened to. |
StreamSubscription _subscription; |
- /// The timeout timer. |
- /// |
- /// If this timer fires before the listen is retried, it gives up and throws |
- /// the original error. |
- Timer _timer; |
- |
- /// When a [listen] call has thrown a "too many files" error, this will be |
- /// the exception object. |
- Object _exception; |
- |
- /// When a [listen] call has thrown a "too many files" error, this will be |
- /// the captured stack trace. |
- Object _stackTrace; |
- |
/// The wrapped stream that the file can be read from. |
Stream<List<int>> get stream => _controller.stream; |
_FileReader(this._pool, this._file) { |
- _controller = new StreamController<List<int>>(onListen: _listen, |
- onPause: () { |
- _subscription.pause(); |
+ _controller = new StreamController<List<int>>(onPause: () { |
+ _isPaused = true; |
+ if (_subscription != null) _subscription.pause(); |
}, onResume: () { |
- _subscription.resume(); |
+ _isPaused = false; |
+ if (_subscription != null) _subscription.resume(); |
}, onCancel: () { |
if (_subscription != null) _subscription.cancel(); |
_subscription = null; |
@@ -108,78 +154,31 @@ class _FileReader { |
} |
/// Starts listening to the underlying file stream. |
- void _listen() { |
- if (_timer != null) { |
- _timer.cancel(); |
- _timer = null; |
- } |
- |
- _exception = null; |
- _stackTrace = null; |
- |
+ void start() { |
_fileStream = _file.openRead(); |
_subscription = _fileStream.listen(_controller.add, |
onError: _onError, onDone: _onDone, cancelOnError: true); |
+ if (_isPaused) _subscription.pause(); |
} |
- /// Handles an error from the underlying file stream. |
- /// |
- /// "Too many file" errors are caught so that we can retry later. Other |
- /// errors are passed to the wrapped stream and the underlying stream |
- /// subscription is canceled. |
- void _onError(Object exception, Object stackTrace) { |
- assert(_subscription != null); |
- assert(_exception == null); |
- |
- // The subscription is canceled after an error. |
- _subscription = null; |
- |
- // We only handle "Too many open files errors". |
- if (exception is! FileException || exception.osError.errorCode != 24) { |
- _controller.addError(exception, stackTrace); |
- return; |
- } |
- |
- _exception = exception; |
- _stackTrace = stackTrace; |
- |
- // We'll try to defer the listen in the hopes that another file will close |
- // and we can try. If that doesn't happen after a while, give up and just |
- // throw the original error. |
- // TODO(rnystrom): The point of this timer is to not get stuck forever in |
- // a deadlock scenario. But this can also erroneously fire if there is a |
- // large number of slow reads that do incrementally finish. A file may not |
- // move to the front of the queue in time even though it is making |
- // progress. A better solution is to have a single deadlock timer on the |
- // FilePool itself that starts when a pending listen is enqueued and checks |
- // to see if progress has been made when it fires. |
- _timer = new Timer(new Duration(seconds: 60), _onTimeout); |
+ /// Emits a timeout exception. |
+ void timeout() { |
+ assert(_subscription == null); |
+ _controller.addError("FilePool deadlock: all file descriptors have been in " |
+ "use for too long.", new Trace.current().vmTrace); |
Bob Nystrom
2013/10/26 00:13:34
This makes it sound like the error is not related
nweiz
2013/10/28 23:56:43
This is less applicable now, since the error comes
|
+ _controller.close(); |
+ } |
- // Tell the pool that this file is waiting. |
- _pool._pendingListens.add(this); |
+ /// Forwards an error from the underlying file stream. |
+ void _onError(Object exception, StackTrace stackTrace) { |
+ _controller.addError(exception, stackTrace); |
+ _onDone(); |
} |
/// Handles the underlying file stream finishing. |
void _onDone() { |
_subscription = null; |
- |
- _controller.close(); |
- _pool._retryPendingListen(); |
- } |
- |
- /// If this file failed to be read because there were too many open files and |
- /// no file was closed in time to retry, this handles giving up. |
- void _onTimeout() { |
- assert(_subscription == null); |
- assert(_exception != null); |
- |
- // We failed to open in time, so just fail with the original error. |
- _pool._pendingListens.remove(this); |
- _controller.addError(_exception, _stackTrace); |
_controller.close(); |
- |
- _timer = null; |
- _exception = null; |
- _stackTrace = null; |
+ _pool._startPendingListen(); |
} |
} |