Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Unified Diff: pkg/barback/lib/src/file_pool.dart

Issue 28733009: Make sure barback's FilePool doesn't take up *all* the available FDs. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | pkg/barback/lib/src/utils.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/barback/lib/src/file_pool.dart
diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart
index 06192e766ea4fc122dc6318637fbe0bd091448dd..64aa404e10b3fc8ec04f1147c129a955b7b615a7 100644
--- a/pkg/barback/lib/src/file_pool.dart
+++ b/pkg/barback/lib/src/file_pool.dart
@@ -9,6 +9,10 @@ import 'dart:collection';
import 'dart:convert';
import 'dart:io';
+import 'package:stack_trace/stack_trace.dart';
+
+import 'utils.dart';
+
/// Manages a pool of files that are opened for reading to cope with maximum
/// file descriptor limits.
///
@@ -21,12 +25,38 @@ class FilePool {
/// another file to close so they can be retried.
final _pendingListens = new Queue<_FileReader>();
+ /// The timeout timer.
+ ///
+ /// This timer is set as soon as the file limit is reached and is reset every
+ /// time a file finishes being read or a new file is opened. If it fires, that
+ /// indicates that the caller became deadlocked, likely due to files waiting
+ /// for additional files to be read before they could be closed.
+ Timer _timer;
+
+ /// The number of files currently open in the pool.
+ int _openFiles = 0;
+
+ /// The maximum number of file descriptors that the pool will allocate.
+ ///
+ /// Barback may only use half the available file descriptors.
+ int get _maxOpenFiles => (maxFileDescriptors / 2).floor();
+
/// Opens [file] for reading.
///
/// When the returned stream is listened to, if there are too many files
/// open, this will wait for a previously opened file to be closed and then
/// try again.
- Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream;
+ Stream<List<int>> openRead(File file) {
+ var reader = new _FileReader(this, file);
+ if (_openFiles < _maxOpenFiles) {
+ _openFiles++;
+ reader.start();
+ } else {
+ _pendingListens.add(reader);
+ _heartbeat();
+ }
+ return reader.stream;
+ }
/// Reads [file] as a string using [encoding].
///
@@ -49,12 +79,38 @@ class FilePool {
return completer.future;
}
- /// Tries to re-listen to the next pending file reader if there are any.
- void _retryPendingListen() {
- if (_pendingListens.isEmpty) return;
+ /// If there are any file reads that are waiting for available descriptors,
+ /// this will allow the oldest one to start reading.
+ void _startPendingListen() {
+ if (_pendingListens.isEmpty) {
+ _openFiles--;
+ if (_timer != null) {
Alan Knight 2013/10/21 18:41:33 It seems a bit awkward to have the explicit cancel
nweiz 2013/10/22 18:37:28 We want to avoid restarting the timer here, since
+ _timer.cancel();
+ _timer = null;
+ }
+ return;
+ }
+ _heartbeat();
var pending = _pendingListens.removeFirst();
- pending._listen();
+ pending.start();
+ }
+
+ /// Indicates that some external action has occurred and the timer should be
+ /// restarted.
+ void _heartbeat() {
Bob Nystrom 2013/10/26 00:13:34 How about "_restartTimer"? It wasn't obvious to me
nweiz 2013/10/28 23:56:43 Done.
+ if (_timer != null) _timer.cancel();
+ _timer = new Timer(new Duration(seconds: 60), _onTimeout);
+ }
+
+ /// Handles [_timer] timing out by causing all pending file readers to emit
+ /// exceptions.
+ void _onTimeout() {
+ for (var reader in _pendingListens) {
+ reader.timeout();
+ }
+ _pendingListens.clear();
+ _timer = null;
}
}
@@ -67,6 +123,9 @@ class _FileReader {
final FilePool _pool;
final File _file;
+ /// Whether the caller has paused this reader's stream.
+ bool _isPaused = false;
+
/// The underyling file stream.
Stream<List<int>> _fileStream;
@@ -78,29 +137,16 @@ class _FileReader {
/// This will only be non-null while the wrapped stream is being listened to.
StreamSubscription _subscription;
- /// The timeout timer.
- ///
- /// If this timer fires before the listen is retried, it gives up and throws
- /// the original error.
- Timer _timer;
-
- /// When a [listen] call has thrown a "too many files" error, this will be
- /// the exception object.
- Object _exception;
-
- /// When a [listen] call has thrown a "too many files" error, this will be
- /// the captured stack trace.
- Object _stackTrace;
-
/// The wrapped stream that the file can be read from.
Stream<List<int>> get stream => _controller.stream;
_FileReader(this._pool, this._file) {
- _controller = new StreamController<List<int>>(onListen: _listen,
- onPause: () {
- _subscription.pause();
+ _controller = new StreamController<List<int>>(onPause: () {
+ _isPaused = true;
+ if (_subscription != null) _subscription.pause();
}, onResume: () {
- _subscription.resume();
+ _isPaused = false;
+ if (_subscription != null) _subscription.resume();
}, onCancel: () {
if (_subscription != null) _subscription.cancel();
_subscription = null;
@@ -108,78 +154,31 @@ class _FileReader {
}
/// Starts listening to the underlying file stream.
- void _listen() {
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
- }
-
- _exception = null;
- _stackTrace = null;
-
+ void start() {
_fileStream = _file.openRead();
_subscription = _fileStream.listen(_controller.add,
onError: _onError, onDone: _onDone, cancelOnError: true);
+ if (_isPaused) _subscription.pause();
}
- /// Handles an error from the underlying file stream.
- ///
- /// "Too many file" errors are caught so that we can retry later. Other
- /// errors are passed to the wrapped stream and the underlying stream
- /// subscription is canceled.
- void _onError(Object exception, Object stackTrace) {
- assert(_subscription != null);
- assert(_exception == null);
-
- // The subscription is canceled after an error.
- _subscription = null;
-
- // We only handle "Too many open files errors".
- if (exception is! FileException || exception.osError.errorCode != 24) {
- _controller.addError(exception, stackTrace);
- return;
- }
-
- _exception = exception;
- _stackTrace = stackTrace;
-
- // We'll try to defer the listen in the hopes that another file will close
- // and we can try. If that doesn't happen after a while, give up and just
- // throw the original error.
- // TODO(rnystrom): The point of this timer is to not get stuck forever in
- // a deadlock scenario. But this can also erroneously fire if there is a
- // large number of slow reads that do incrementally finish. A file may not
- // move to the front of the queue in time even though it is making
- // progress. A better solution is to have a single deadlock timer on the
- // FilePool itself that starts when a pending listen is enqueued and checks
- // to see if progress has been made when it fires.
- _timer = new Timer(new Duration(seconds: 60), _onTimeout);
+ /// Emits a timeout exception.
+ void timeout() {
+ assert(_subscription == null);
+ _controller.addError("FilePool deadlock: all file descriptors have been in "
+ "use for too long.", new Trace.current().vmTrace);
Bob Nystrom 2013/10/26 00:13:34 This makes it sound like the error is not related
nweiz 2013/10/28 23:56:43 This is less applicable now, since the error comes
+ _controller.close();
+ }
- // Tell the pool that this file is waiting.
- _pool._pendingListens.add(this);
+ /// Forwards an error from the underlying file stream.
+ void _onError(Object exception, StackTrace stackTrace) {
+ _controller.addError(exception, stackTrace);
+ _onDone();
}
/// Handles the underlying file stream finishing.
void _onDone() {
_subscription = null;
-
- _controller.close();
- _pool._retryPendingListen();
- }
-
- /// If this file failed to be read because there were too many open files and
- /// no file was closed in time to retry, this handles giving up.
- void _onTimeout() {
- assert(_subscription == null);
- assert(_exception != null);
-
- // We failed to open in time, so just fail with the original error.
- _pool._pendingListens.remove(this);
- _controller.addError(_exception, _stackTrace);
_controller.close();
-
- _timer = null;
- _exception = null;
- _stackTrace = null;
+ _pool._startPendingListen();
}
}
« no previous file with comments | « no previous file | pkg/barback/lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698