Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(339)

Unified Diff: pkg/barback/lib/src/file_pool.dart

Issue 36213002: Only run at most 10 transformers at once in barback. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Bug fixes Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | pkg/barback/lib/src/package_graph.dart » ('j') | pkg/barback/lib/src/pool.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/barback/lib/src/file_pool.dart
diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart
index 89e3e80ea24c172159cc1e3f92336acd516986c0..7e95c68e4325c299d36342e0a1c82eedbdd6c790 100644
--- a/pkg/barback/lib/src/file_pool.dart
+++ b/pkg/barback/lib/src/file_pool.dart
@@ -9,8 +9,7 @@ import 'dart:collection';
import 'dart:convert';
import 'dart:io';
-import 'package:stack_trace/stack_trace.dart';
-
+import 'pool.dart';
import 'utils.dart';
/// Manages a pool of files that are opened for reading to cope with maximum
@@ -21,26 +20,12 @@ import 'utils.dart';
/// again. If this doesn't succeed after a certain amount of time, the open
/// will fail and the original "too many files" exception will be thrown.
class FilePool {
- /// [_FileReader]s whose last [listen] call failed and that are waiting for
- /// another file to close so they can be retried.
- final _pendingListens = new Queue<_FileReader>();
-
- /// The timeout timer.
- ///
- /// This timer is set as soon as the file limit is reached and is reset every
- /// time a file finishes being read or a new file is opened. If it fires, that
- /// indicates that the caller became deadlocked, likely due to files waiting
- /// for additional files to be read before they could be closed.
- Timer _timer;
-
- /// The number of files currently open in the pool.
- int _openFiles = 0;
-
- /// The maximum number of file descriptors that the pool will allocate.
+ /// The underlying pool.
///
- /// This is based on empirical tests that indicate that beyond 32, additional
- /// file reads don't provide substantial additional throughput.
- final int _maxOpenFiles = 32;
+ /// The maximum number of allocated descriptors is based on empirical tests
+ /// that indicate that beyond 32, additional file reads don't provide
+ /// substantial additional throughput.
+ final Pool _pool = new Pool(32, timeout: new Duration(seconds: 60));
/// Opens [file] for reading.
///
@@ -48,15 +33,13 @@ class FilePool {
/// open, this will wait for a previously opened file to be closed and then
/// try again.
Stream<List<int>> openRead(File file) {
- var reader = new _FileReader(this, file);
- if (_openFiles < _maxOpenFiles) {
- _openFiles++;
- reader.start();
- } else {
- _pendingListens.add(reader);
- _heartbeat();
- }
- return reader.stream;
+ return futureStream(_pool.checkOut().then((resource) {
+ return file.openRead().transform(new StreamTransformer.fromHandlers(
+ handleDone: (sink) {
+ sink.close();
+ resource.release();
+ }));
+ }));
}
/// Reads [file] as a string using [encoding].
@@ -79,107 +62,4 @@ class FilePool {
return completer.future;
}
-
- /// If there are any file reads that are waiting for available descriptors,
- /// this will allow the oldest one to start reading.
- void _startPendingListen() {
- if (_pendingListens.isEmpty) {
- _openFiles--;
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
- }
- return;
- }
-
- _heartbeat();
- var pending = _pendingListens.removeFirst();
- pending.start();
- }
-
- /// Indicates that some external action has occurred and the timer should be
- /// restarted.
- void _heartbeat() {
- if (_timer != null) _timer.cancel();
- _timer = new Timer(new Duration(seconds: 60), _onTimeout);
- }
-
- /// Handles [_timer] timing out by causing all pending file readers to emit
- /// exceptions.
- void _onTimeout() {
- for (var reader in _pendingListens) {
- reader.timeout();
- }
- _pendingListens.clear();
- _timer = null;
- }
-}
-
-/// Wraps a raw file reading stream in a stream that handles "too many files"
-/// errors.
-///
-/// This also notifies the pool when the underlying file stream is closed so
-/// that it can try to open a waiting file.
-class _FileReader {
- final FilePool _pool;
- final File _file;
-
- /// Whether the caller has paused this reader's stream.
- bool _isPaused = false;
-
- /// The underyling file stream.
- Stream<List<int>> _fileStream;
-
- /// The controller for the stream wrapper.
- StreamController<List<int>> _controller;
-
- /// The current subscription to the underlying file stream.
- ///
- /// This will only be non-null while the wrapped stream is being listened to.
- StreamSubscription _subscription;
-
- /// The wrapped stream that the file can be read from.
- Stream<List<int>> get stream => _controller.stream;
-
- _FileReader(this._pool, this._file) {
- _controller = new StreamController<List<int>>(onPause: () {
- _isPaused = true;
- if (_subscription != null) _subscription.pause();
- }, onResume: () {
- _isPaused = false;
- if (_subscription != null) _subscription.resume();
- }, onCancel: () {
- if (_subscription != null) _subscription.cancel();
- _subscription = null;
- }, sync: true);
- }
-
- /// Starts listening to the underlying file stream.
- void start() {
- _fileStream = _file.openRead();
- _subscription = _fileStream.listen(_controller.add,
- onError: _onError, onDone: _onDone, cancelOnError: true);
- if (_isPaused) _subscription.pause();
- }
-
- /// Emits a timeout exception.
- void timeout() {
- assert(_subscription == null);
- _controller.addError("FilePool deadlock: all file descriptors have been in "
- "use for too long.", new Trace.current().vmTrace);
- _controller.close();
- }
-
- /// Forwards an error from the underlying file stream.
- void _onError(Object exception, StackTrace stackTrace) {
- _controller.addError(exception, stackTrace);
- _onDone();
- }
-
- /// Handles the underlying file stream finishing.
- void _onDone() {
- _subscription = null;
- _controller.close();
- _pool._startPendingListen();
- }
}
« no previous file with comments | « no previous file | pkg/barback/lib/src/package_graph.dart » ('j') | pkg/barback/lib/src/pool.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698