| Index: sdk/lib/io/file_impl.dart
|
| diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart
|
| index f6cade265b754d6e6655c9be39f3e57b4750c928..ce60209d129e6eb8f842f077d208d37a25602c29 100644
|
| --- a/sdk/lib/io/file_impl.dart
|
| +++ b/sdk/lib/io/file_impl.dart
|
| @@ -4,293 +4,202 @@
|
|
|
| part of dart.io;
|
|
|
| -class _FileInputStream extends _BaseDataInputStream implements InputStream {
|
| - _FileInputStream(String name)
|
| - : _data = const [],
|
| - _position = 0,
|
| - _filePosition = 0 {
|
| - var file = new File(name);
|
| - var future = file.open(FileMode.READ);
|
| - future.then(_setupOpenedFile)
|
| - .catchError((e) {
|
| - _reportError(e.error);
|
| - });
|
| - }
|
| -
|
| - _FileInputStream.fromStdio(int fd)
|
| - : _data = const [],
|
| - _position = 0,
|
| - _filePosition = 0 {
|
| - assert(fd == 0);
|
| - _setupOpenedFile(_File._openStdioSync(fd));
|
| - }
|
| -
|
| - void _setupOpenedFile(RandomAccessFile openedFile) {
|
| - _openedFile = openedFile;
|
| - if (_streamMarkedClosed) {
|
| - // This input stream has already been closed.
|
| - _fileLength = 0;
|
| - _closeFile();
|
| - return;
|
| - }
|
| - var futureOpen = _openedFile.length();
|
| - futureOpen
|
| - .then((len) {
|
| - _fileLength = len;
|
| - _fillBuffer();
|
| - })
|
| - .catchError((e) {
|
| - _reportError(e.error);
|
| - });
|
| - }
|
|
|
| - void _closeFile() {
|
| - if (_openedFile == null) {
|
| - _streamMarkedClosed = true;
|
| - return;
|
| - }
|
| - if (available() == 0) _cancelScheduledDataCallback();
|
| - if (!_openedFile.closed) {
|
| - _openedFile.close().then((ignore) {
|
| - _streamMarkedClosed = true;
|
| - _checkScheduleCallbacks();
|
| - });
|
| - }
|
| - }
|
| +class _FileStream extends Stream<List<int>> {
|
| + // Stream controller.
|
| + StreamController<List<int>> _controller;
|
|
|
| - void _fillBuffer() {
|
| - Expect.equals(_position, _data.length);
|
| - if (_openedFile == null) return; // Called before the file is opened.
|
| - int size = min(_bufferLength, _fileLength - _filePosition);
|
| - if (size == 0) {
|
| - _closeFile();
|
| - return;
|
| - }
|
| - // If there is currently a _fillBuffer call waiting on read,
|
| - // let it fill the buffer instead of us.
|
| - if (_activeFillBufferCall) return;
|
| - _activeFillBufferCall = true;
|
| - var future = _openedFile.read(size);
|
| - future.then((data) {
|
| - _data = data;
|
| - _position = 0;
|
| - _filePosition += _data.length;
|
| - _activeFillBufferCall = false;
|
| -
|
| - if (_fileLength == _filePosition) {
|
| - _closeFile();
|
| - }
|
| - _checkScheduleCallbacks();
|
| - }).catchError((e) {
|
| - _activeFillBufferCall = false;
|
| - _reportError(e.error);
|
| - });
|
| - }
|
| + // Read the file in blocks of size 64k.
|
| + final int _blockSize = 64 * 1024;
|
|
|
| - int available() {
|
| - return closed ? 0 : _data.length - _position;
|
| - }
|
| + // Information about the underlying file.
|
| + String _name;
|
| + RandomAccessFile _openedFile;
|
| + int _position;
|
|
|
| - void pipe(OutputStream output, {bool close: true}) {
|
| - _pipe(this, output, close: close);
|
| - }
|
| + // Has the stream been paused or unsubscribed?
|
| + bool _paused = false;
|
| + bool _unsubscribed = false;
|
|
|
| - void _finishRead() {
|
| - if (_position == _data.length && !_streamMarkedClosed) {
|
| - _fillBuffer();
|
| - } else {
|
| - _checkScheduleCallbacks();
|
| - }
|
| - }
|
| + // Is there a read currently in progress?
|
| + bool _readInProgress = false;
|
|
|
| - List<int> _read(int bytesToRead) {
|
| - List<int> result;
|
| - if (_position == 0 && bytesToRead == _data.length) {
|
| - result = _data;
|
| - _data = const [];
|
| - } else {
|
| - result = new Uint8List(bytesToRead);
|
| - result.setRange(0, bytesToRead, _data, _position);
|
| - _position += bytesToRead;
|
| - }
|
| - _finishRead();
|
| - return result;
|
| - }
|
| + // Block read but not yet send because stream is paused.
|
| + List<int> _currentBlock;
|
|
|
| - int _readInto(List<int> buffer, int offset, int len) {
|
| - buffer.setRange(offset, len, _data, _position);
|
| - _position += len;
|
| - _finishRead();
|
| - return len;
|
| + _FileStream(String this._name) : _position = 0 {
|
| + _setupController();
|
| }
|
|
|
| - void _close() {
|
| - _data = const [];
|
| - _position = 0;
|
| - _filePosition = 0;
|
| - _fileLength = 0;
|
| - _closeFile();
|
| + _FileStream.forStdin() : _position = 0 {
|
| + _setupController();
|
| }
|
|
|
| - static const int _bufferLength = 64 * 1024;
|
| -
|
| - RandomAccessFile _openedFile;
|
| - List<int> _data;
|
| - int _position;
|
| - int _filePosition;
|
| - int _fileLength;
|
| - bool _activeFillBufferCall = false;
|
| -}
|
| -
|
| -
|
| -class _PendingOperation {
|
| - const _PendingOperation(this._id);
|
| - static const _PendingOperation CLOSE = const _PendingOperation(0);
|
| - static const _PendingOperation FLUSH = const _PendingOperation(1);
|
| - final int _id;
|
| -}
|
| -
|
| -
|
| -class _FileOutputStream extends _BaseOutputStream implements OutputStream {
|
| - _FileOutputStream(String name, FileMode mode) {
|
| - _pendingOperations = new List();
|
| - var f = new File(name);
|
| - var openFuture = f.open(mode);
|
| - openFuture.then((openedFile) {
|
| - _file = openedFile;
|
| - _processPendingOperations();
|
| - }).catchError((e) {
|
| - _reportError(e.error);
|
| - });
|
| + StreamSubscription<List<int>> listen(void onData(List<int> event),
|
| + {void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError}) {
|
| + return _controller.stream.listen(onData,
|
| + onError: onError,
|
| + onDone: onDone,
|
| + unsubscribeOnError: unsubscribeOnError);
|
| }
|
|
|
| - _FileOutputStream.fromStdio(int fd) {
|
| - assert(1 <= fd && fd <= 2);
|
| - _file = _File._openStdioSync(fd);
|
| + void _setupController() {
|
| + _controller = new StreamController<List<int>>(
|
| + onSubscriptionStateChange: _onSubscriptionStateChange,
|
| + onPauseStateChange: _onPauseStateChange);
|
| }
|
|
|
| - bool write(List<int> buffer, [bool copyBuffer = false]) {
|
| - var data = buffer;
|
| - if (copyBuffer) {
|
| - var length = buffer.length;
|
| - data = new Uint8List(length);
|
| - data.setRange(0, length, buffer, 0);
|
| - }
|
| - if (_file == null) {
|
| - _pendingOperations.add(data);
|
| + Future _closeFile() {
|
| + Future closeFuture;
|
| + if (_openedFile != null) {
|
| + Future closeFuture = _openedFile.close();
|
| + _openedFile = null;
|
| + return closeFuture;
|
| } else {
|
| - _write(data, 0, data.length);
|
| + return new Future.immediate(null);
|
| }
|
| - return false;
|
| }
|
|
|
| - bool writeFrom(List<int> buffer, [int offset = 0, int len]) {
|
| - // A copy is required by the interface.
|
| - var length = buffer.length - offset;
|
| - if (len != null) {
|
| - if (len > length) throw new RangeError.value(len);
|
| - length = len;
|
| - }
|
| - var copy = new Uint8List(length);
|
| - copy.setRange(0, length, buffer, offset);
|
| - return write(copy);
|
| + void _readBlock() {
|
| + // Don't start a new read if one is already in progress.
|
| + if (_readInProgress) return;
|
| + _readInProgress = true;
|
| + _openedFile.length()
|
| + .then((length) {
|
| + if (_position >= length) {
|
| + _readInProgress = false;
|
| + if (!_unsubscribed) {
|
| + _closeFile().then((_) { _controller.close(); });
|
| + _unsubscribed = true;
|
| + }
|
| + return null;
|
| + } else {
|
| + return _openedFile.read(_blockSize);
|
| + }
|
| + })
|
| + .then((block) {
|
| + _readInProgress = false;
|
| + if (block == null || _unsubscribed) {
|
| + return;
|
| + }
|
| + _position += block.length;
|
| + if (_paused) {
|
| + _currentBlock = block;
|
| + } else {
|
| + _controller.add(block);
|
| + _readBlock();
|
| + }
|
| + })
|
| + .catchError((e) {
|
| + if (!_unsubscribed) {
|
| + _controller.signalError(e);
|
| + _closeFile().then((_) { _controller.close(); });
|
| + _unsubscribed = true;
|
| + }
|
| + });
|
| }
|
|
|
| -
|
| - void flush() {
|
| - if (_file == null) {
|
| - _pendingOperations.add(_PendingOperation.FLUSH);
|
| + void _start() {
|
| + Future<RandomAccessFile> openFuture;
|
| + if (_name != null) {
|
| + openFuture = new File(_name).open(FileMode.READ);
|
| } else {
|
| - _file.flush().then((ignored) => null);
|
| + openFuture = new Future.immediate(_File._openStdioSync(0));
|
| }
|
| - }
|
| -
|
| -
|
| - void close() {
|
| - _streamMarkedClosed = true;
|
| - if (_file == null) {
|
| - _pendingOperations.add(_PendingOperation.CLOSE);
|
| - } else if (!_closeCallbackScheduled) {
|
| - _file.close().then((ignore) {
|
| - if (_onClosed != null) _onClosed();
|
| + openFuture
|
| + .then((RandomAccessFile opened) {
|
| + _openedFile = opened;
|
| + _readBlock();
|
| + })
|
| + .catchError((e) {
|
| + _controller.signalError(e);
|
| + _controller.close();
|
| });
|
| - _closeCallbackScheduled = true;
|
| - }
|
| }
|
|
|
| - void set onNoPendingWrites(void callback()) {
|
| - _onNoPendingWrites = callback;
|
| - if ((_pendingOperations == null || _pendingOperations.length == 0) &&
|
| - outstandingWrites == 0 &&
|
| - !_streamMarkedClosed &&
|
| - _onNoPendingWrites != null) {
|
| - Timer.run(() {
|
| - if (_onNoPendingWrites != null) {
|
| - _onNoPendingWrites();
|
| - }
|
| - });
|
| + void _resume() {
|
| + _paused = false;
|
| + if (_currentBlock != null) {
|
| + _controller.add(_currentBlock);
|
| + _currentBlock = null;
|
| }
|
| + // Resume reading unless we are already done.
|
| + if (_openedFile != null) _readBlock();
|
| }
|
|
|
| - void set onClosed(void callback()) {
|
| - _onClosed = callback;
|
| + void _onSubscriptionStateChange() {
|
| + if (_controller.hasSubscribers) {
|
| + _start();
|
| + } else {
|
| + _unsubscribed = true;
|
| + _closeFile();
|
| + }
|
| }
|
|
|
| - void _processPendingOperations() {
|
| - _pendingOperations.forEach((buffer) {
|
| - if (buffer is _PendingOperation) {
|
| - if (identical(buffer, _PendingOperation.CLOSE)) {
|
| - close();
|
| - } else {
|
| - assert(identical(buffer, _PendingOperation.FLUSH));
|
| - flush();
|
| - }
|
| - } else {
|
| - write(buffer);
|
| - }
|
| - });
|
| - _pendingOperations = null;
|
| - }
|
| -
|
| - void _write(List<int> buffer, int offset, int len) {
|
| - outstandingWrites++;
|
| - var writeListFuture = _file.writeList(buffer, offset, len);
|
| - writeListFuture.then((ignore) {
|
| - outstandingWrites--;
|
| - if (outstandingWrites == 0 &&
|
| - !_streamMarkedClosed &&
|
| - _onNoPendingWrites != null) {
|
| - _onNoPendingWrites();
|
| - }
|
| - }).catchError((e) {
|
| - outstandingWrites--;
|
| - _reportError(e.error);
|
| - });
|
| + void _onPauseStateChange() {
|
| + if (_controller.isPaused) {
|
| + _paused = true;
|
| + } else {
|
| + _resume();
|
| + }
|
| }
|
| +}
|
|
|
| - bool get closed => _streamMarkedClosed;
|
| -
|
| - RandomAccessFile _file;
|
| -
|
| - // When this is set to true the stream is marked closed. When a
|
| - // stream is marked closed no more data can be written.
|
| - bool _streamMarkedClosed = false;
|
| -
|
| - // When this is set to true, the close callback has been scheduled and the
|
| - // stream will be fully closed once it's called.
|
| - bool _closeCallbackScheduled = false;
|
| +class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
|
| + File _file;
|
| + Future<RandomAccessFile> _openFuture;
|
| + StreamSubscription _subscription;
|
|
|
| - // Number of writes that have not yet completed.
|
| - int outstandingWrites = 0;
|
| + _FileStreamConsumer(File this._file, FileMode mode) {
|
| + _openFuture = _file.open(mode);
|
| + }
|
|
|
| - // List of pending writes that were issued before the underlying
|
| - // file was successfully opened.
|
| - List _pendingOperations;
|
| + _FileStreamConsumer.fromStdio(int fd) {
|
| + assert(1 <= fd && fd <= 2);
|
| + _openFuture = new Future.immediate(_File._openStdioSync(fd));
|
| + }
|
|
|
| - Function _onNoPendingWrites;
|
| - Function _onClosed;
|
| + Future<File> consume(Stream<List<int>> stream) {
|
| + Completer<File> completer = new Completer<File>();
|
| + _openFuture
|
| + .then((openedFile) {
|
| + _subscription = stream.listen(
|
| + (d) {
|
| + _subscription.pause();
|
| + openedFile.writeList(d, 0, d.length)
|
| + .then((_) => _subscription.resume())
|
| + .catchError((e) {
|
| + openedFile.close();
|
| + completer.completeError(e);
|
| + });
|
| + },
|
| + onDone: () {
|
| + // Wait for the file to close (and therefore flush) before
|
| + // completing the future.
|
| + openedFile.close()
|
| + .then((_) {
|
| + completer.complete(_file);
|
| + })
|
| + .catchError((e) {
|
| + completer.completeError(e);
|
| + });
|
| + },
|
| + onError: (e) {
|
| + openedFile.close();
|
| + completer.completeError(e);
|
| + },
|
| + unsubscribeOnError: true);
|
| + })
|
| + .catchError((e) {
|
| + completer.completeError(e);
|
| + });
|
| + return completer.future;
|
| + }
|
| }
|
|
|
| +
|
| const int _EXISTS_REQUEST = 0;
|
| const int _CREATE_REQUEST = 1;
|
| const int _DELETE_REQUEST = 2;
|
| @@ -555,36 +464,35 @@ class _File extends _FileBase implements File {
|
| return result;
|
| }
|
|
|
| - InputStream openInputStream() {
|
| - return new _FileInputStream(_name);
|
| + Stream<List<int>> openRead() {
|
| + return new _FileStream(_name);
|
| }
|
|
|
| - OutputStream openOutputStream([FileMode mode = FileMode.WRITE]) {
|
| + IOSink<File> openWrite([FileMode mode = FileMode.WRITE]) {
|
| if (mode != FileMode.WRITE &&
|
| mode != FileMode.APPEND) {
|
| throw new FileIOException(
|
| "Wrong FileMode. Use FileMode.WRITE or FileMode.APPEND");
|
| }
|
| - return new _FileOutputStream(_name, mode);
|
| + var consumer = new _FileStreamConsumer(this, mode);
|
| + return new IOSink<File>(consumer);
|
| }
|
|
|
| Future<List<int>> readAsBytes() {
|
| _ensureFileService();
|
| Completer<List<int>> completer = new Completer<List<int>>();
|
| var chunks = new _BufferList();
|
| - var stream = openInputStream();
|
| - stream.onClosed = () {
|
| - var result = chunks.readBytes(chunks.length);
|
| - if (result == null) result = <int>[];
|
| - completer.complete(result);
|
| - };
|
| - stream.onData = () {
|
| - var chunk = stream.read();
|
| - chunks.add(chunk);
|
| - };
|
| - stream.onError = (e) {
|
| - completer.completeError(e);
|
| - };
|
| + openRead().listen(
|
| + (d) => chunks.add(d),
|
| + onDone: () {
|
| + var result = chunks.readBytes(chunks.length);
|
| + if (result == null) result = <int>[];
|
| + completer.complete(result);
|
| + },
|
| + onError: (e) {
|
| + completer.completeError(e);
|
| + },
|
| + unsubscribeOnError: true);
|
| return completer.future;
|
| }
|
|
|
| @@ -603,67 +511,54 @@ class _File extends _FileBase implements File {
|
| Future<String> readAsString([Encoding encoding = Encoding.UTF_8]) {
|
| _ensureFileService();
|
| return readAsBytes().then((bytes) {
|
| - if (bytes.length == 0) return "";
|
| - var decoder = _StringDecoders.decoder(encoding);
|
| - decoder.write(bytes);
|
| - return decoder.decoded();
|
| + return _decodeString(bytes, encoding);
|
| });
|
| }
|
|
|
| String readAsStringSync([Encoding encoding = Encoding.UTF_8]) {
|
| - var decoder = _StringDecoders.decoder(encoding);
|
| List<int> bytes = readAsBytesSync();
|
| - if (bytes.length == 0) return "";
|
| - decoder.write(bytes);
|
| - return decoder.decoded();
|
| + return _decodeString(bytes, encoding);
|
| }
|
|
|
| - List<String> _getDecodedLines(_StringDecoder decoder) {
|
| - List<String> result = [];
|
| - var line = decoder.decodedLine;
|
| - while (line != null) {
|
| - result.add(line);
|
| - line = decoder.decodedLine;
|
| - }
|
| - // If there is more data with no terminating line break we treat
|
| - // it as the last line.
|
| - var data = decoder.decoded();
|
| - if (data != null) {
|
| - result.add(data);
|
| - }
|
| - return result;
|
| + static List<String> _decodeLines(List<int> bytes, Encoding encoding) {
|
| + if (bytes.length == 0) return [];
|
| + var list = [];
|
| + var controller = new StreamController();
|
| + controller.stream
|
| + .transform(new StringDecoder(encoding))
|
| + .transform(new LineTransformer())
|
| + .listen((line) => list.add(line));
|
| + controller.add(bytes);
|
| + controller.close();
|
| + return list;
|
| }
|
|
|
| Future<List<String>> readAsLines([Encoding encoding = Encoding.UTF_8]) {
|
| _ensureFileService();
|
| Completer<List<String>> completer = new Completer<List<String>>();
|
| return readAsBytes().then((bytes) {
|
| - var decoder = _StringDecoders.decoder(encoding);
|
| - decoder.write(bytes);
|
| - return _getDecodedLines(decoder);
|
| + return _decodeLines(bytes, encoding);
|
| });
|
| }
|
|
|
| List<String> readAsLinesSync([Encoding encoding = Encoding.UTF_8]) {
|
| - var decoder = _StringDecoders.decoder(encoding);
|
| - List<int> bytes = readAsBytesSync();
|
| - decoder.write(bytes);
|
| - return _getDecodedLines(decoder);
|
| + return _decodeLines(readAsBytesSync(), encoding);
|
| }
|
|
|
| Future<File> writeAsBytes(List<int> bytes,
|
| [FileMode mode = FileMode.WRITE]) {
|
| Completer<File> completer = new Completer<File>();
|
| try {
|
| - var stream = openOutputStream(mode);
|
| - stream.write(bytes);
|
| + var stream = openWrite(mode);
|
| + stream.add(bytes);
|
| stream.close();
|
| - stream.onClosed = () {
|
| - completer.complete(this);
|
| - };
|
| - stream.onError = (e) {
|
| - completer.completeError(e);
|
| - };
|
| + stream.done
|
| + .then((_) {
|
| + completer.complete(this);
|
| + })
|
| + .catchError((e) {
|
| + completer.completeError(e);
|
| + });
|
| } catch (e) {
|
| Timer.run(() => completer.completeError(e));
|
| return completer.future;
|
| @@ -681,8 +576,7 @@ class _File extends _FileBase implements File {
|
| {FileMode mode: FileMode.WRITE,
|
| Encoding encoding: Encoding.UTF_8}) {
|
| try {
|
| - var data = _StringEncoders.encoder(encoding).encodeString(contents);
|
| - return writeAsBytes(data, mode);
|
| + return writeAsBytes(_encodeString(contents, encoding), mode);
|
| } catch (e) {
|
| var completer = new Completer();
|
| Timer.run(() => completer.completeError(e));
|
| @@ -693,8 +587,7 @@ class _File extends _FileBase implements File {
|
| void writeAsStringSync(String contents,
|
| {FileMode mode: FileMode.WRITE,
|
| Encoding encoding: Encoding.UTF_8}) {
|
| - var data = _StringEncoders.encoder(encoding).encodeString(contents);
|
| - writeAsBytesSync(data, mode);
|
| + writeAsBytesSync(_encodeString(contents, encoding), mode);
|
| }
|
|
|
| String get name => _name;
|
| @@ -987,7 +880,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile {
|
| });
|
| return completer.future;
|
| }
|
| - var data = _StringEncoders.encoder(encoding).encodeString(string);
|
| + var data = _encodeString(string, encoding);
|
| return writeList(data, 0, data.length);
|
| }
|
|
|
| @@ -996,7 +889,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile {
|
| throw new FileIOException(
|
| "Invalid encoding in writeStringSync: $encoding");
|
| }
|
| - var data = _StringEncoders.encoder(encoding).encodeString(string);
|
| + var data = _encodeString(string, encoding);
|
| return writeListSync(data, 0, data.length);
|
| }
|
|
|
|
|