Chromium Code Reviews| Index: pkg/barback/lib/src/file_pool.dart |
| diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart |
| index 06192e766ea4fc122dc6318637fbe0bd091448dd..64aa404e10b3fc8ec04f1147c129a955b7b615a7 100644 |
| --- a/pkg/barback/lib/src/file_pool.dart |
| +++ b/pkg/barback/lib/src/file_pool.dart |
| @@ -9,6 +9,10 @@ import 'dart:collection'; |
| import 'dart:convert'; |
| import 'dart:io'; |
| +import 'package:stack_trace/stack_trace.dart'; |
| + |
| +import 'utils.dart'; |
| + |
| /// Manages a pool of files that are opened for reading to cope with maximum |
| /// file descriptor limits. |
| /// |
| @@ -21,12 +25,38 @@ class FilePool { |
| /// another file to close so they can be retried. |
| final _pendingListens = new Queue<_FileReader>(); |
| + /// The timeout timer. |
| + /// |
| + /// This timer is set as soon as the file limit is reached and is reset every |
| + /// time a file finishes being read or a new file is opened. If it fires, that |
| + /// indicates that the caller became deadlocked, likely due to files waiting |
| + /// for additional files to be read before they could be closed. |
| + Timer _timer; |
| + |
| + /// The number of files currently open in the pool. |
| + int _openFiles = 0; |
| + |
| + /// The maximum number of file descriptors that the pool will allocate. |
| + /// |
| + /// Barback may only use half the available file descriptors. |
| + int get _maxOpenFiles => (maxFileDescriptors / 2).floor(); |
| + |
| /// Opens [file] for reading. |
| /// |
| /// When the returned stream is listened to, if there are too many files |
| /// open, this will wait for a previously opened file to be closed and then |
| /// try again. |
| - Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream; |
| + Stream<List<int>> openRead(File file) { |
| + var reader = new _FileReader(this, file); |
| + if (_openFiles < _maxOpenFiles) { |
| + _openFiles++; |
| + reader.start(); |
| + } else { |
| + _pendingListens.add(reader); |
| + _heartbeat(); |
| + } |
| + return reader.stream; |
| + } |
| /// Reads [file] as a string using [encoding]. |
| /// |
| @@ -49,12 +79,38 @@ class FilePool { |
| return completer.future; |
| } |
| - /// Tries to re-listen to the next pending file reader if there are any. |
| - void _retryPendingListen() { |
| - if (_pendingListens.isEmpty) return; |
| + /// If there are any file reads that are waiting for available descriptors, |
| + /// this will allow the oldest one to start reading. |
| + void _startPendingListen() { |
| + if (_pendingListens.isEmpty) { |
| + _openFiles--; |
| + if (_timer != null) { |
|
Alan Knight
2013/10/21 18:41:33
It seems a bit awkward to have the explicit cancel
nweiz
2013/10/22 18:37:28
We want to avoid restarting the timer here, since
|
| + _timer.cancel(); |
| + _timer = null; |
| + } |
| + return; |
| + } |
| + _heartbeat(); |
| var pending = _pendingListens.removeFirst(); |
| - pending._listen(); |
| + pending.start(); |
| + } |
| + |
| + /// Indicates that some external action has occurred and the timer should be |
| + /// restarted. |
| + void _heartbeat() { |
|
Bob Nystrom
2013/10/26 00:13:34
How about "_restartTimer"? It wasn't obvious to me
nweiz
2013/10/28 23:56:43
Done.
|
| + if (_timer != null) _timer.cancel(); |
| + _timer = new Timer(new Duration(seconds: 60), _onTimeout); |
| + } |
| + |
| + /// Handles [_timer] timing out by causing all pending file readers to emit |
| + /// exceptions. |
| + void _onTimeout() { |
| + for (var reader in _pendingListens) { |
| + reader.timeout(); |
| + } |
| + _pendingListens.clear(); |
| + _timer = null; |
| } |
| } |
| @@ -67,6 +123,9 @@ class _FileReader { |
| final FilePool _pool; |
| final File _file; |
| + /// Whether the caller has paused this reader's stream. |
| + bool _isPaused = false; |
| + |
| /// The underyling file stream. |
| Stream<List<int>> _fileStream; |
| @@ -78,29 +137,16 @@ class _FileReader { |
| /// This will only be non-null while the wrapped stream is being listened to. |
| StreamSubscription _subscription; |
| - /// The timeout timer. |
| - /// |
| - /// If this timer fires before the listen is retried, it gives up and throws |
| - /// the original error. |
| - Timer _timer; |
| - |
| - /// When a [listen] call has thrown a "too many files" error, this will be |
| - /// the exception object. |
| - Object _exception; |
| - |
| - /// When a [listen] call has thrown a "too many files" error, this will be |
| - /// the captured stack trace. |
| - Object _stackTrace; |
| - |
| /// The wrapped stream that the file can be read from. |
| Stream<List<int>> get stream => _controller.stream; |
| _FileReader(this._pool, this._file) { |
| - _controller = new StreamController<List<int>>(onListen: _listen, |
| - onPause: () { |
| - _subscription.pause(); |
| + _controller = new StreamController<List<int>>(onPause: () { |
| + _isPaused = true; |
| + if (_subscription != null) _subscription.pause(); |
| }, onResume: () { |
| - _subscription.resume(); |
| + _isPaused = false; |
| + if (_subscription != null) _subscription.resume(); |
| }, onCancel: () { |
| if (_subscription != null) _subscription.cancel(); |
| _subscription = null; |
| @@ -108,78 +154,31 @@ class _FileReader { |
| } |
| /// Starts listening to the underlying file stream. |
| - void _listen() { |
| - if (_timer != null) { |
| - _timer.cancel(); |
| - _timer = null; |
| - } |
| - |
| - _exception = null; |
| - _stackTrace = null; |
| - |
| + void start() { |
| _fileStream = _file.openRead(); |
| _subscription = _fileStream.listen(_controller.add, |
| onError: _onError, onDone: _onDone, cancelOnError: true); |
| + if (_isPaused) _subscription.pause(); |
| } |
| - /// Handles an error from the underlying file stream. |
| - /// |
| - /// "Too many file" errors are caught so that we can retry later. Other |
| - /// errors are passed to the wrapped stream and the underlying stream |
| - /// subscription is canceled. |
| - void _onError(Object exception, Object stackTrace) { |
| - assert(_subscription != null); |
| - assert(_exception == null); |
| - |
| - // The subscription is canceled after an error. |
| - _subscription = null; |
| - |
| - // We only handle "Too many open files errors". |
| - if (exception is! FileException || exception.osError.errorCode != 24) { |
| - _controller.addError(exception, stackTrace); |
| - return; |
| - } |
| - |
| - _exception = exception; |
| - _stackTrace = stackTrace; |
| - |
| - // We'll try to defer the listen in the hopes that another file will close |
| - // and we can try. If that doesn't happen after a while, give up and just |
| - // throw the original error. |
| - // TODO(rnystrom): The point of this timer is to not get stuck forever in |
| - // a deadlock scenario. But this can also erroneously fire if there is a |
| - // large number of slow reads that do incrementally finish. A file may not |
| - // move to the front of the queue in time even though it is making |
| - // progress. A better solution is to have a single deadlock timer on the |
| - // FilePool itself that starts when a pending listen is enqueued and checks |
| - // to see if progress has been made when it fires. |
| - _timer = new Timer(new Duration(seconds: 60), _onTimeout); |
| + /// Emits a timeout exception. |
| + void timeout() { |
| + assert(_subscription == null); |
| + _controller.addError("FilePool deadlock: all file descriptors have been in " |
| + "use for too long.", new Trace.current().vmTrace); |
|
Bob Nystrom
2013/10/26 00:13:34
This makes it sound like the error is not related
nweiz
2013/10/28 23:56:43
This is less applicable now, since the error comes
|
| + _controller.close(); |
| + } |
| - // Tell the pool that this file is waiting. |
| - _pool._pendingListens.add(this); |
| + /// Forwards an error from the underlying file stream. |
| + void _onError(Object exception, StackTrace stackTrace) { |
| + _controller.addError(exception, stackTrace); |
| + _onDone(); |
| } |
| /// Handles the underlying file stream finishing. |
| void _onDone() { |
| _subscription = null; |
| - |
| - _controller.close(); |
| - _pool._retryPendingListen(); |
| - } |
| - |
| - /// If this file failed to be read because there were too many open files and |
| - /// no file was closed in time to retry, this handles giving up. |
| - void _onTimeout() { |
| - assert(_subscription == null); |
| - assert(_exception != null); |
| - |
| - // We failed to open in time, so just fail with the original error. |
| - _pool._pendingListens.remove(this); |
| - _controller.addError(_exception, _stackTrace); |
| _controller.close(); |
| - |
| - _timer = null; |
| - _exception = null; |
| - _stackTrace = null; |
| + _pool._startPendingListen(); |
| } |
| } |