Index: sdk/lib/io/file_impl.dart |
diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart |
index f6cade265b754d6e6655c9be39f3e57b4750c928..ce60209d129e6eb8f842f077d208d37a25602c29 100644 |
--- a/sdk/lib/io/file_impl.dart |
+++ b/sdk/lib/io/file_impl.dart |
@@ -4,293 +4,202 @@ |
part of dart.io; |
-class _FileInputStream extends _BaseDataInputStream implements InputStream { |
- _FileInputStream(String name) |
- : _data = const [], |
- _position = 0, |
- _filePosition = 0 { |
- var file = new File(name); |
- var future = file.open(FileMode.READ); |
- future.then(_setupOpenedFile) |
- .catchError((e) { |
- _reportError(e.error); |
- }); |
- } |
- |
- _FileInputStream.fromStdio(int fd) |
- : _data = const [], |
- _position = 0, |
- _filePosition = 0 { |
- assert(fd == 0); |
- _setupOpenedFile(_File._openStdioSync(fd)); |
- } |
- |
- void _setupOpenedFile(RandomAccessFile openedFile) { |
- _openedFile = openedFile; |
- if (_streamMarkedClosed) { |
- // This input stream has already been closed. |
- _fileLength = 0; |
- _closeFile(); |
- return; |
- } |
- var futureOpen = _openedFile.length(); |
- futureOpen |
- .then((len) { |
- _fileLength = len; |
- _fillBuffer(); |
- }) |
- .catchError((e) { |
- _reportError(e.error); |
- }); |
- } |
- void _closeFile() { |
- if (_openedFile == null) { |
- _streamMarkedClosed = true; |
- return; |
- } |
- if (available() == 0) _cancelScheduledDataCallback(); |
- if (!_openedFile.closed) { |
- _openedFile.close().then((ignore) { |
- _streamMarkedClosed = true; |
- _checkScheduleCallbacks(); |
- }); |
- } |
- } |
+class _FileStream extends Stream<List<int>> { |
+ // Stream controller. |
+ StreamController<List<int>> _controller; |
- void _fillBuffer() { |
- Expect.equals(_position, _data.length); |
- if (_openedFile == null) return; // Called before the file is opened. |
- int size = min(_bufferLength, _fileLength - _filePosition); |
- if (size == 0) { |
- _closeFile(); |
- return; |
- } |
- // If there is currently a _fillBuffer call waiting on read, |
- // let it fill the buffer instead of us. |
- if (_activeFillBufferCall) return; |
- _activeFillBufferCall = true; |
- var future = _openedFile.read(size); |
- future.then((data) { |
- _data = data; |
- _position = 0; |
- _filePosition += _data.length; |
- _activeFillBufferCall = false; |
- |
- if (_fileLength == _filePosition) { |
- _closeFile(); |
- } |
- _checkScheduleCallbacks(); |
- }).catchError((e) { |
- _activeFillBufferCall = false; |
- _reportError(e.error); |
- }); |
- } |
+ // Read the file in blocks of size 64k. |
+ final int _blockSize = 64 * 1024; |
- int available() { |
- return closed ? 0 : _data.length - _position; |
- } |
+ // Information about the underlying file. |
+ String _name; |
+ RandomAccessFile _openedFile; |
+ int _position; |
- void pipe(OutputStream output, {bool close: true}) { |
- _pipe(this, output, close: close); |
- } |
+ // Has the stream been paused or unsubscribed? |
+ bool _paused = false; |
+ bool _unsubscribed = false; |
- void _finishRead() { |
- if (_position == _data.length && !_streamMarkedClosed) { |
- _fillBuffer(); |
- } else { |
- _checkScheduleCallbacks(); |
- } |
- } |
+ // Is there a read currently in progress? |
+ bool _readInProgress = false; |
- List<int> _read(int bytesToRead) { |
- List<int> result; |
- if (_position == 0 && bytesToRead == _data.length) { |
- result = _data; |
- _data = const []; |
- } else { |
- result = new Uint8List(bytesToRead); |
- result.setRange(0, bytesToRead, _data, _position); |
- _position += bytesToRead; |
- } |
- _finishRead(); |
- return result; |
- } |
+ // Block read but not yet send because stream is paused. |
+ List<int> _currentBlock; |
- int _readInto(List<int> buffer, int offset, int len) { |
- buffer.setRange(offset, len, _data, _position); |
- _position += len; |
- _finishRead(); |
- return len; |
+ _FileStream(String this._name) : _position = 0 { |
+ _setupController(); |
} |
- void _close() { |
- _data = const []; |
- _position = 0; |
- _filePosition = 0; |
- _fileLength = 0; |
- _closeFile(); |
+ _FileStream.forStdin() : _position = 0 { |
+ _setupController(); |
} |
- static const int _bufferLength = 64 * 1024; |
- |
- RandomAccessFile _openedFile; |
- List<int> _data; |
- int _position; |
- int _filePosition; |
- int _fileLength; |
- bool _activeFillBufferCall = false; |
-} |
- |
- |
-class _PendingOperation { |
- const _PendingOperation(this._id); |
- static const _PendingOperation CLOSE = const _PendingOperation(0); |
- static const _PendingOperation FLUSH = const _PendingOperation(1); |
- final int _id; |
-} |
- |
- |
-class _FileOutputStream extends _BaseOutputStream implements OutputStream { |
- _FileOutputStream(String name, FileMode mode) { |
- _pendingOperations = new List(); |
- var f = new File(name); |
- var openFuture = f.open(mode); |
- openFuture.then((openedFile) { |
- _file = openedFile; |
- _processPendingOperations(); |
- }).catchError((e) { |
- _reportError(e.error); |
- }); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _controller.stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- _FileOutputStream.fromStdio(int fd) { |
- assert(1 <= fd && fd <= 2); |
- _file = _File._openStdioSync(fd); |
+ void _setupController() { |
+ _controller = new StreamController<List<int>>( |
+ onSubscriptionStateChange: _onSubscriptionStateChange, |
+ onPauseStateChange: _onPauseStateChange); |
} |
- bool write(List<int> buffer, [bool copyBuffer = false]) { |
- var data = buffer; |
- if (copyBuffer) { |
- var length = buffer.length; |
- data = new Uint8List(length); |
- data.setRange(0, length, buffer, 0); |
- } |
- if (_file == null) { |
- _pendingOperations.add(data); |
+ Future _closeFile() { |
+ Future closeFuture; |
+ if (_openedFile != null) { |
+ Future closeFuture = _openedFile.close(); |
+ _openedFile = null; |
+ return closeFuture; |
} else { |
- _write(data, 0, data.length); |
+ return new Future.immediate(null); |
} |
- return false; |
} |
- bool writeFrom(List<int> buffer, [int offset = 0, int len]) { |
- // A copy is required by the interface. |
- var length = buffer.length - offset; |
- if (len != null) { |
- if (len > length) throw new RangeError.value(len); |
- length = len; |
- } |
- var copy = new Uint8List(length); |
- copy.setRange(0, length, buffer, offset); |
- return write(copy); |
+ void _readBlock() { |
+ // Don't start a new read if one is already in progress. |
+ if (_readInProgress) return; |
+ _readInProgress = true; |
+ _openedFile.length() |
+ .then((length) { |
+ if (_position >= length) { |
+ _readInProgress = false; |
+ if (!_unsubscribed) { |
+ _closeFile().then((_) { _controller.close(); }); |
+ _unsubscribed = true; |
+ } |
+ return null; |
+ } else { |
+ return _openedFile.read(_blockSize); |
+ } |
+ }) |
+ .then((block) { |
+ _readInProgress = false; |
+ if (block == null || _unsubscribed) { |
+ return; |
+ } |
+ _position += block.length; |
+ if (_paused) { |
+ _currentBlock = block; |
+ } else { |
+ _controller.add(block); |
+ _readBlock(); |
+ } |
+ }) |
+ .catchError((e) { |
+ if (!_unsubscribed) { |
+ _controller.signalError(e); |
+ _closeFile().then((_) { _controller.close(); }); |
+ _unsubscribed = true; |
+ } |
+ }); |
} |
- |
- void flush() { |
- if (_file == null) { |
- _pendingOperations.add(_PendingOperation.FLUSH); |
+ void _start() { |
+ Future<RandomAccessFile> openFuture; |
+ if (_name != null) { |
+ openFuture = new File(_name).open(FileMode.READ); |
} else { |
- _file.flush().then((ignored) => null); |
+ openFuture = new Future.immediate(_File._openStdioSync(0)); |
} |
- } |
- |
- |
- void close() { |
- _streamMarkedClosed = true; |
- if (_file == null) { |
- _pendingOperations.add(_PendingOperation.CLOSE); |
- } else if (!_closeCallbackScheduled) { |
- _file.close().then((ignore) { |
- if (_onClosed != null) _onClosed(); |
+ openFuture |
+ .then((RandomAccessFile opened) { |
+ _openedFile = opened; |
+ _readBlock(); |
+ }) |
+ .catchError((e) { |
+ _controller.signalError(e); |
+ _controller.close(); |
}); |
- _closeCallbackScheduled = true; |
- } |
} |
- void set onNoPendingWrites(void callback()) { |
- _onNoPendingWrites = callback; |
- if ((_pendingOperations == null || _pendingOperations.length == 0) && |
- outstandingWrites == 0 && |
- !_streamMarkedClosed && |
- _onNoPendingWrites != null) { |
- Timer.run(() { |
- if (_onNoPendingWrites != null) { |
- _onNoPendingWrites(); |
- } |
- }); |
+ void _resume() { |
+ _paused = false; |
+ if (_currentBlock != null) { |
+ _controller.add(_currentBlock); |
+ _currentBlock = null; |
} |
+ // Resume reading unless we are already done. |
+ if (_openedFile != null) _readBlock(); |
} |
- void set onClosed(void callback()) { |
- _onClosed = callback; |
+ void _onSubscriptionStateChange() { |
+ if (_controller.hasSubscribers) { |
+ _start(); |
+ } else { |
+ _unsubscribed = true; |
+ _closeFile(); |
+ } |
} |
- void _processPendingOperations() { |
- _pendingOperations.forEach((buffer) { |
- if (buffer is _PendingOperation) { |
- if (identical(buffer, _PendingOperation.CLOSE)) { |
- close(); |
- } else { |
- assert(identical(buffer, _PendingOperation.FLUSH)); |
- flush(); |
- } |
- } else { |
- write(buffer); |
- } |
- }); |
- _pendingOperations = null; |
- } |
- |
- void _write(List<int> buffer, int offset, int len) { |
- outstandingWrites++; |
- var writeListFuture = _file.writeList(buffer, offset, len); |
- writeListFuture.then((ignore) { |
- outstandingWrites--; |
- if (outstandingWrites == 0 && |
- !_streamMarkedClosed && |
- _onNoPendingWrites != null) { |
- _onNoPendingWrites(); |
- } |
- }).catchError((e) { |
- outstandingWrites--; |
- _reportError(e.error); |
- }); |
+ void _onPauseStateChange() { |
+ if (_controller.isPaused) { |
+ _paused = true; |
+ } else { |
+ _resume(); |
+ } |
} |
+} |
- bool get closed => _streamMarkedClosed; |
- |
- RandomAccessFile _file; |
- |
- // When this is set to true the stream is marked closed. When a |
- // stream is marked closed no more data can be written. |
- bool _streamMarkedClosed = false; |
- |
- // When this is set to true, the close callback has been scheduled and the |
- // stream will be fully closed once it's called. |
- bool _closeCallbackScheduled = false; |
+class _FileStreamConsumer extends StreamConsumer<List<int>, File> { |
+ File _file; |
+ Future<RandomAccessFile> _openFuture; |
+ StreamSubscription _subscription; |
- // Number of writes that have not yet completed. |
- int outstandingWrites = 0; |
+ _FileStreamConsumer(File this._file, FileMode mode) { |
+ _openFuture = _file.open(mode); |
+ } |
- // List of pending writes that were issued before the underlying |
- // file was successfully opened. |
- List _pendingOperations; |
+ _FileStreamConsumer.fromStdio(int fd) { |
+ assert(1 <= fd && fd <= 2); |
+ _openFuture = new Future.immediate(_File._openStdioSync(fd)); |
+ } |
- Function _onNoPendingWrites; |
- Function _onClosed; |
+ Future<File> consume(Stream<List<int>> stream) { |
+ Completer<File> completer = new Completer<File>(); |
+ _openFuture |
+ .then((openedFile) { |
+ _subscription = stream.listen( |
+ (d) { |
+ _subscription.pause(); |
+ openedFile.writeList(d, 0, d.length) |
+ .then((_) => _subscription.resume()) |
+ .catchError((e) { |
+ openedFile.close(); |
+ completer.completeError(e); |
+ }); |
+ }, |
+ onDone: () { |
+ // Wait for the file to close (and therefore flush) before |
+ // completing the future. |
+ openedFile.close() |
+ .then((_) { |
+ completer.complete(_file); |
+ }) |
+ .catchError((e) { |
+ completer.completeError(e); |
+ }); |
+ }, |
+ onError: (e) { |
+ openedFile.close(); |
+ completer.completeError(e); |
+ }, |
+ unsubscribeOnError: true); |
+ }) |
+ .catchError((e) { |
+ completer.completeError(e); |
+ }); |
+ return completer.future; |
+ } |
} |
+ |
const int _EXISTS_REQUEST = 0; |
const int _CREATE_REQUEST = 1; |
const int _DELETE_REQUEST = 2; |
@@ -555,36 +464,35 @@ class _File extends _FileBase implements File { |
return result; |
} |
- InputStream openInputStream() { |
- return new _FileInputStream(_name); |
+ Stream<List<int>> openRead() { |
+ return new _FileStream(_name); |
} |
- OutputStream openOutputStream([FileMode mode = FileMode.WRITE]) { |
+ IOSink<File> openWrite([FileMode mode = FileMode.WRITE]) { |
if (mode != FileMode.WRITE && |
mode != FileMode.APPEND) { |
throw new FileIOException( |
"Wrong FileMode. Use FileMode.WRITE or FileMode.APPEND"); |
} |
- return new _FileOutputStream(_name, mode); |
+ var consumer = new _FileStreamConsumer(this, mode); |
+ return new IOSink<File>(consumer); |
} |
Future<List<int>> readAsBytes() { |
_ensureFileService(); |
Completer<List<int>> completer = new Completer<List<int>>(); |
var chunks = new _BufferList(); |
- var stream = openInputStream(); |
- stream.onClosed = () { |
- var result = chunks.readBytes(chunks.length); |
- if (result == null) result = <int>[]; |
- completer.complete(result); |
- }; |
- stream.onData = () { |
- var chunk = stream.read(); |
- chunks.add(chunk); |
- }; |
- stream.onError = (e) { |
- completer.completeError(e); |
- }; |
+ openRead().listen( |
+ (d) => chunks.add(d), |
+ onDone: () { |
+ var result = chunks.readBytes(chunks.length); |
+ if (result == null) result = <int>[]; |
+ completer.complete(result); |
+ }, |
+ onError: (e) { |
+ completer.completeError(e); |
+ }, |
+ unsubscribeOnError: true); |
return completer.future; |
} |
@@ -603,67 +511,54 @@ class _File extends _FileBase implements File { |
Future<String> readAsString([Encoding encoding = Encoding.UTF_8]) { |
_ensureFileService(); |
return readAsBytes().then((bytes) { |
- if (bytes.length == 0) return ""; |
- var decoder = _StringDecoders.decoder(encoding); |
- decoder.write(bytes); |
- return decoder.decoded(); |
+ return _decodeString(bytes, encoding); |
}); |
} |
String readAsStringSync([Encoding encoding = Encoding.UTF_8]) { |
- var decoder = _StringDecoders.decoder(encoding); |
List<int> bytes = readAsBytesSync(); |
- if (bytes.length == 0) return ""; |
- decoder.write(bytes); |
- return decoder.decoded(); |
+ return _decodeString(bytes, encoding); |
} |
- List<String> _getDecodedLines(_StringDecoder decoder) { |
- List<String> result = []; |
- var line = decoder.decodedLine; |
- while (line != null) { |
- result.add(line); |
- line = decoder.decodedLine; |
- } |
- // If there is more data with no terminating line break we treat |
- // it as the last line. |
- var data = decoder.decoded(); |
- if (data != null) { |
- result.add(data); |
- } |
- return result; |
+ static List<String> _decodeLines(List<int> bytes, Encoding encoding) { |
+ if (bytes.length == 0) return []; |
+ var list = []; |
+ var controller = new StreamController(); |
+ controller.stream |
+ .transform(new StringDecoder(encoding)) |
+ .transform(new LineTransformer()) |
+ .listen((line) => list.add(line)); |
+ controller.add(bytes); |
+ controller.close(); |
+ return list; |
} |
Future<List<String>> readAsLines([Encoding encoding = Encoding.UTF_8]) { |
_ensureFileService(); |
Completer<List<String>> completer = new Completer<List<String>>(); |
return readAsBytes().then((bytes) { |
- var decoder = _StringDecoders.decoder(encoding); |
- decoder.write(bytes); |
- return _getDecodedLines(decoder); |
+ return _decodeLines(bytes, encoding); |
}); |
} |
List<String> readAsLinesSync([Encoding encoding = Encoding.UTF_8]) { |
- var decoder = _StringDecoders.decoder(encoding); |
- List<int> bytes = readAsBytesSync(); |
- decoder.write(bytes); |
- return _getDecodedLines(decoder); |
+ return _decodeLines(readAsBytesSync(), encoding); |
} |
Future<File> writeAsBytes(List<int> bytes, |
[FileMode mode = FileMode.WRITE]) { |
Completer<File> completer = new Completer<File>(); |
try { |
- var stream = openOutputStream(mode); |
- stream.write(bytes); |
+ var stream = openWrite(mode); |
+ stream.add(bytes); |
stream.close(); |
- stream.onClosed = () { |
- completer.complete(this); |
- }; |
- stream.onError = (e) { |
- completer.completeError(e); |
- }; |
+ stream.done |
+ .then((_) { |
+ completer.complete(this); |
+ }) |
+ .catchError((e) { |
+ completer.completeError(e); |
+ }); |
} catch (e) { |
Timer.run(() => completer.completeError(e)); |
return completer.future; |
@@ -681,8 +576,7 @@ class _File extends _FileBase implements File { |
{FileMode mode: FileMode.WRITE, |
Encoding encoding: Encoding.UTF_8}) { |
try { |
- var data = _StringEncoders.encoder(encoding).encodeString(contents); |
- return writeAsBytes(data, mode); |
+ return writeAsBytes(_encodeString(contents, encoding), mode); |
} catch (e) { |
var completer = new Completer(); |
Timer.run(() => completer.completeError(e)); |
@@ -693,8 +587,7 @@ class _File extends _FileBase implements File { |
void writeAsStringSync(String contents, |
{FileMode mode: FileMode.WRITE, |
Encoding encoding: Encoding.UTF_8}) { |
- var data = _StringEncoders.encoder(encoding).encodeString(contents); |
- writeAsBytesSync(data, mode); |
+ writeAsBytesSync(_encodeString(contents, encoding), mode); |
} |
String get name => _name; |
@@ -987,7 +880,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile { |
}); |
return completer.future; |
} |
- var data = _StringEncoders.encoder(encoding).encodeString(string); |
+ var data = _encodeString(string, encoding); |
return writeList(data, 0, data.length); |
} |
@@ -996,7 +889,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile { |
throw new FileIOException( |
"Invalid encoding in writeStringSync: $encoding"); |
} |
- var data = _StringEncoders.encoder(encoding).encodeString(string); |
+ var data = _encodeString(string, encoding); |
return writeListSync(data, 0, data.length); |
} |