OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library barback.file_pool; | 5 library barback.file_pool; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 import 'dart:convert'; | 9 import 'dart:convert'; |
10 import 'dart:io'; | 10 import 'dart:io'; |
11 | 11 |
12 import 'package:stack_trace/stack_trace.dart'; | |
13 | |
14 import 'utils.dart'; | |
15 | |
12 /// Manages a pool of files that are opened for reading to cope with maximum | 16 /// Manages a pool of files that are opened for reading to cope with maximum |
13 /// file descriptor limits. | 17 /// file descriptor limits. |
14 /// | 18 /// |
15 /// If a file cannot be opened because too many files are already open, this | 19 /// If a file cannot be opened because too many files are already open, this |
16 /// will defer the open until a previously opened file is closed and then try | 20 /// will defer the open until a previously opened file is closed and then try |
17 /// again. If this doesn't succeed after a certain amount of time, the open | 21 /// again. If this doesn't succeed after a certain amount of time, the open |
18 /// will fail and the original "too many files" exception will be thrown. | 22 /// will fail and the original "too many files" exception will be thrown. |
19 class FilePool { | 23 class FilePool { |
20 /// [_FileReader]s whose last [listen] call failed and that are waiting for | 24 /// [_FileReader]s whose last [listen] call failed and that are waiting for |
21 /// another file to close so they can be retried. | 25 /// another file to close so they can be retried. |
22 final _pendingListens = new Queue<_FileReader>(); | 26 final _pendingListens = new Queue<_FileReader>(); |
23 | 27 |
28 /// The timeout timer. | |
29 /// | |
30 /// This timer is set as soon as the file limit is reached and is reset every | |
31 /// time a file finishes being read or a new file is opened. If it fires, that | |
32 /// indicates that the caller became deadlocked, likely due to files waiting | |
33 /// for additional files to be read before they could be closed. | |
34 Timer _timer; | |
35 | |
36 /// The number of files currently open in the pool. | |
37 int _openFiles = 0; | |
38 | |
39 /// The maximum number of file descriptors that the pool will allocate. | |
40 /// | |
41 /// Barback may only use half the available file descriptors. | |
42 int get _maxOpenFiles => (maxFileDescriptors / 2).floor(); | |
43 | |
24 /// Opens [file] for reading. | 44 /// Opens [file] for reading. |
25 /// | 45 /// |
26 /// When the returned stream is listened to, if there are too many files | 46 /// When the returned stream is listened to, if there are too many files |
27 /// open, this will wait for a previously opened file to be closed and then | 47 /// open, this will wait for a previously opened file to be closed and then |
28 /// try again. | 48 /// try again. |
29 Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream; | 49 Stream<List<int>> openRead(File file) { |
50 var reader = new _FileReader(this, file); | |
51 if (_openFiles < _maxOpenFiles) { | |
52 _openFiles++; | |
53 reader.start(); | |
54 } else { | |
55 _pendingListens.add(reader); | |
56 _heartbeat(); | |
57 } | |
58 return reader.stream; | |
59 } | |
30 | 60 |
31 /// Reads [file] as a string using [encoding]. | 61 /// Reads [file] as a string using [encoding]. |
32 /// | 62 /// |
33 /// If there are too many files open and the read fails, this will wait for | 63 /// If there are too many files open and the read fails, this will wait for |
34 /// a previously opened file to be closed and then try again. | 64 /// a previously opened file to be closed and then try again. |
35 Future<String> readAsString(File file, Encoding encoding) { | 65 Future<String> readAsString(File file, Encoding encoding) { |
36 return _readAsBytes(file).then(encoding.decode); | 66 return _readAsBytes(file).then(encoding.decode); |
37 } | 67 } |
38 | 68 |
39 /// Reads [file] as a list of bytes, using [openRead] to retry if there are | 69 /// Reads [file] as a list of bytes, using [openRead] to retry if there are |
40 /// failures. | 70 /// failures. |
41 Future<List<int>> _readAsBytes(File file) { | 71 Future<List<int>> _readAsBytes(File file) { |
42 var completer = new Completer<List<int>>(); | 72 var completer = new Completer<List<int>>(); |
43 var builder = new BytesBuilder(); | 73 var builder = new BytesBuilder(); |
44 | 74 |
45 openRead(file).listen(builder.add, onDone: () { | 75 openRead(file).listen(builder.add, onDone: () { |
46 completer.complete(builder.takeBytes()); | 76 completer.complete(builder.takeBytes()); |
47 }, onError: completer.completeError, cancelOnError: true); | 77 }, onError: completer.completeError, cancelOnError: true); |
48 | 78 |
49 return completer.future; | 79 return completer.future; |
50 } | 80 } |
51 | 81 |
52 /// Tries to re-listen to the next pending file reader if there are any. | 82 /// If there are any file reads that are waiting for available descriptors, |
53 void _retryPendingListen() { | 83 /// this will allow the oldest one to start reading. |
54 if (_pendingListens.isEmpty) return; | 84 void _startPendingListen() { |
85 if (_pendingListens.isEmpty) { | |
86 _openFiles--; | |
87 if (_timer != null) { | |
Alan Knight
2013/10/21 18:41:33
It seems a bit awkward to have the explicit cancel
nweiz
2013/10/22 18:37:28
We want to avoid restarting the timer here, since
| |
88 _timer.cancel(); | |
89 _timer = null; | |
90 } | |
91 return; | |
92 } | |
55 | 93 |
94 _heartbeat(); | |
56 var pending = _pendingListens.removeFirst(); | 95 var pending = _pendingListens.removeFirst(); |
57 pending._listen(); | 96 pending.start(); |
97 } | |
98 | |
99 /// Indicates that some external action has occurred and the timer should be | |
100 /// restarted. | |
101 void _heartbeat() { | |
Bob Nystrom
2013/10/26 00:13:34
How about "_restartTimer"? It wasn't obvious to me
nweiz
2013/10/28 23:56:43
Done.
| |
102 if (_timer != null) _timer.cancel(); | |
103 _timer = new Timer(new Duration(seconds: 60), _onTimeout); | |
104 } | |
105 | |
106 /// Handles [_timer] timing out by causing all pending file readers to emit | |
107 /// exceptions. | |
108 void _onTimeout() { | |
109 for (var reader in _pendingListens) { | |
110 reader.timeout(); | |
111 } | |
112 _pendingListens.clear(); | |
113 _timer = null; | |
58 } | 114 } |
59 } | 115 } |
60 | 116 |
61 /// Wraps a raw file reading stream in a stream that handles "too many files" | 117 /// Wraps a raw file reading stream in a stream that handles "too many files" |
62 /// errors. | 118 /// errors. |
63 /// | 119 /// |
64 /// This also notifies the pool when the underlying file stream is closed so | 120 /// This also notifies the pool when the underlying file stream is closed so |
65 /// that it can try to open a waiting file. | 121 /// that it can try to open a waiting file. |
66 class _FileReader { | 122 class _FileReader { |
67 final FilePool _pool; | 123 final FilePool _pool; |
68 final File _file; | 124 final File _file; |
69 | 125 |
126 /// Whether the caller has paused this reader's stream. | |
127 bool _isPaused = false; | |
128 | |
70 /// The underyling file stream. | 129 /// The underyling file stream. |
71 Stream<List<int>> _fileStream; | 130 Stream<List<int>> _fileStream; |
72 | 131 |
73 /// The controller for the stream wrapper. | 132 /// The controller for the stream wrapper. |
74 StreamController<List<int>> _controller; | 133 StreamController<List<int>> _controller; |
75 | 134 |
76 /// The current subscription to the underlying file stream. | 135 /// The current subscription to the underlying file stream. |
77 /// | 136 /// |
78 /// This will only be non-null while the wrapped stream is being listened to. | 137 /// This will only be non-null while the wrapped stream is being listened to. |
79 StreamSubscription _subscription; | 138 StreamSubscription _subscription; |
80 | 139 |
81 /// The timeout timer. | |
82 /// | |
83 /// If this timer fires before the listen is retried, it gives up and throws | |
84 /// the original error. | |
85 Timer _timer; | |
86 | |
87 /// When a [listen] call has thrown a "too many files" error, this will be | |
88 /// the exception object. | |
89 Object _exception; | |
90 | |
91 /// When a [listen] call has thrown a "too many files" error, this will be | |
92 /// the captured stack trace. | |
93 Object _stackTrace; | |
94 | |
95 /// The wrapped stream that the file can be read from. | 140 /// The wrapped stream that the file can be read from. |
96 Stream<List<int>> get stream => _controller.stream; | 141 Stream<List<int>> get stream => _controller.stream; |
97 | 142 |
98 _FileReader(this._pool, this._file) { | 143 _FileReader(this._pool, this._file) { |
99 _controller = new StreamController<List<int>>(onListen: _listen, | 144 _controller = new StreamController<List<int>>(onPause: () { |
100 onPause: () { | 145 _isPaused = true; |
101 _subscription.pause(); | 146 if (_subscription != null) _subscription.pause(); |
102 }, onResume: () { | 147 }, onResume: () { |
103 _subscription.resume(); | 148 _isPaused = false; |
149 if (_subscription != null) _subscription.resume(); | |
104 }, onCancel: () { | 150 }, onCancel: () { |
105 if (_subscription != null) _subscription.cancel(); | 151 if (_subscription != null) _subscription.cancel(); |
106 _subscription = null; | 152 _subscription = null; |
107 }, sync: true); | 153 }, sync: true); |
108 } | 154 } |
109 | 155 |
110 /// Starts listening to the underlying file stream. | 156 /// Starts listening to the underlying file stream. |
111 void _listen() { | 157 void start() { |
112 if (_timer != null) { | |
113 _timer.cancel(); | |
114 _timer = null; | |
115 } | |
116 | |
117 _exception = null; | |
118 _stackTrace = null; | |
119 | |
120 _fileStream = _file.openRead(); | 158 _fileStream = _file.openRead(); |
121 _subscription = _fileStream.listen(_controller.add, | 159 _subscription = _fileStream.listen(_controller.add, |
122 onError: _onError, onDone: _onDone, cancelOnError: true); | 160 onError: _onError, onDone: _onDone, cancelOnError: true); |
161 if (_isPaused) _subscription.pause(); | |
123 } | 162 } |
124 | 163 |
125 /// Handles an error from the underlying file stream. | 164 /// Emits a timeout exception. |
126 /// | 165 void timeout() { |
127 /// "Too many file" errors are caught so that we can retry later. Other | 166 assert(_subscription == null); |
128 /// errors are passed to the wrapped stream and the underlying stream | 167 _controller.addError("FilePool deadlock: all file descriptors have been in " |
129 /// subscription is canceled. | 168 "use for too long.", new Trace.current().vmTrace); |
Bob Nystrom
2013/10/26 00:13:34
This makes it sound like the error is not related
nweiz
2013/10/28 23:56:43
This is less applicable now, since the error comes
| |
130 void _onError(Object exception, Object stackTrace) { | 169 _controller.close(); |
131 assert(_subscription != null); | 170 } |
132 assert(_exception == null); | |
133 | 171 |
134 // The subscription is canceled after an error. | 172 /// Forwards an error from the underlying file stream. |
135 _subscription = null; | 173 void _onError(Object exception, StackTrace stackTrace) { |
136 | 174 _controller.addError(exception, stackTrace); |
137 // We only handle "Too many open files errors". | 175 _onDone(); |
138 if (exception is! FileException || exception.osError.errorCode != 24) { | |
139 _controller.addError(exception, stackTrace); | |
140 return; | |
141 } | |
142 | |
143 _exception = exception; | |
144 _stackTrace = stackTrace; | |
145 | |
146 // We'll try to defer the listen in the hopes that another file will close | |
147 // and we can try. If that doesn't happen after a while, give up and just | |
148 // throw the original error. | |
149 // TODO(rnystrom): The point of this timer is to not get stuck forever in | |
150 // a deadlock scenario. But this can also erroneously fire if there is a | |
151 // large number of slow reads that do incrementally finish. A file may not | |
152 // move to the front of the queue in time even though it is making | |
153 // progress. A better solution is to have a single deadlock timer on the | |
154 // FilePool itself that starts when a pending listen is enqueued and checks | |
155 // to see if progress has been made when it fires. | |
156 _timer = new Timer(new Duration(seconds: 60), _onTimeout); | |
157 | |
158 // Tell the pool that this file is waiting. | |
159 _pool._pendingListens.add(this); | |
160 } | 176 } |
161 | 177 |
162 /// Handles the underlying file stream finishing. | 178 /// Handles the underlying file stream finishing. |
163 void _onDone() { | 179 void _onDone() { |
164 _subscription = null; | 180 _subscription = null; |
165 | |
166 _controller.close(); | 181 _controller.close(); |
167 _pool._retryPendingListen(); | 182 _pool._startPendingListen(); |
168 } | |
169 | |
170 /// If this file failed to be read because there were too many open files and | |
171 /// no file was closed in time to retry, this handles giving up. | |
172 void _onTimeout() { | |
173 assert(_subscription == null); | |
174 assert(_exception != null); | |
175 | |
176 // We failed to open in time, so just fail with the original error. | |
177 _pool._pendingListens.remove(this); | |
178 _controller.addError(_exception, _stackTrace); | |
179 _controller.close(); | |
180 | |
181 _timer = null; | |
182 _exception = null; | |
183 _stackTrace = null; | |
184 } | 183 } |
185 } | 184 } |
OLD | NEW |