Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Unified Diff: sdk/lib/io/file_impl.dart

Issue 227733002: Don't call read, if last read returned less bytes than was being read. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/file_impl.dart
diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart
index 4e8e1cc16573d66795075fa6c895e96a1c5dc986..aa2e495645abb0a6464a060656daad4b7adf2f42 100644
--- a/sdk/lib/io/file_impl.dart
+++ b/sdk/lib/io/file_impl.dart
@@ -20,28 +20,25 @@ class _FileStream extends Stream<List<int>> {
final Completer _closeCompleter = new Completer();
// Has the stream been paused or unsubscribed?
- bool _paused = false;
bool _unsubscribed = false;
// Is there a read currently in progress?
- bool _readInProgress = false;
+ bool _readInProgress = true;
bool _closed = false;
- // Block read but not yet send because stream is paused.
- List<int> _currentBlock;
+ bool _atEnd = false;
_FileStream(this._path, this._position, this._end) {
- _setupController();
+ if (_position == null) _position = 0;
}
- _FileStream.forStdin() : _position = 0 {
- _setupController();
- }
+ _FileStream.forStdin() : _position = 0;
StreamSubscription<List<int>> listen(void onData(List<int> event),
{Function onError,
void onDone(),
bool cancelOnError}) {
+ _setupController();
return _controller.stream.listen(onData,
onError: onError,
onDone: onDone,
@@ -51,8 +48,7 @@ class _FileStream extends Stream<List<int>> {
void _setupController() {
_controller = new StreamController<List<int>>(sync: true,
onListen: _start,
- onPause: () => _paused = true,
- onResume: _resume,
+ onResume: _readBlock,
onCancel: () {
_unsubscribed = true;
return _closeFile();
@@ -64,24 +60,25 @@ class _FileStream extends Stream<List<int>> {
return _closeCompleter.future;
}
_closed = true;
+
void done() {
_closeCompleter.complete();
_controller.close();
}
- if (_openedFile != null) {
- _openedFile.close()
- .catchError(_controller.addError)
- .whenComplete(done);
- _openedFile = null;
- } else {
- done();
- }
+
+ _openedFile.close()
+ .catchError(_controller.addError)
+ .whenComplete(done);
return _closeCompleter.future;
}
void _readBlock() {
// Don't start a new read if one is already in progress.
if (_readInProgress) return;
+ if (_atEnd) {
+ _closeFile();
+ return;
+ }
_readInProgress = true;
int readBytes = _BLOCK_SIZE;
if (_end != null) {
@@ -97,32 +94,25 @@ class _FileStream extends Stream<List<int>> {
}
}
_openedFile.read(readBytes)
- .whenComplete(() {
- _readInProgress = false;
- })
.then((block) {
if (_unsubscribed) {
_closeFile();
return;
}
- if (block.length == 0) {
- if (!_unsubscribed) {
- _closeFile();
- _unsubscribed = true;
- }
- return;
- }
+ _readInProgress = false;
_position += block.length;
- if (_paused) {
- _currentBlock = block;
- } else {
- _controller.add(block);
+ if (block.length < readBytes ||
+ (_end != null && _position == _end)) {
+ _atEnd = true;
+ }
+ _controller.add(block);
+ if (!_controller.isPaused) {
_readBlock();
}
})
- .catchError((e) {
+ .catchError((e, s) {
if (!_unsubscribed) {
- _controller.addError(e);
+ _controller.addError(e, s);
_closeFile();
_unsubscribed = true;
}
@@ -130,45 +120,47 @@ class _FileStream extends Stream<List<int>> {
}
void _start() {
- if (_position == null) {
- _position = 0;
- } else if (_position < 0) {
+ if (_position < 0) {
_controller.addError(new RangeError("Bad start position: $_position"));
_controller.close();
return;
}
- Future<RandomAccessFile> openFuture;
- if (_path != null) {
- openFuture = new File(_path).open(mode: FileMode.READ);
- } else {
- openFuture = new Future.value(_File._openStdioSync(0));
+
+ void onReady(RandomAccessFile file) {
+ _openedFile = file;
+ _readInProgress = false;
+ _readBlock();
+ }
+
+ void onOpenFile(RandomAccessFile file) {
+ if (_position > 0) {
+ file.setPosition(_position)
+ .then(onReady, onError: (e, s) {
+ _controller.addError(e, s);
+ _readInProgress = false;
+ _closeFile();
+ });
+ } else {
+ onReady(file);
+ }
+ }
+
+ void openFailed(error, stackTrace) {
+ _controller.addError(error, stackTrace);
+ _controller.close();
+ _closeCompleter.complete();
}
- _readInProgress = true;
- openFuture
- .then((RandomAccessFile opened) {
- _openedFile = opened;
- if (_position > 0) {
- return opened.setPosition(_position);
- }
- })
- .whenComplete(() {
- _readInProgress = false;
- })
- .then((_) => _readBlock())
- .catchError((e) {
- _controller.addError(e);
- _closeFile();
- });
- }
- void _resume() {
- _paused = false;
- if (_currentBlock != null) {
- _controller.add(_currentBlock);
- _currentBlock = null;
+ if (_path != null) {
+ new File(_path).open(mode: FileMode.READ)
+ .then(onOpenFile, onError: openFailed);
+ } else {
+ try {
+ onOpenFile(_File._openStdioSync(0));
+ } catch (e, s) {
+ openFailed(e, s);
+ }
}
- // Resume reading unless we are already done.
- if (_openedFile != null) _readBlock();
}
}
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698