Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(531)

Side by Side Diff: pkg/barback/lib/src/file_pool.dart

Issue 36213002: Only run at most 10 transformers at once in barback. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Bug fixes Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | pkg/barback/lib/src/package_graph.dart » ('j') | pkg/barback/lib/src/pool.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library barback.file_pool; 5 library barback.file_pool;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 import 'dart:convert'; 9 import 'dart:convert';
10 import 'dart:io'; 10 import 'dart:io';
11 11
12 import 'package:stack_trace/stack_trace.dart'; 12 import 'pool.dart';
13
14 import 'utils.dart'; 13 import 'utils.dart';
15 14
16 /// Manages a pool of files that are opened for reading to cope with maximum 15 /// Manages a pool of files that are opened for reading to cope with maximum
17 /// file descriptor limits. 16 /// file descriptor limits.
18 /// 17 ///
19 /// If a file cannot be opened because too many files are already open, this 18 /// If a file cannot be opened because too many files are already open, this
20 /// will defer the open until a previously opened file is closed and then try 19 /// will defer the open until a previously opened file is closed and then try
21 /// again. If this doesn't succeed after a certain amount of time, the open 20 /// again. If this doesn't succeed after a certain amount of time, the open
22 /// will fail and the original "too many files" exception will be thrown. 21 /// will fail and the original "too many files" exception will be thrown.
23 class FilePool { 22 class FilePool {
24 /// [_FileReader]s whose last [listen] call failed and that are waiting for 23 /// The underlying pool.
25 /// another file to close so they can be retried.
26 final _pendingListens = new Queue<_FileReader>();
27
28 /// The timeout timer.
29 /// 24 ///
30 /// This timer is set as soon as the file limit is reached and is reset every 25 /// The maximum number of allocated descriptors is based on empirical tests
31 /// time a file finishes being read or a new file is opened. If it fires, that 26 /// that indicate that beyond 32, additional file reads don't provide
32 /// indicates that the caller became deadlocked, likely due to files waiting 27 /// substantial additional throughput.
33 /// for additional files to be read before they could be closed. 28 final Pool _pool = new Pool(32, timeout: new Duration(seconds: 60));
34 Timer _timer;
35
36 /// The number of files currently open in the pool.
37 int _openFiles = 0;
38
39 /// The maximum number of file descriptors that the pool will allocate.
40 ///
41 /// This is based on empirical tests that indicate that beyond 32, additional
42 /// file reads don't provide substantial additional throughput.
43 final int _maxOpenFiles = 32;
44 29
45 /// Opens [file] for reading. 30 /// Opens [file] for reading.
46 /// 31 ///
47 /// When the returned stream is listened to, if there are too many files 32 /// When the returned stream is listened to, if there are too many files
48 /// open, this will wait for a previously opened file to be closed and then 33 /// open, this will wait for a previously opened file to be closed and then
49 /// try again. 34 /// try again.
50 Stream<List<int>> openRead(File file) { 35 Stream<List<int>> openRead(File file) {
51 var reader = new _FileReader(this, file); 36 return futureStream(_pool.checkOut().then((resource) {
52 if (_openFiles < _maxOpenFiles) { 37 return file.openRead().transform(new StreamTransformer.fromHandlers(
53 _openFiles++; 38 handleDone: (sink) {
54 reader.start(); 39 sink.close();
55 } else { 40 resource.release();
56 _pendingListens.add(reader); 41 }));
57 _heartbeat(); 42 }));
58 }
59 return reader.stream;
60 } 43 }
61 44
62 /// Reads [file] as a string using [encoding]. 45 /// Reads [file] as a string using [encoding].
63 /// 46 ///
64 /// If there are too many files open and the read fails, this will wait for 47 /// If there are too many files open and the read fails, this will wait for
65 /// a previously opened file to be closed and then try again. 48 /// a previously opened file to be closed and then try again.
66 Future<String> readAsString(File file, Encoding encoding) { 49 Future<String> readAsString(File file, Encoding encoding) {
67 return _readAsBytes(file).then(encoding.decode); 50 return _readAsBytes(file).then(encoding.decode);
68 } 51 }
69 52
70 /// Reads [file] as a list of bytes, using [openRead] to retry if there are 53 /// Reads [file] as a list of bytes, using [openRead] to retry if there are
71 /// failures. 54 /// failures.
72 Future<List<int>> _readAsBytes(File file) { 55 Future<List<int>> _readAsBytes(File file) {
73 var completer = new Completer<List<int>>(); 56 var completer = new Completer<List<int>>();
74 var builder = new BytesBuilder(); 57 var builder = new BytesBuilder();
75 58
76 openRead(file).listen(builder.add, onDone: () { 59 openRead(file).listen(builder.add, onDone: () {
77 completer.complete(builder.takeBytes()); 60 completer.complete(builder.takeBytes());
78 }, onError: completer.completeError, cancelOnError: true); 61 }, onError: completer.completeError, cancelOnError: true);
79 62
80 return completer.future; 63 return completer.future;
81 } 64 }
82
83 /// If there are any file reads that are waiting for available descriptors,
84 /// this will allow the oldest one to start reading.
85 void _startPendingListen() {
86 if (_pendingListens.isEmpty) {
87 _openFiles--;
88 if (_timer != null) {
89 _timer.cancel();
90 _timer = null;
91 }
92 return;
93 }
94
95 _heartbeat();
96 var pending = _pendingListens.removeFirst();
97 pending.start();
98 }
99
100 /// Indicates that some external action has occurred and the timer should be
101 /// restarted.
102 void _heartbeat() {
103 if (_timer != null) _timer.cancel();
104 _timer = new Timer(new Duration(seconds: 60), _onTimeout);
105 }
106
107 /// Handles [_timer] timing out by causing all pending file readers to emit
108 /// exceptions.
109 void _onTimeout() {
110 for (var reader in _pendingListens) {
111 reader.timeout();
112 }
113 _pendingListens.clear();
114 _timer = null;
115 }
116 } 65 }
117
118 /// Wraps a raw file reading stream in a stream that handles "too many files"
119 /// errors.
120 ///
121 /// This also notifies the pool when the underlying file stream is closed so
122 /// that it can try to open a waiting file.
123 class _FileReader {
124 final FilePool _pool;
125 final File _file;
126
127 /// Whether the caller has paused this reader's stream.
128 bool _isPaused = false;
129
130 /// The underyling file stream.
131 Stream<List<int>> _fileStream;
132
133 /// The controller for the stream wrapper.
134 StreamController<List<int>> _controller;
135
136 /// The current subscription to the underlying file stream.
137 ///
138 /// This will only be non-null while the wrapped stream is being listened to.
139 StreamSubscription _subscription;
140
141 /// The wrapped stream that the file can be read from.
142 Stream<List<int>> get stream => _controller.stream;
143
144 _FileReader(this._pool, this._file) {
145 _controller = new StreamController<List<int>>(onPause: () {
146 _isPaused = true;
147 if (_subscription != null) _subscription.pause();
148 }, onResume: () {
149 _isPaused = false;
150 if (_subscription != null) _subscription.resume();
151 }, onCancel: () {
152 if (_subscription != null) _subscription.cancel();
153 _subscription = null;
154 }, sync: true);
155 }
156
157 /// Starts listening to the underlying file stream.
158 void start() {
159 _fileStream = _file.openRead();
160 _subscription = _fileStream.listen(_controller.add,
161 onError: _onError, onDone: _onDone, cancelOnError: true);
162 if (_isPaused) _subscription.pause();
163 }
164
165 /// Emits a timeout exception.
166 void timeout() {
167 assert(_subscription == null);
168 _controller.addError("FilePool deadlock: all file descriptors have been in "
169 "use for too long.", new Trace.current().vmTrace);
170 _controller.close();
171 }
172
173 /// Forwards an error from the underlying file stream.
174 void _onError(Object exception, StackTrace stackTrace) {
175 _controller.addError(exception, stackTrace);
176 _onDone();
177 }
178
179 /// Handles the underlying file stream finishing.
180 void _onDone() {
181 _subscription = null;
182 _controller.close();
183 _pool._startPendingListen();
184 }
185 }
OLDNEW
« no previous file with comments | « no previous file | pkg/barback/lib/src/package_graph.dart » ('j') | pkg/barback/lib/src/pool.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698