Index: sdk/lib/io/file_impl.dart |
diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart |
index 4e8e1cc16573d66795075fa6c895e96a1c5dc986..aa2e495645abb0a6464a060656daad4b7adf2f42 100644 |
--- a/sdk/lib/io/file_impl.dart |
+++ b/sdk/lib/io/file_impl.dart |
@@ -20,28 +20,25 @@ class _FileStream extends Stream<List<int>> { |
final Completer _closeCompleter = new Completer(); |
// Has the stream been paused or unsubscribed? |
- bool _paused = false; |
bool _unsubscribed = false; |
// Is there a read currently in progress? |
- bool _readInProgress = false; |
+ bool _readInProgress = true; |
bool _closed = false; |
- // Block read but not yet send because stream is paused. |
- List<int> _currentBlock; |
+ bool _atEnd = false; |
_FileStream(this._path, this._position, this._end) { |
- _setupController(); |
+ if (_position == null) _position = 0; |
} |
- _FileStream.forStdin() : _position = 0 { |
- _setupController(); |
- } |
+ _FileStream.forStdin() : _position = 0; |
StreamSubscription<List<int>> listen(void onData(List<int> event), |
{Function onError, |
void onDone(), |
bool cancelOnError}) { |
+ _setupController(); |
return _controller.stream.listen(onData, |
onError: onError, |
onDone: onDone, |
@@ -51,8 +48,7 @@ class _FileStream extends Stream<List<int>> { |
void _setupController() { |
_controller = new StreamController<List<int>>(sync: true, |
onListen: _start, |
- onPause: () => _paused = true, |
- onResume: _resume, |
+ onResume: _readBlock, |
onCancel: () { |
_unsubscribed = true; |
return _closeFile(); |
@@ -64,24 +60,25 @@ class _FileStream extends Stream<List<int>> { |
return _closeCompleter.future; |
} |
_closed = true; |
+ |
void done() { |
_closeCompleter.complete(); |
_controller.close(); |
} |
- if (_openedFile != null) { |
- _openedFile.close() |
- .catchError(_controller.addError) |
- .whenComplete(done); |
- _openedFile = null; |
- } else { |
- done(); |
- } |
+ |
+ _openedFile.close() |
+ .catchError(_controller.addError) |
+ .whenComplete(done); |
return _closeCompleter.future; |
} |
void _readBlock() { |
// Don't start a new read if one is already in progress. |
if (_readInProgress) return; |
+ if (_atEnd) { |
+ _closeFile(); |
+ return; |
+ } |
_readInProgress = true; |
int readBytes = _BLOCK_SIZE; |
if (_end != null) { |
@@ -97,32 +94,25 @@ class _FileStream extends Stream<List<int>> { |
} |
} |
_openedFile.read(readBytes) |
- .whenComplete(() { |
- _readInProgress = false; |
- }) |
.then((block) { |
if (_unsubscribed) { |
_closeFile(); |
return; |
} |
- if (block.length == 0) { |
- if (!_unsubscribed) { |
- _closeFile(); |
- _unsubscribed = true; |
- } |
- return; |
- } |
+ _readInProgress = false; |
_position += block.length; |
- if (_paused) { |
- _currentBlock = block; |
- } else { |
- _controller.add(block); |
+ if (block.length < readBytes || |
+ (_end != null && _position == _end)) { |
+ _atEnd = true; |
+ } |
+ _controller.add(block); |
+ if (!_controller.isPaused) { |
_readBlock(); |
} |
}) |
- .catchError((e) { |
+ .catchError((e, s) { |
if (!_unsubscribed) { |
- _controller.addError(e); |
+ _controller.addError(e, s); |
_closeFile(); |
_unsubscribed = true; |
} |
@@ -130,45 +120,47 @@ class _FileStream extends Stream<List<int>> { |
} |
void _start() { |
- if (_position == null) { |
- _position = 0; |
- } else if (_position < 0) { |
+ if (_position < 0) { |
_controller.addError(new RangeError("Bad start position: $_position")); |
_controller.close(); |
return; |
} |
- Future<RandomAccessFile> openFuture; |
- if (_path != null) { |
- openFuture = new File(_path).open(mode: FileMode.READ); |
- } else { |
- openFuture = new Future.value(_File._openStdioSync(0)); |
+ |
+ void onReady(RandomAccessFile file) { |
+ _openedFile = file; |
+ _readInProgress = false; |
+ _readBlock(); |
+ } |
+ |
+ void onOpenFile(RandomAccessFile file) { |
+ if (_position > 0) { |
+ file.setPosition(_position) |
+ .then(onReady, onError: (e, s) { |
+ _controller.addError(e, s); |
+ _readInProgress = false; |
+ _closeFile(); |
+ }); |
+ } else { |
+ onReady(file); |
+ } |
+ } |
+ |
+ void openFailed(error, stackTrace) { |
+ _controller.addError(error, stackTrace); |
+ _controller.close(); |
+ _closeCompleter.complete(); |
} |
- _readInProgress = true; |
- openFuture |
- .then((RandomAccessFile opened) { |
- _openedFile = opened; |
- if (_position > 0) { |
- return opened.setPosition(_position); |
- } |
- }) |
- .whenComplete(() { |
- _readInProgress = false; |
- }) |
- .then((_) => _readBlock()) |
- .catchError((e) { |
- _controller.addError(e); |
- _closeFile(); |
- }); |
- } |
- void _resume() { |
- _paused = false; |
- if (_currentBlock != null) { |
- _controller.add(_currentBlock); |
- _currentBlock = null; |
+ if (_path != null) { |
+ new File(_path).open(mode: FileMode.READ) |
+ .then(onOpenFile, onError: openFailed); |
+ } else { |
+ try { |
+ onOpenFile(_File._openStdioSync(0)); |
+ } catch (e, s) { |
+ openFailed(e, s); |
+ } |
} |
- // Resume reading unless we are already done. |
- if (_openedFile != null) _readBlock(); |
} |
} |