Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(573)

Unified Diff: sdk/lib/io/file_impl.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/file.dart ('k') | sdk/lib/io/http.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/file_impl.dart
diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart
index f6cade265b754d6e6655c9be39f3e57b4750c928..ce60209d129e6eb8f842f077d208d37a25602c29 100644
--- a/sdk/lib/io/file_impl.dart
+++ b/sdk/lib/io/file_impl.dart
@@ -4,293 +4,202 @@
part of dart.io;
-class _FileInputStream extends _BaseDataInputStream implements InputStream {
- _FileInputStream(String name)
- : _data = const [],
- _position = 0,
- _filePosition = 0 {
- var file = new File(name);
- var future = file.open(FileMode.READ);
- future.then(_setupOpenedFile)
- .catchError((e) {
- _reportError(e.error);
- });
- }
-
- _FileInputStream.fromStdio(int fd)
- : _data = const [],
- _position = 0,
- _filePosition = 0 {
- assert(fd == 0);
- _setupOpenedFile(_File._openStdioSync(fd));
- }
-
- void _setupOpenedFile(RandomAccessFile openedFile) {
- _openedFile = openedFile;
- if (_streamMarkedClosed) {
- // This input stream has already been closed.
- _fileLength = 0;
- _closeFile();
- return;
- }
- var futureOpen = _openedFile.length();
- futureOpen
- .then((len) {
- _fileLength = len;
- _fillBuffer();
- })
- .catchError((e) {
- _reportError(e.error);
- });
- }
- void _closeFile() {
- if (_openedFile == null) {
- _streamMarkedClosed = true;
- return;
- }
- if (available() == 0) _cancelScheduledDataCallback();
- if (!_openedFile.closed) {
- _openedFile.close().then((ignore) {
- _streamMarkedClosed = true;
- _checkScheduleCallbacks();
- });
- }
- }
+class _FileStream extends Stream<List<int>> {
+ // Stream controller.
+ StreamController<List<int>> _controller;
- void _fillBuffer() {
- Expect.equals(_position, _data.length);
- if (_openedFile == null) return; // Called before the file is opened.
- int size = min(_bufferLength, _fileLength - _filePosition);
- if (size == 0) {
- _closeFile();
- return;
- }
- // If there is currently a _fillBuffer call waiting on read,
- // let it fill the buffer instead of us.
- if (_activeFillBufferCall) return;
- _activeFillBufferCall = true;
- var future = _openedFile.read(size);
- future.then((data) {
- _data = data;
- _position = 0;
- _filePosition += _data.length;
- _activeFillBufferCall = false;
-
- if (_fileLength == _filePosition) {
- _closeFile();
- }
- _checkScheduleCallbacks();
- }).catchError((e) {
- _activeFillBufferCall = false;
- _reportError(e.error);
- });
- }
+ // Read the file in blocks of size 64k.
+ final int _blockSize = 64 * 1024;
- int available() {
- return closed ? 0 : _data.length - _position;
- }
+ // Information about the underlying file.
+ String _name;
+ RandomAccessFile _openedFile;
+ int _position;
- void pipe(OutputStream output, {bool close: true}) {
- _pipe(this, output, close: close);
- }
+ // Has the stream been paused or unsubscribed?
+ bool _paused = false;
+ bool _unsubscribed = false;
- void _finishRead() {
- if (_position == _data.length && !_streamMarkedClosed) {
- _fillBuffer();
- } else {
- _checkScheduleCallbacks();
- }
- }
+ // Is there a read currently in progress?
+ bool _readInProgress = false;
- List<int> _read(int bytesToRead) {
- List<int> result;
- if (_position == 0 && bytesToRead == _data.length) {
- result = _data;
- _data = const [];
- } else {
- result = new Uint8List(bytesToRead);
- result.setRange(0, bytesToRead, _data, _position);
- _position += bytesToRead;
- }
- _finishRead();
- return result;
- }
+ // Block read but not yet send because stream is paused.
+ List<int> _currentBlock;
- int _readInto(List<int> buffer, int offset, int len) {
- buffer.setRange(offset, len, _data, _position);
- _position += len;
- _finishRead();
- return len;
+ _FileStream(String this._name) : _position = 0 {
+ _setupController();
}
- void _close() {
- _data = const [];
- _position = 0;
- _filePosition = 0;
- _fileLength = 0;
- _closeFile();
+ _FileStream.forStdin() : _position = 0 {
+ _setupController();
}
- static const int _bufferLength = 64 * 1024;
-
- RandomAccessFile _openedFile;
- List<int> _data;
- int _position;
- int _filePosition;
- int _fileLength;
- bool _activeFillBufferCall = false;
-}
-
-
-class _PendingOperation {
- const _PendingOperation(this._id);
- static const _PendingOperation CLOSE = const _PendingOperation(0);
- static const _PendingOperation FLUSH = const _PendingOperation(1);
- final int _id;
-}
-
-
-class _FileOutputStream extends _BaseOutputStream implements OutputStream {
- _FileOutputStream(String name, FileMode mode) {
- _pendingOperations = new List();
- var f = new File(name);
- var openFuture = f.open(mode);
- openFuture.then((openedFile) {
- _file = openedFile;
- _processPendingOperations();
- }).catchError((e) {
- _reportError(e.error);
- });
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- _FileOutputStream.fromStdio(int fd) {
- assert(1 <= fd && fd <= 2);
- _file = _File._openStdioSync(fd);
+ void _setupController() {
+ _controller = new StreamController<List<int>>(
+ onSubscriptionStateChange: _onSubscriptionStateChange,
+ onPauseStateChange: _onPauseStateChange);
}
- bool write(List<int> buffer, [bool copyBuffer = false]) {
- var data = buffer;
- if (copyBuffer) {
- var length = buffer.length;
- data = new Uint8List(length);
- data.setRange(0, length, buffer, 0);
- }
- if (_file == null) {
- _pendingOperations.add(data);
+ Future _closeFile() {
+ Future closeFuture;
+ if (_openedFile != null) {
+ Future closeFuture = _openedFile.close();
+ _openedFile = null;
+ return closeFuture;
} else {
- _write(data, 0, data.length);
+ return new Future.immediate(null);
}
- return false;
}
- bool writeFrom(List<int> buffer, [int offset = 0, int len]) {
- // A copy is required by the interface.
- var length = buffer.length - offset;
- if (len != null) {
- if (len > length) throw new RangeError.value(len);
- length = len;
- }
- var copy = new Uint8List(length);
- copy.setRange(0, length, buffer, offset);
- return write(copy);
+ void _readBlock() {
+ // Don't start a new read if one is already in progress.
+ if (_readInProgress) return;
+ _readInProgress = true;
+ _openedFile.length()
+ .then((length) {
+ if (_position >= length) {
+ _readInProgress = false;
+ if (!_unsubscribed) {
+ _closeFile().then((_) { _controller.close(); });
+ _unsubscribed = true;
+ }
+ return null;
+ } else {
+ return _openedFile.read(_blockSize);
+ }
+ })
+ .then((block) {
+ _readInProgress = false;
+ if (block == null || _unsubscribed) {
+ return;
+ }
+ _position += block.length;
+ if (_paused) {
+ _currentBlock = block;
+ } else {
+ _controller.add(block);
+ _readBlock();
+ }
+ })
+ .catchError((e) {
+ if (!_unsubscribed) {
+ _controller.signalError(e);
+ _closeFile().then((_) { _controller.close(); });
+ _unsubscribed = true;
+ }
+ });
}
-
- void flush() {
- if (_file == null) {
- _pendingOperations.add(_PendingOperation.FLUSH);
+ void _start() {
+ Future<RandomAccessFile> openFuture;
+ if (_name != null) {
+ openFuture = new File(_name).open(FileMode.READ);
} else {
- _file.flush().then((ignored) => null);
+ openFuture = new Future.immediate(_File._openStdioSync(0));
}
- }
-
-
- void close() {
- _streamMarkedClosed = true;
- if (_file == null) {
- _pendingOperations.add(_PendingOperation.CLOSE);
- } else if (!_closeCallbackScheduled) {
- _file.close().then((ignore) {
- if (_onClosed != null) _onClosed();
+ openFuture
+ .then((RandomAccessFile opened) {
+ _openedFile = opened;
+ _readBlock();
+ })
+ .catchError((e) {
+ _controller.signalError(e);
+ _controller.close();
});
- _closeCallbackScheduled = true;
- }
}
- void set onNoPendingWrites(void callback()) {
- _onNoPendingWrites = callback;
- if ((_pendingOperations == null || _pendingOperations.length == 0) &&
- outstandingWrites == 0 &&
- !_streamMarkedClosed &&
- _onNoPendingWrites != null) {
- Timer.run(() {
- if (_onNoPendingWrites != null) {
- _onNoPendingWrites();
- }
- });
+ void _resume() {
+ _paused = false;
+ if (_currentBlock != null) {
+ _controller.add(_currentBlock);
+ _currentBlock = null;
}
+ // Resume reading unless we are already done.
+ if (_openedFile != null) _readBlock();
}
- void set onClosed(void callback()) {
- _onClosed = callback;
+ void _onSubscriptionStateChange() {
+ if (_controller.hasSubscribers) {
+ _start();
+ } else {
+ _unsubscribed = true;
+ _closeFile();
+ }
}
- void _processPendingOperations() {
- _pendingOperations.forEach((buffer) {
- if (buffer is _PendingOperation) {
- if (identical(buffer, _PendingOperation.CLOSE)) {
- close();
- } else {
- assert(identical(buffer, _PendingOperation.FLUSH));
- flush();
- }
- } else {
- write(buffer);
- }
- });
- _pendingOperations = null;
- }
-
- void _write(List<int> buffer, int offset, int len) {
- outstandingWrites++;
- var writeListFuture = _file.writeList(buffer, offset, len);
- writeListFuture.then((ignore) {
- outstandingWrites--;
- if (outstandingWrites == 0 &&
- !_streamMarkedClosed &&
- _onNoPendingWrites != null) {
- _onNoPendingWrites();
- }
- }).catchError((e) {
- outstandingWrites--;
- _reportError(e.error);
- });
+ void _onPauseStateChange() {
+ if (_controller.isPaused) {
+ _paused = true;
+ } else {
+ _resume();
+ }
}
+}
- bool get closed => _streamMarkedClosed;
-
- RandomAccessFile _file;
-
- // When this is set to true the stream is marked closed. When a
- // stream is marked closed no more data can be written.
- bool _streamMarkedClosed = false;
-
- // When this is set to true, the close callback has been scheduled and the
- // stream will be fully closed once it's called.
- bool _closeCallbackScheduled = false;
+class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
+ File _file;
+ Future<RandomAccessFile> _openFuture;
+ StreamSubscription _subscription;
- // Number of writes that have not yet completed.
- int outstandingWrites = 0;
+ _FileStreamConsumer(File this._file, FileMode mode) {
+ _openFuture = _file.open(mode);
+ }
- // List of pending writes that were issued before the underlying
- // file was successfully opened.
- List _pendingOperations;
+ _FileStreamConsumer.fromStdio(int fd) {
+ assert(1 <= fd && fd <= 2);
+ _openFuture = new Future.immediate(_File._openStdioSync(fd));
+ }
- Function _onNoPendingWrites;
- Function _onClosed;
+ Future<File> consume(Stream<List<int>> stream) {
+ Completer<File> completer = new Completer<File>();
+ _openFuture
+ .then((openedFile) {
+ _subscription = stream.listen(
+ (d) {
+ _subscription.pause();
+ openedFile.writeList(d, 0, d.length)
+ .then((_) => _subscription.resume())
+ .catchError((e) {
+ openedFile.close();
+ completer.completeError(e);
+ });
+ },
+ onDone: () {
+ // Wait for the file to close (and therefore flush) before
+ // completing the future.
+ openedFile.close()
+ .then((_) {
+ completer.complete(_file);
+ })
+ .catchError((e) {
+ completer.completeError(e);
+ });
+ },
+ onError: (e) {
+ openedFile.close();
+ completer.completeError(e);
+ },
+ unsubscribeOnError: true);
+ })
+ .catchError((e) {
+ completer.completeError(e);
+ });
+ return completer.future;
+ }
}
+
const int _EXISTS_REQUEST = 0;
const int _CREATE_REQUEST = 1;
const int _DELETE_REQUEST = 2;
@@ -555,36 +464,35 @@ class _File extends _FileBase implements File {
return result;
}
- InputStream openInputStream() {
- return new _FileInputStream(_name);
+ Stream<List<int>> openRead() {
+ return new _FileStream(_name);
}
- OutputStream openOutputStream([FileMode mode = FileMode.WRITE]) {
+ IOSink<File> openWrite([FileMode mode = FileMode.WRITE]) {
if (mode != FileMode.WRITE &&
mode != FileMode.APPEND) {
throw new FileIOException(
"Wrong FileMode. Use FileMode.WRITE or FileMode.APPEND");
}
- return new _FileOutputStream(_name, mode);
+ var consumer = new _FileStreamConsumer(this, mode);
+ return new IOSink<File>(consumer);
}
Future<List<int>> readAsBytes() {
_ensureFileService();
Completer<List<int>> completer = new Completer<List<int>>();
var chunks = new _BufferList();
- var stream = openInputStream();
- stream.onClosed = () {
- var result = chunks.readBytes(chunks.length);
- if (result == null) result = <int>[];
- completer.complete(result);
- };
- stream.onData = () {
- var chunk = stream.read();
- chunks.add(chunk);
- };
- stream.onError = (e) {
- completer.completeError(e);
- };
+ openRead().listen(
+ (d) => chunks.add(d),
+ onDone: () {
+ var result = chunks.readBytes(chunks.length);
+ if (result == null) result = <int>[];
+ completer.complete(result);
+ },
+ onError: (e) {
+ completer.completeError(e);
+ },
+ unsubscribeOnError: true);
return completer.future;
}
@@ -603,67 +511,54 @@ class _File extends _FileBase implements File {
Future<String> readAsString([Encoding encoding = Encoding.UTF_8]) {
_ensureFileService();
return readAsBytes().then((bytes) {
- if (bytes.length == 0) return "";
- var decoder = _StringDecoders.decoder(encoding);
- decoder.write(bytes);
- return decoder.decoded();
+ return _decodeString(bytes, encoding);
});
}
String readAsStringSync([Encoding encoding = Encoding.UTF_8]) {
- var decoder = _StringDecoders.decoder(encoding);
List<int> bytes = readAsBytesSync();
- if (bytes.length == 0) return "";
- decoder.write(bytes);
- return decoder.decoded();
+ return _decodeString(bytes, encoding);
}
- List<String> _getDecodedLines(_StringDecoder decoder) {
- List<String> result = [];
- var line = decoder.decodedLine;
- while (line != null) {
- result.add(line);
- line = decoder.decodedLine;
- }
- // If there is more data with no terminating line break we treat
- // it as the last line.
- var data = decoder.decoded();
- if (data != null) {
- result.add(data);
- }
- return result;
+ static List<String> _decodeLines(List<int> bytes, Encoding encoding) {
+ if (bytes.length == 0) return [];
+ var list = [];
+ var controller = new StreamController();
+ controller.stream
+ .transform(new StringDecoder(encoding))
+ .transform(new LineTransformer())
+ .listen((line) => list.add(line));
+ controller.add(bytes);
+ controller.close();
+ return list;
}
Future<List<String>> readAsLines([Encoding encoding = Encoding.UTF_8]) {
_ensureFileService();
Completer<List<String>> completer = new Completer<List<String>>();
return readAsBytes().then((bytes) {
- var decoder = _StringDecoders.decoder(encoding);
- decoder.write(bytes);
- return _getDecodedLines(decoder);
+ return _decodeLines(bytes, encoding);
});
}
List<String> readAsLinesSync([Encoding encoding = Encoding.UTF_8]) {
- var decoder = _StringDecoders.decoder(encoding);
- List<int> bytes = readAsBytesSync();
- decoder.write(bytes);
- return _getDecodedLines(decoder);
+ return _decodeLines(readAsBytesSync(), encoding);
}
Future<File> writeAsBytes(List<int> bytes,
[FileMode mode = FileMode.WRITE]) {
Completer<File> completer = new Completer<File>();
try {
- var stream = openOutputStream(mode);
- stream.write(bytes);
+ var stream = openWrite(mode);
+ stream.add(bytes);
stream.close();
- stream.onClosed = () {
- completer.complete(this);
- };
- stream.onError = (e) {
- completer.completeError(e);
- };
+ stream.done
+ .then((_) {
+ completer.complete(this);
+ })
+ .catchError((e) {
+ completer.completeError(e);
+ });
} catch (e) {
Timer.run(() => completer.completeError(e));
return completer.future;
@@ -681,8 +576,7 @@ class _File extends _FileBase implements File {
{FileMode mode: FileMode.WRITE,
Encoding encoding: Encoding.UTF_8}) {
try {
- var data = _StringEncoders.encoder(encoding).encodeString(contents);
- return writeAsBytes(data, mode);
+ return writeAsBytes(_encodeString(contents, encoding), mode);
} catch (e) {
var completer = new Completer();
Timer.run(() => completer.completeError(e));
@@ -693,8 +587,7 @@ class _File extends _FileBase implements File {
void writeAsStringSync(String contents,
{FileMode mode: FileMode.WRITE,
Encoding encoding: Encoding.UTF_8}) {
- var data = _StringEncoders.encoder(encoding).encodeString(contents);
- writeAsBytesSync(data, mode);
+ writeAsBytesSync(_encodeString(contents, encoding), mode);
}
String get name => _name;
@@ -987,7 +880,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile {
});
return completer.future;
}
- var data = _StringEncoders.encoder(encoding).encodeString(string);
+ var data = _encodeString(string, encoding);
return writeList(data, 0, data.length);
}
@@ -996,7 +889,7 @@ class _RandomAccessFile extends _FileBase implements RandomAccessFile {
throw new FileIOException(
"Invalid encoding in writeStringSync: $encoding");
}
- var data = _StringEncoders.encoder(encoding).encodeString(string);
+ var data = _encodeString(string, encoding);
return writeListSync(data, 0, data.length);
}
« no previous file with comments | « sdk/lib/io/file.dart ('k') | sdk/lib/io/http.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698