| Index: sdk/lib/io/file_impl.dart
|
| diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart
|
| index 4e8e1cc16573d66795075fa6c895e96a1c5dc986..aa2e495645abb0a6464a060656daad4b7adf2f42 100644
|
| --- a/sdk/lib/io/file_impl.dart
|
| +++ b/sdk/lib/io/file_impl.dart
|
| @@ -20,28 +20,25 @@ class _FileStream extends Stream<List<int>> {
|
| final Completer _closeCompleter = new Completer();
|
|
|
| // Has the stream been paused or unsubscribed?
|
| - bool _paused = false;
|
| bool _unsubscribed = false;
|
|
|
| // Is there a read currently in progress?
|
| - bool _readInProgress = false;
|
| + bool _readInProgress = true;
|
| bool _closed = false;
|
|
|
| - // Block read but not yet send because stream is paused.
|
| - List<int> _currentBlock;
|
| + bool _atEnd = false;
|
|
|
| _FileStream(this._path, this._position, this._end) {
|
| - _setupController();
|
| + if (_position == null) _position = 0;
|
| }
|
|
|
| - _FileStream.forStdin() : _position = 0 {
|
| - _setupController();
|
| - }
|
| + _FileStream.forStdin() : _position = 0;
|
|
|
| StreamSubscription<List<int>> listen(void onData(List<int> event),
|
| {Function onError,
|
| void onDone(),
|
| bool cancelOnError}) {
|
| + _setupController();
|
| return _controller.stream.listen(onData,
|
| onError: onError,
|
| onDone: onDone,
|
| @@ -51,8 +48,7 @@ class _FileStream extends Stream<List<int>> {
|
| void _setupController() {
|
| _controller = new StreamController<List<int>>(sync: true,
|
| onListen: _start,
|
| - onPause: () => _paused = true,
|
| - onResume: _resume,
|
| + onResume: _readBlock,
|
| onCancel: () {
|
| _unsubscribed = true;
|
| return _closeFile();
|
| @@ -64,24 +60,25 @@ class _FileStream extends Stream<List<int>> {
|
| return _closeCompleter.future;
|
| }
|
| _closed = true;
|
| +
|
| void done() {
|
| _closeCompleter.complete();
|
| _controller.close();
|
| }
|
| - if (_openedFile != null) {
|
| - _openedFile.close()
|
| - .catchError(_controller.addError)
|
| - .whenComplete(done);
|
| - _openedFile = null;
|
| - } else {
|
| - done();
|
| - }
|
| +
|
| + _openedFile.close()
|
| + .catchError(_controller.addError)
|
| + .whenComplete(done);
|
| return _closeCompleter.future;
|
| }
|
|
|
| void _readBlock() {
|
| // Don't start a new read if one is already in progress.
|
| if (_readInProgress) return;
|
| + if (_atEnd) {
|
| + _closeFile();
|
| + return;
|
| + }
|
| _readInProgress = true;
|
| int readBytes = _BLOCK_SIZE;
|
| if (_end != null) {
|
| @@ -97,32 +94,25 @@ class _FileStream extends Stream<List<int>> {
|
| }
|
| }
|
| _openedFile.read(readBytes)
|
| - .whenComplete(() {
|
| - _readInProgress = false;
|
| - })
|
| .then((block) {
|
| if (_unsubscribed) {
|
| _closeFile();
|
| return;
|
| }
|
| - if (block.length == 0) {
|
| - if (!_unsubscribed) {
|
| - _closeFile();
|
| - _unsubscribed = true;
|
| - }
|
| - return;
|
| - }
|
| + _readInProgress = false;
|
| _position += block.length;
|
| - if (_paused) {
|
| - _currentBlock = block;
|
| - } else {
|
| - _controller.add(block);
|
| + if (block.length < readBytes ||
|
| + (_end != null && _position == _end)) {
|
| + _atEnd = true;
|
| + }
|
| + _controller.add(block);
|
| + if (!_controller.isPaused) {
|
| _readBlock();
|
| }
|
| })
|
| - .catchError((e) {
|
| + .catchError((e, s) {
|
| if (!_unsubscribed) {
|
| - _controller.addError(e);
|
| + _controller.addError(e, s);
|
| _closeFile();
|
| _unsubscribed = true;
|
| }
|
| @@ -130,45 +120,47 @@ class _FileStream extends Stream<List<int>> {
|
| }
|
|
|
| void _start() {
|
| - if (_position == null) {
|
| - _position = 0;
|
| - } else if (_position < 0) {
|
| + if (_position < 0) {
|
| _controller.addError(new RangeError("Bad start position: $_position"));
|
| _controller.close();
|
| return;
|
| }
|
| - Future<RandomAccessFile> openFuture;
|
| - if (_path != null) {
|
| - openFuture = new File(_path).open(mode: FileMode.READ);
|
| - } else {
|
| - openFuture = new Future.value(_File._openStdioSync(0));
|
| +
|
| + void onReady(RandomAccessFile file) {
|
| + _openedFile = file;
|
| + _readInProgress = false;
|
| + _readBlock();
|
| + }
|
| +
|
| + void onOpenFile(RandomAccessFile file) {
|
| + if (_position > 0) {
|
| + file.setPosition(_position)
|
| + .then(onReady, onError: (e, s) {
|
| + _controller.addError(e, s);
|
| + _readInProgress = false;
|
| + _closeFile();
|
| + });
|
| + } else {
|
| + onReady(file);
|
| + }
|
| + }
|
| +
|
| + void openFailed(error, stackTrace) {
|
| + _controller.addError(error, stackTrace);
|
| + _controller.close();
|
| + _closeCompleter.complete();
|
| }
|
| - _readInProgress = true;
|
| - openFuture
|
| - .then((RandomAccessFile opened) {
|
| - _openedFile = opened;
|
| - if (_position > 0) {
|
| - return opened.setPosition(_position);
|
| - }
|
| - })
|
| - .whenComplete(() {
|
| - _readInProgress = false;
|
| - })
|
| - .then((_) => _readBlock())
|
| - .catchError((e) {
|
| - _controller.addError(e);
|
| - _closeFile();
|
| - });
|
| - }
|
|
|
| - void _resume() {
|
| - _paused = false;
|
| - if (_currentBlock != null) {
|
| - _controller.add(_currentBlock);
|
| - _currentBlock = null;
|
| + if (_path != null) {
|
| + new File(_path).open(mode: FileMode.READ)
|
| + .then(onOpenFile, onError: openFailed);
|
| + } else {
|
| + try {
|
| + onOpenFile(_File._openStdioSync(0));
|
| + } catch (e, s) {
|
| + openFailed(e, s);
|
| + }
|
| }
|
| - // Resume reading unless we are already done.
|
| - if (_openedFile != null) _readBlock();
|
| }
|
| }
|
|
|
|
|