Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library barback.file_pool; | 5 library barback.file_pool; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 import 'dart:convert'; | 9 import 'dart:convert'; |
| 10 import 'dart:io'; | 10 import 'dart:io'; |
| 11 | 11 |
| 12 import 'package:stack_trace/stack_trace.dart'; | |
| 13 | |
| 14 import 'utils.dart'; | |
| 15 | |
| 12 /// Manages a pool of files that are opened for reading to cope with maximum | 16 /// Manages a pool of files that are opened for reading to cope with maximum |
| 13 /// file descriptor limits. | 17 /// file descriptor limits. |
| 14 /// | 18 /// |
| 15 /// If a file cannot be opened because too many files are already open, this | 19 /// If a file cannot be opened because too many files are already open, this |
| 16 /// will defer the open until a previously opened file is closed and then try | 20 /// will defer the open until a previously opened file is closed and then try |
| 17 /// again. If this doesn't succeed after a certain amount of time, the open | 21 /// again. If this doesn't succeed after a certain amount of time, the open |
| 18 /// will fail and the original "too many files" exception will be thrown. | 22 /// will fail and the original "too many files" exception will be thrown. |
| 19 class FilePool { | 23 class FilePool { |
| 20 /// [_FileReader]s whose last [listen] call failed and that are waiting for | 24 /// [_FileReader]s whose last [listen] call failed and that are waiting for |
| 21 /// another file to close so they can be retried. | 25 /// another file to close so they can be retried. |
| 22 final _pendingListens = new Queue<_FileReader>(); | 26 final _pendingListens = new Queue<_FileReader>(); |
| 23 | 27 |
| 28 /// The timeout timer. | |
| 29 /// | |
| 30 /// This timer is set as soon as the file limit is reached and is reset every | |
| 31 /// time a file finishes being read or a new file is opened. If it fires, that | |
| 32 /// indicates that the caller became deadlocked, likely due to files waiting | |
| 33 /// for additional files to be read before they could be closed. | |
| 34 Timer _timer; | |
| 35 | |
| 36 /// The number of files currently open in the pool. | |
| 37 int _openFiles = 0; | |
| 38 | |
| 39 /// The maximum number of file descriptors that the pool will allocate. | |
| 40 /// | |
| 41 /// Barback may only use half the available file descriptors. | |
| 42 int get _maxOpenFiles => (maxFileDescriptors / 2).floor(); | |
| 43 | |
| 24 /// Opens [file] for reading. | 44 /// Opens [file] for reading. |
| 25 /// | 45 /// |
| 26 /// When the returned stream is listened to, if there are too many files | 46 /// When the returned stream is listened to, if there are too many files |
| 27 /// open, this will wait for a previously opened file to be closed and then | 47 /// open, this will wait for a previously opened file to be closed and then |
| 28 /// try again. | 48 /// try again. |
| 29 Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream; | 49 Stream<List<int>> openRead(File file) { |
| 50 var reader = new _FileReader(this, file); | |
| 51 if (_openFiles < _maxOpenFiles) { | |
| 52 _openFiles++; | |
| 53 reader.start(); | |
| 54 } else { | |
| 55 _pendingListens.add(reader); | |
| 56 _heartbeat(); | |
| 57 } | |
| 58 return reader.stream; | |
| 59 } | |
| 30 | 60 |
| 31 /// Reads [file] as a string using [encoding]. | 61 /// Reads [file] as a string using [encoding]. |
| 32 /// | 62 /// |
| 33 /// If there are too many files open and the read fails, this will wait for | 63 /// If there are too many files open and the read fails, this will wait for |
| 34 /// a previously opened file to be closed and then try again. | 64 /// a previously opened file to be closed and then try again. |
| 35 Future<String> readAsString(File file, Encoding encoding) { | 65 Future<String> readAsString(File file, Encoding encoding) { |
| 36 return _readAsBytes(file).then(encoding.decode); | 66 return _readAsBytes(file).then(encoding.decode); |
| 37 } | 67 } |
| 38 | 68 |
| 39 /// Reads [file] as a list of bytes, using [openRead] to retry if there are | 69 /// Reads [file] as a list of bytes, using [openRead] to retry if there are |
| 40 /// failures. | 70 /// failures. |
| 41 Future<List<int>> _readAsBytes(File file) { | 71 Future<List<int>> _readAsBytes(File file) { |
| 42 var completer = new Completer<List<int>>(); | 72 var completer = new Completer<List<int>>(); |
| 43 var builder = new BytesBuilder(); | 73 var builder = new BytesBuilder(); |
| 44 | 74 |
| 45 openRead(file).listen(builder.add, onDone: () { | 75 openRead(file).listen(builder.add, onDone: () { |
| 46 completer.complete(builder.takeBytes()); | 76 completer.complete(builder.takeBytes()); |
| 47 }, onError: completer.completeError, cancelOnError: true); | 77 }, onError: completer.completeError, cancelOnError: true); |
| 48 | 78 |
| 49 return completer.future; | 79 return completer.future; |
| 50 } | 80 } |
| 51 | 81 |
| 52 /// Tries to re-listen to the next pending file reader if there are any. | 82 /// If there are any file reads that are waiting for available descriptors, |
| 53 void _retryPendingListen() { | 83 /// this will allow the oldest one to start reading. |
| 54 if (_pendingListens.isEmpty) return; | 84 void _startPendingListen() { |
| 85 if (_pendingListens.isEmpty) { | |
| 86 _openFiles--; | |
| 87 if (_timer != null) { | |
|
Alan Knight
2013/10/21 18:41:33
It seems a bit awkward to have the explicit cancel
nweiz
2013/10/22 18:37:28
We want to avoid restarting the timer here, since
| |
| 88 _timer.cancel(); | |
| 89 _timer = null; | |
| 90 } | |
| 91 return; | |
| 92 } | |
| 55 | 93 |
| 94 _heartbeat(); | |
| 56 var pending = _pendingListens.removeFirst(); | 95 var pending = _pendingListens.removeFirst(); |
| 57 pending._listen(); | 96 pending.start(); |
| 97 } | |
| 98 | |
| 99 /// Indicates that some external action has occurred and the timer should be | |
| 100 /// restarted. | |
| 101 void _heartbeat() { | |
|
Bob Nystrom
2013/10/26 00:13:34
How about "_restartTimer"? It wasn't obvious to me
nweiz
2013/10/28 23:56:43
Done.
| |
| 102 if (_timer != null) _timer.cancel(); | |
| 103 _timer = new Timer(new Duration(seconds: 60), _onTimeout); | |
| 104 } | |
| 105 | |
| 106 /// Handles [_timer] timing out by causing all pending file readers to emit | |
| 107 /// exceptions. | |
| 108 void _onTimeout() { | |
| 109 for (var reader in _pendingListens) { | |
| 110 reader.timeout(); | |
| 111 } | |
| 112 _pendingListens.clear(); | |
| 113 _timer = null; | |
| 58 } | 114 } |
| 59 } | 115 } |
| 60 | 116 |
| 61 /// Wraps a raw file reading stream in a stream that handles "too many files" | 117 /// Wraps a raw file reading stream in a stream that handles "too many files" |
| 62 /// errors. | 118 /// errors. |
| 63 /// | 119 /// |
| 64 /// This also notifies the pool when the underlying file stream is closed so | 120 /// This also notifies the pool when the underlying file stream is closed so |
| 65 /// that it can try to open a waiting file. | 121 /// that it can try to open a waiting file. |
| 66 class _FileReader { | 122 class _FileReader { |
| 67 final FilePool _pool; | 123 final FilePool _pool; |
| 68 final File _file; | 124 final File _file; |
| 69 | 125 |
| 126 /// Whether the caller has paused this reader's stream. | |
| 127 bool _isPaused = false; | |
| 128 | |
| 70 /// The underyling file stream. | 129 /// The underyling file stream. |
| 71 Stream<List<int>> _fileStream; | 130 Stream<List<int>> _fileStream; |
| 72 | 131 |
| 73 /// The controller for the stream wrapper. | 132 /// The controller for the stream wrapper. |
| 74 StreamController<List<int>> _controller; | 133 StreamController<List<int>> _controller; |
| 75 | 134 |
| 76 /// The current subscription to the underlying file stream. | 135 /// The current subscription to the underlying file stream. |
| 77 /// | 136 /// |
| 78 /// This will only be non-null while the wrapped stream is being listened to. | 137 /// This will only be non-null while the wrapped stream is being listened to. |
| 79 StreamSubscription _subscription; | 138 StreamSubscription _subscription; |
| 80 | 139 |
| 81 /// The timeout timer. | |
| 82 /// | |
| 83 /// If this timer fires before the listen is retried, it gives up and throws | |
| 84 /// the original error. | |
| 85 Timer _timer; | |
| 86 | |
| 87 /// When a [listen] call has thrown a "too many files" error, this will be | |
| 88 /// the exception object. | |
| 89 Object _exception; | |
| 90 | |
| 91 /// When a [listen] call has thrown a "too many files" error, this will be | |
| 92 /// the captured stack trace. | |
| 93 Object _stackTrace; | |
| 94 | |
| 95 /// The wrapped stream that the file can be read from. | 140 /// The wrapped stream that the file can be read from. |
| 96 Stream<List<int>> get stream => _controller.stream; | 141 Stream<List<int>> get stream => _controller.stream; |
| 97 | 142 |
| 98 _FileReader(this._pool, this._file) { | 143 _FileReader(this._pool, this._file) { |
| 99 _controller = new StreamController<List<int>>(onListen: _listen, | 144 _controller = new StreamController<List<int>>(onPause: () { |
| 100 onPause: () { | 145 _isPaused = true; |
| 101 _subscription.pause(); | 146 if (_subscription != null) _subscription.pause(); |
| 102 }, onResume: () { | 147 }, onResume: () { |
| 103 _subscription.resume(); | 148 _isPaused = false; |
| 149 if (_subscription != null) _subscription.resume(); | |
| 104 }, onCancel: () { | 150 }, onCancel: () { |
| 105 if (_subscription != null) _subscription.cancel(); | 151 if (_subscription != null) _subscription.cancel(); |
| 106 _subscription = null; | 152 _subscription = null; |
| 107 }, sync: true); | 153 }, sync: true); |
| 108 } | 154 } |
| 109 | 155 |
| 110 /// Starts listening to the underlying file stream. | 156 /// Starts listening to the underlying file stream. |
| 111 void _listen() { | 157 void start() { |
| 112 if (_timer != null) { | |
| 113 _timer.cancel(); | |
| 114 _timer = null; | |
| 115 } | |
| 116 | |
| 117 _exception = null; | |
| 118 _stackTrace = null; | |
| 119 | |
| 120 _fileStream = _file.openRead(); | 158 _fileStream = _file.openRead(); |
| 121 _subscription = _fileStream.listen(_controller.add, | 159 _subscription = _fileStream.listen(_controller.add, |
| 122 onError: _onError, onDone: _onDone, cancelOnError: true); | 160 onError: _onError, onDone: _onDone, cancelOnError: true); |
| 161 if (_isPaused) _subscription.pause(); | |
| 123 } | 162 } |
| 124 | 163 |
| 125 /// Handles an error from the underlying file stream. | 164 /// Emits a timeout exception. |
| 126 /// | 165 void timeout() { |
| 127 /// "Too many file" errors are caught so that we can retry later. Other | 166 assert(_subscription == null); |
| 128 /// errors are passed to the wrapped stream and the underlying stream | 167 _controller.addError("FilePool deadlock: all file descriptors have been in " |
| 129 /// subscription is canceled. | 168 "use for too long.", new Trace.current().vmTrace); |
|
Bob Nystrom
2013/10/26 00:13:34
This makes it sound like the error is not related
nweiz
2013/10/28 23:56:43
This is less applicable now, since the error comes
| |
| 130 void _onError(Object exception, Object stackTrace) { | 169 _controller.close(); |
| 131 assert(_subscription != null); | 170 } |
| 132 assert(_exception == null); | |
| 133 | 171 |
| 134 // The subscription is canceled after an error. | 172 /// Forwards an error from the underlying file stream. |
| 135 _subscription = null; | 173 void _onError(Object exception, StackTrace stackTrace) { |
| 136 | 174 _controller.addError(exception, stackTrace); |
| 137 // We only handle "Too many open files errors". | 175 _onDone(); |
| 138 if (exception is! FileException || exception.osError.errorCode != 24) { | |
| 139 _controller.addError(exception, stackTrace); | |
| 140 return; | |
| 141 } | |
| 142 | |
| 143 _exception = exception; | |
| 144 _stackTrace = stackTrace; | |
| 145 | |
| 146 // We'll try to defer the listen in the hopes that another file will close | |
| 147 // and we can try. If that doesn't happen after a while, give up and just | |
| 148 // throw the original error. | |
| 149 // TODO(rnystrom): The point of this timer is to not get stuck forever in | |
| 150 // a deadlock scenario. But this can also erroneously fire if there is a | |
| 151 // large number of slow reads that do incrementally finish. A file may not | |
| 152 // move to the front of the queue in time even though it is making | |
| 153 // progress. A better solution is to have a single deadlock timer on the | |
| 154 // FilePool itself that starts when a pending listen is enqueued and checks | |
| 155 // to see if progress has been made when it fires. | |
| 156 _timer = new Timer(new Duration(seconds: 60), _onTimeout); | |
| 157 | |
| 158 // Tell the pool that this file is waiting. | |
| 159 _pool._pendingListens.add(this); | |
| 160 } | 176 } |
| 161 | 177 |
| 162 /// Handles the underlying file stream finishing. | 178 /// Handles the underlying file stream finishing. |
| 163 void _onDone() { | 179 void _onDone() { |
| 164 _subscription = null; | 180 _subscription = null; |
| 165 | |
| 166 _controller.close(); | 181 _controller.close(); |
| 167 _pool._retryPendingListen(); | 182 _pool._startPendingListen(); |
| 168 } | |
| 169 | |
| 170 /// If this file failed to be read because there were too many open files and | |
| 171 /// no file was closed in time to retry, this handles giving up. | |
| 172 void _onTimeout() { | |
| 173 assert(_subscription == null); | |
| 174 assert(_exception != null); | |
| 175 | |
| 176 // We failed to open in time, so just fail with the original error. | |
| 177 _pool._pendingListens.remove(this); | |
| 178 _controller.addError(_exception, _stackTrace); | |
| 179 _controller.close(); | |
| 180 | |
| 181 _timer = null; | |
| 182 _exception = null; | |
| 183 _stackTrace = null; | |
| 184 } | 183 } |
| 185 } | 184 } |
| OLD | NEW |