Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(839)

Side by Side Diff: pkg/barback/lib/src/file_pool.dart

Issue 28733009: Make sure barback's FilePool doesn't take up *all* the available FDs. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | pkg/barback/lib/src/utils.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library barback.file_pool; 5 library barback.file_pool;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 import 'dart:convert'; 9 import 'dart:convert';
10 import 'dart:io'; 10 import 'dart:io';
11 11
12 import 'package:stack_trace/stack_trace.dart';
13
14 import 'utils.dart';
15
12 /// Manages a pool of files that are opened for reading to cope with maximum 16 /// Manages a pool of files that are opened for reading to cope with maximum
13 /// file descriptor limits. 17 /// file descriptor limits.
14 /// 18 ///
15 /// If a file cannot be opened because too many files are already open, this 19 /// If a file cannot be opened because too many files are already open, this
16 /// will defer the open until a previously opened file is closed and then try 20 /// will defer the open until a previously opened file is closed and then try
17 /// again. If this doesn't succeed after a certain amount of time, the open 21 /// again. If this doesn't succeed after a certain amount of time, the open
18 /// will fail and the original "too many files" exception will be thrown. 22 /// will fail and the original "too many files" exception will be thrown.
19 class FilePool { 23 class FilePool {
20 /// [_FileReader]s whose last [listen] call failed and that are waiting for 24 /// [_FileReader]s whose last [listen] call failed and that are waiting for
21 /// another file to close so they can be retried. 25 /// another file to close so they can be retried.
22 final _pendingListens = new Queue<_FileReader>(); 26 final _pendingListens = new Queue<_FileReader>();
23 27
28 /// The timeout timer.
29 ///
30 /// This timer is set as soon as the file limit is reached and is reset every
31 /// time a file finishes being read or a new file is opened. If it fires, that
32 /// indicates that the caller became deadlocked, likely due to files waiting
33 /// for additional files to be read before they could be closed.
34 Timer _timer;
35
36 /// The number of files currently open in the pool.
37 int _openFiles = 0;
38
39 /// The maximum number of file descriptors that the pool will allocate.
40 ///
41 /// Barback may only use half the available file descriptors.
42 int get _maxOpenFiles => (maxFileDescriptors / 2).floor();
43
24 /// Opens [file] for reading. 44 /// Opens [file] for reading.
25 /// 45 ///
26 /// When the returned stream is listened to, if there are too many files 46 /// When the returned stream is listened to, if there are too many files
27 /// open, this will wait for a previously opened file to be closed and then 47 /// open, this will wait for a previously opened file to be closed and then
28 /// try again. 48 /// try again.
29 Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream; 49 Stream<List<int>> openRead(File file) {
50 var reader = new _FileReader(this, file);
51 if (_openFiles < _maxOpenFiles) {
52 _openFiles++;
53 reader.start();
54 } else {
55 _pendingListens.add(reader);
56 _heartbeat();
57 }
58 return reader.stream;
59 }
30 60
31 /// Reads [file] as a string using [encoding]. 61 /// Reads [file] as a string using [encoding].
32 /// 62 ///
33 /// If there are too many files open and the read fails, this will wait for 63 /// If there are too many files open and the read fails, this will wait for
34 /// a previously opened file to be closed and then try again. 64 /// a previously opened file to be closed and then try again.
35 Future<String> readAsString(File file, Encoding encoding) { 65 Future<String> readAsString(File file, Encoding encoding) {
36 return _readAsBytes(file).then(encoding.decode); 66 return _readAsBytes(file).then(encoding.decode);
37 } 67 }
38 68
39 /// Reads [file] as a list of bytes, using [openRead] to retry if there are 69 /// Reads [file] as a list of bytes, using [openRead] to retry if there are
40 /// failures. 70 /// failures.
41 Future<List<int>> _readAsBytes(File file) { 71 Future<List<int>> _readAsBytes(File file) {
42 var completer = new Completer<List<int>>(); 72 var completer = new Completer<List<int>>();
43 var builder = new BytesBuilder(); 73 var builder = new BytesBuilder();
44 74
45 openRead(file).listen(builder.add, onDone: () { 75 openRead(file).listen(builder.add, onDone: () {
46 completer.complete(builder.takeBytes()); 76 completer.complete(builder.takeBytes());
47 }, onError: completer.completeError, cancelOnError: true); 77 }, onError: completer.completeError, cancelOnError: true);
48 78
49 return completer.future; 79 return completer.future;
50 } 80 }
51 81
52 /// Tries to re-listen to the next pending file reader if there are any. 82 /// If there are any file reads that are waiting for available descriptors,
53 void _retryPendingListen() { 83 /// this will allow the oldest one to start reading.
54 if (_pendingListens.isEmpty) return; 84 void _startPendingListen() {
85 if (_pendingListens.isEmpty) {
86 _openFiles--;
87 if (_timer != null) {
Alan Knight 2013/10/21 18:41:33 It seems a bit awkward to have the explicit cancel
nweiz 2013/10/22 18:37:28 We want to avoid restarting the timer here, since
88 _timer.cancel();
89 _timer = null;
90 }
91 return;
92 }
55 93
94 _heartbeat();
56 var pending = _pendingListens.removeFirst(); 95 var pending = _pendingListens.removeFirst();
57 pending._listen(); 96 pending.start();
97 }
98
99 /// Indicates that some external action has occurred and the timer should be
100 /// restarted.
101 void _heartbeat() {
Bob Nystrom 2013/10/26 00:13:34 How about "_restartTimer"? It wasn't obvious to me
nweiz 2013/10/28 23:56:43 Done.
102 if (_timer != null) _timer.cancel();
103 _timer = new Timer(new Duration(seconds: 60), _onTimeout);
104 }
105
106 /// Handles [_timer] timing out by causing all pending file readers to emit
107 /// exceptions.
108 void _onTimeout() {
109 for (var reader in _pendingListens) {
110 reader.timeout();
111 }
112 _pendingListens.clear();
113 _timer = null;
58 } 114 }
59 } 115 }
60 116
61 /// Wraps a raw file reading stream in a stream that handles "too many files" 117 /// Wraps a raw file reading stream in a stream that handles "too many files"
62 /// errors. 118 /// errors.
63 /// 119 ///
64 /// This also notifies the pool when the underlying file stream is closed so 120 /// This also notifies the pool when the underlying file stream is closed so
65 /// that it can try to open a waiting file. 121 /// that it can try to open a waiting file.
66 class _FileReader { 122 class _FileReader {
67 final FilePool _pool; 123 final FilePool _pool;
68 final File _file; 124 final File _file;
69 125
126 /// Whether the caller has paused this reader's stream.
127 bool _isPaused = false;
128
70 /// The underyling file stream. 129 /// The underyling file stream.
71 Stream<List<int>> _fileStream; 130 Stream<List<int>> _fileStream;
72 131
73 /// The controller for the stream wrapper. 132 /// The controller for the stream wrapper.
74 StreamController<List<int>> _controller; 133 StreamController<List<int>> _controller;
75 134
76 /// The current subscription to the underlying file stream. 135 /// The current subscription to the underlying file stream.
77 /// 136 ///
78 /// This will only be non-null while the wrapped stream is being listened to. 137 /// This will only be non-null while the wrapped stream is being listened to.
79 StreamSubscription _subscription; 138 StreamSubscription _subscription;
80 139
81 /// The timeout timer.
82 ///
83 /// If this timer fires before the listen is retried, it gives up and throws
84 /// the original error.
85 Timer _timer;
86
87 /// When a [listen] call has thrown a "too many files" error, this will be
88 /// the exception object.
89 Object _exception;
90
91 /// When a [listen] call has thrown a "too many files" error, this will be
92 /// the captured stack trace.
93 Object _stackTrace;
94
95 /// The wrapped stream that the file can be read from. 140 /// The wrapped stream that the file can be read from.
96 Stream<List<int>> get stream => _controller.stream; 141 Stream<List<int>> get stream => _controller.stream;
97 142
98 _FileReader(this._pool, this._file) { 143 _FileReader(this._pool, this._file) {
99 _controller = new StreamController<List<int>>(onListen: _listen, 144 _controller = new StreamController<List<int>>(onPause: () {
100 onPause: () { 145 _isPaused = true;
101 _subscription.pause(); 146 if (_subscription != null) _subscription.pause();
102 }, onResume: () { 147 }, onResume: () {
103 _subscription.resume(); 148 _isPaused = false;
149 if (_subscription != null) _subscription.resume();
104 }, onCancel: () { 150 }, onCancel: () {
105 if (_subscription != null) _subscription.cancel(); 151 if (_subscription != null) _subscription.cancel();
106 _subscription = null; 152 _subscription = null;
107 }, sync: true); 153 }, sync: true);
108 } 154 }
109 155
110 /// Starts listening to the underlying file stream. 156 /// Starts listening to the underlying file stream.
111 void _listen() { 157 void start() {
112 if (_timer != null) {
113 _timer.cancel();
114 _timer = null;
115 }
116
117 _exception = null;
118 _stackTrace = null;
119
120 _fileStream = _file.openRead(); 158 _fileStream = _file.openRead();
121 _subscription = _fileStream.listen(_controller.add, 159 _subscription = _fileStream.listen(_controller.add,
122 onError: _onError, onDone: _onDone, cancelOnError: true); 160 onError: _onError, onDone: _onDone, cancelOnError: true);
161 if (_isPaused) _subscription.pause();
123 } 162 }
124 163
125 /// Handles an error from the underlying file stream. 164 /// Emits a timeout exception.
126 /// 165 void timeout() {
127 /// "Too many file" errors are caught so that we can retry later. Other 166 assert(_subscription == null);
128 /// errors are passed to the wrapped stream and the underlying stream 167 _controller.addError("FilePool deadlock: all file descriptors have been in "
129 /// subscription is canceled. 168 "use for too long.", new Trace.current().vmTrace);
Bob Nystrom 2013/10/26 00:13:34 This makes it sound like the error is not related
nweiz 2013/10/28 23:56:43 This is less applicable now, since the error comes
130 void _onError(Object exception, Object stackTrace) { 169 _controller.close();
131 assert(_subscription != null); 170 }
132 assert(_exception == null);
133 171
134 // The subscription is canceled after an error. 172 /// Forwards an error from the underlying file stream.
135 _subscription = null; 173 void _onError(Object exception, StackTrace stackTrace) {
136 174 _controller.addError(exception, stackTrace);
137 // We only handle "Too many open files errors". 175 _onDone();
138 if (exception is! FileException || exception.osError.errorCode != 24) {
139 _controller.addError(exception, stackTrace);
140 return;
141 }
142
143 _exception = exception;
144 _stackTrace = stackTrace;
145
146 // We'll try to defer the listen in the hopes that another file will close
147 // and we can try. If that doesn't happen after a while, give up and just
148 // throw the original error.
149 // TODO(rnystrom): The point of this timer is to not get stuck forever in
150 // a deadlock scenario. But this can also erroneously fire if there is a
151 // large number of slow reads that do incrementally finish. A file may not
152 // move to the front of the queue in time even though it is making
153 // progress. A better solution is to have a single deadlock timer on the
154 // FilePool itself that starts when a pending listen is enqueued and checks
155 // to see if progress has been made when it fires.
156 _timer = new Timer(new Duration(seconds: 60), _onTimeout);
157
158 // Tell the pool that this file is waiting.
159 _pool._pendingListens.add(this);
160 } 176 }
161 177
162 /// Handles the underlying file stream finishing. 178 /// Handles the underlying file stream finishing.
163 void _onDone() { 179 void _onDone() {
164 _subscription = null; 180 _subscription = null;
165
166 _controller.close(); 181 _controller.close();
167 _pool._retryPendingListen(); 182 _pool._startPendingListen();
168 }
169
170 /// If this file failed to be read because there were too many open files and
171 /// no file was closed in time to retry, this handles giving up.
172 void _onTimeout() {
173 assert(_subscription == null);
174 assert(_exception != null);
175
176 // We failed to open in time, so just fail with the original error.
177 _pool._pendingListens.remove(this);
178 _controller.addError(_exception, _stackTrace);
179 _controller.close();
180
181 _timer = null;
182 _exception = null;
183 _stackTrace = null;
184 } 183 }
185 } 184 }
OLDNEW
« no previous file with comments | « no previous file | pkg/barback/lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698